Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 11:59:48

0001 #!/usr/bin/env python3
0002 from __future__ import absolute_import
0003 from __future__ import print_function
0004 import urllib
0005 from . import Config
0006 import string
0007 import os
0008 import sys
0009 import subprocess
0010 import time
0011 import optparse
0012 
0013 def dasQuery(query,config):
0014   cmd = config.dasClient+" --limit=9999 --query=\"%s\""%query
0015   print(cmd)
0016   output = subprocess.getstatusoutput(config.initEnv+cmd)
0017   if output[0]!=0:
0018     print("DAS CLIENT ERROR %s"%output[0])
0019     print(output[1])
0020     sys.exit()
0021   return(output[1].splitlines())
0022 
0023 def checkDatasetStructure(dataset,silent=False):
0024    goodDataset=True
0025    d = dataset.split("/")
0026    if not len(d) == 4:
0027       if not silent:
0028          print("Bad dataset. Expecting 4 '/'")
0029       goodDataset=False
0030       return False
0031    if not d[0]=='':
0032       if not silent:
0033          print("Bad dataset. Expecting nothing before first '/'")
0034       goodDataset=False
0035    if not len(d[1])>0 or not len(d[2]) > 0 or not len(d[3]) > 0:
0036       if not silent:
0037          print("Bad dataset. Expecting text between '/'")
0038       goodDataset=False
0039    if os.path.isdir(dataset):
0040       if not silent:
0041          print("Bad dataset. Can't be an existing directory")
0042       goodDataset=False
0043    return goodDataset
0044 
0045 def getDatasetFromPattern(pattern,conf):
0046    if not checkDatasetStructure(pattern):
0047       print("FATAL ERROR, bad dataset pattern")
0048       return([])
0049    result = dasQuery("dataset dataset=%s"%pattern,conf)
0050    datasets = []
0051    for line in result:
0052       print(line)
0053       if checkDatasetStructure(line,silent=False):
0054          datasets.append(line)
0055    return datasets
0056 
0057 def getRunsFromDataset(dataset,conf):
0058    if not checkDatasetStructure(dataset):
0059       print("FATAL ERROR, bad dataset pattern")
0060       return([])
0061    result = dasQuery("run dataset=%s"%dataset,conf)
0062    runs=[]
0063    for line in result:
0064       if line.isdigit:
0065          if len(line)==6: #We want the run number to be btw 100.000 and 999.999
0066             runs.append(int(line))
0067    runs.sort()
0068    return runs
0069 
0070 def getNumberOfEvents(run,dataset,conf):
0071    if not int(run) > 99999 or not int(run)<1000000:
0072       print("Invalid run number")
0073       return 0
0074    if not checkDatasetStructure(dataset):
0075       print("Invalid dataset")
0076       return 0
0077 
0078 
0079 
0080    NEventsDasOut = dasQuery("summary run=%s dataset=%s |grep summary.nevents"%(run,dataset),conf)[-1].replace(" ","")
0081    if not NEventsDasOut.isdigit():
0082       print("Invalid number of events:")
0083       print("__%s__"%NEventsDasOut)
0084       return 0
0085    else:
0086       return int(NEventsDasOut)
0087 
0088 def getNumberOfFiles (run,dataset,conf):
0089    if not int(run) > 99999 or not int(run)<1000000:
0090       print("Invalid run number")
0091       return 0
0092    if not checkDatasetStructure(dataset):
0093       print("Invalid dataset")
0094       return 0
0095    NFilesDasOut = dasQuery('summary dataset=%s run=%s | grep summary.nfiles'%(dataset,run),conf)[-1].replace(" ","")
0096    if not NFilesDasOut.isdigit():
0097       print("Invalid number of files.")
0098       return 0
0099    else :
0100       return int(NFilesDasOut)
0101 
0102 
0103 def reSubmitJob(run, dataset, conf, first, last):
0104    print("Re-submitting jobs for run = %s, dataset = %s"%(run, dataset))
0105 
0106 
0107 
0108 def submitJobs(run, dataset, nFiles, conf):
0109    print("Submitting jobs for run = %s, dataset = %s"%(run, dataset))
0110 
0111    #GET THE LIST OF FILE FROM THE DATABASE
0112    files = ''
0113    if not checkDatasetStructure(dataset,conf):
0114       print("FATAL ERROR, bad dataset")
0115       return -1
0116    if not run > 99999 or not run<1000000:
0117       print("FATAL ERROR, bad run number")
0118       return -1
0119    filesList = dasQuery('file dataset=%s run=%s'%(dataset,run),conf)
0120    filesInJob = 0
0121    firstFile = 0
0122    for f in filesList:
0123       if(not f.startswith('/store')):continue
0124       if filesInJob<conf.nFilesPerJob:
0125          files+="'"+f+"',"
0126          filesInJob+=1
0127       else:
0128          firstFile = firstFile+filesInJob
0129          sendJob(dataset,run,files,conf,firstFile)
0130          files="'"+f+"',"
0131          filesInJob=1
0132    sendJob(dataset,run,files,conf,firstFile)
0133 
0134 def sendJob(dataset,run,files,conf,firstFile):
0135    cmd = "python %s/submitCalibTree/runJob.py -f %s --firstFile %s -d %s -r %s "%(conf.RUNDIR, files,firstFile,dataset,run)
0136    if conf.AAG:
0137       cmd+=" -a "
0138    bsub = 'bsub -q 2nd -J calibTree_' + str(run) + '_' + str(firstFile)+ '_' + '_%s'%("Aag" if conf.AAG else 'Std')+' -R "type == SLC6_64 && pool > 30000" ' + ' "'+cmd+'"'
0139    conf.launchedRuns.append([run,firstFile])
0140    if conf.submit:
0141       os.system(bsub)
0142    else:
0143       print(cmd + " --stageout False")
0144 
0145 def generateJobs(conf):
0146    print("Gathering jobs to launch.")
0147    print(conf)
0148    lastRunProcessed = conf.firstRun
0149    datasets = getDatasetFromPattern(conf.datasetPat,conf)
0150    for d in datasets:
0151       datasetRuns = getRunsFromDataset(d,conf)
0152       print(datasetRuns)
0153       for r in datasetRuns:
0154          if int(r) > conf.firstRun and int(r)<conf.lastRun:
0155             print("Checking run %s"%r)
0156             n=getNumberOfEvents(r,d,conf)
0157             if n < 250:
0158                print("Skipped. (%s evt)"%n)
0159             else:
0160                nFiles = getNumberOfFiles(r,d,conf)
0161                if nFiles > 0:
0162                   print("Will be processed ! (%s evt, %s files)"%(n,nFiles))
0163                   if r > lastRunProcessed:
0164                      lastRunProcessed = r
0165                   submitJobs(r,d,nFiles,conf)
0166                else:
0167                   print("Skipped. (%s evt,%s files)"%(n,nFiles))
0168          else:
0169             for failled in conf.relaunchList:
0170                if int(failled[0]) == int(r):
0171                   print("Relaunching job %s "% failled)
0172                   if len(failled)==3:
0173                      reSubmitJob(int(failled[0]),d,conf,failled[1],failled[2])
0174                   else:
0175                      submitJobs(int(failled[0]),d,25,conf)
0176    return lastRunProcessed
0177 
0178 
0179 def cleanUp():
0180    os.system('rm core.*')
0181 
0182 
0183 def checkCorrupted(lastGood, config):
0184    calibTreeList = ""
0185    print("Get the list of calibTree from" + config.CASTORDIR + ")")
0186    calibTreeInfo = subprocess.getstatusoutput(config.eosLs +config.CASTORDIR)[1].split('\n');
0187    NTotalEvents = 0;
0188    runList = []
0189 
0190    for info in calibTreeInfo:
0191       subParts = info.split();
0192       if(len(subParts)<4):  continue
0193 
0194       runList.append(subParts[-1].replace("calibTree_","").replace(".root","").split("_"))
0195    print(runList)
0196    datasets = getDatasetFromPattern(config.datasetPat,config)
0197    for d in datasets:
0198       datasetRuns = getRunsFromDataset(d,config)
0199       print(datasetRuns)
0200       for r in datasetRuns:
0201          if int(r) > lastGood:
0202             print("Checking run %s"%r)
0203             n=getNumberOfEvents(r,d,config)
0204             if n < 250:
0205                print("Skipped. (%s evt)"%n)
0206             else:
0207                nFiles = getNumberOfFiles(r,d,config)
0208                if nFiles < 25:
0209                   print("Found run %s ? %s"%(r,[str(r)] in runList))
0210                else:
0211                   x=25
0212                   while x<nFiles:
0213                      print("Found run %s , %s ? %s "%(r,x, [str(r),str(x)] in runList))
0214                      x+=25
0215 
0216 
0217 
0218 
0219 
0220    #for line in runList:
0221    #   print line
0222       #file = "root://eoscms/"+config.CASTORDIR+"/"+subParts[-1]
0223       #print("Checking " + file)
0224       #results = subprocess.getstatusoutput(config.initEnv+'root -l -b -q ' + file)
0225       #if(len(results[1].splitlines())>3):
0226       #   print(results[1]);
0227       #   print("add " + str(run) + " to the list of failled runs")
0228 #         os.system('echo ' + str(run) + ' >> FailledRun%s.txt'%('_Aag' if AAG else ''))
0229 
0230 
0231 
0232 if __name__ == "__main__":
0233    print("DEBUG DEBUG DEBUG")
0234    c = Config.configuration(False)
0235    c.runNumber = 1
0236    c.firstRun  = 274500
0237    c.lastRun   = 275000
0238    c.debug     = True
0239    #generateJobs(c)
0240    checkCorrupted(0,c)
0241 
0242 if False:
0243 #elif(checkCorrupted):
0244    #### FIND ALL CORRUPTED FILES ON CASTOR AND MARK THEM AS FAILLED RUN
0245 
0246    calibTreeList = ""
0247    print("Get the list of calibTree from" + CASTORDIR + ")")
0248    calibTreeInfo = subprocess.getstatusoutput(initEnv+"eos ls " + CASTORDIR)[1].split('\n');
0249    NTotalEvents = 0;
0250    run = 0
0251    for info in calibTreeInfo:
0252       subParts = info.split();
0253       if(len(subParts)<4):continue
0254 
0255       run = int(subParts[4].replace("/calibTree_","").replace(".root","").replace(CASTORDIR,""))
0256       file = "root://eoscms//eos/cms"+subParts[4]
0257       print("Checking " + file)
0258       results = subprocess.getstatusoutput(initEnv+'root -l -b -q ' + file)
0259       if(len(results[1].splitlines())>3):
0260          print(results[1]);
0261          print("add " + str(run) + " to the list of failled runs")
0262          os.system('echo ' + str(run) + ' >> FailledRun%s.txt'%('_Aag' if AAG else ''))
0263 
0264    #### If mode = All, relaunch with mode = Aag
0265    if opt.datasetType.lower()=="all":
0266       system("cd "+RUNDIR+"; python SubmitJobs.py -c -d Aag")
0267 
0268 #else:
0269    #### UNKNOWN CASE
0270 #   print "unknown argument: make sure you know what you are doing?"