Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:11

0001 #!/usr/bin/env python3
0002 import urllib
0003 from . import Config
0004 import string
0005 import os
0006 import sys
0007 import subprocess
0008 import time
0009 import optparse
0010 
def dasQuery(query,config):
  """Run a DAS client query and return its output as a list of lines.

  Builds the command from config.dasClient, prefixes config.initEnv
  (environment setup), runs it through the shell and splits stdout into
  lines.  On a non-zero client status the output is printed and the
  whole program exits with a failure status.
  """
  # NOTE(review): the query is interpolated into a shell command line;
  # queries are built internally from dataset names, not untrusted input.
  cmd = config.dasClient+" --limit=9999 --query=\"%s\""%query
  print(cmd)
  output = subprocess.getstatusoutput(config.initEnv+cmd)
  if output[0]!=0:
    print("DAS CLIENT ERROR %s"%output[0])
    print(output[1])
    # Bug fix: exit with a non-zero status instead of the bare sys.exit()
    # (which exits 0), so wrapper scripts can detect the failure.
    sys.exit(1)
  return(output[1].splitlines())
0020 
def checkDatasetStructure(dataset,silent=False):
   """Validate that *dataset* looks like a DAS dataset path.

   A valid dataset has the shape /PRIMARY/PROCESSED/TIER: exactly four
   '/'-separated fields, an empty field before the first '/', non-empty
   remaining fields, and it must not collide with an existing local
   directory.  Returns True when all checks pass, False otherwise;
   diagnostics are suppressed when silent=True.
   """
   parts = dataset.split("/")
   if len(parts) != 4:
      if not silent:
         print("Bad dataset. Expecting 4 '/'")
      return False
   valid = True
   if parts[0] != '':
      if not silent:
         print("Bad dataset. Expecting nothing before first '/'")
      valid = False
   if not (parts[1] and parts[2] and parts[3]):
      if not silent:
         print("Bad dataset. Expecting text between '/'")
      valid = False
   if os.path.isdir(dataset):
      if not silent:
         print("Bad dataset. Can't be an existing directory")
      valid = False
   return valid
0042 
def getDatasetFromPattern(pattern,conf):
   """Query DAS for all datasets matching *pattern*.

   The pattern itself must have a valid dataset structure (wildcards sit
   inside the fields); returns the list of well-formed dataset names DAS
   reported, or an empty list on a bad pattern.
   """
   if not checkDatasetStructure(pattern):
      print("FATAL ERROR, bad dataset pattern")
      return([])
   matches = []
   for candidate in dasQuery("dataset dataset=%s"%pattern,conf):
      print(candidate)
      if checkDatasetStructure(candidate,silent=False):
         matches.append(candidate)
   return matches
0054 
def getRunsFromDataset(dataset,conf):
   """Return the sorted list of six-digit run numbers found in *dataset*.

   Queries DAS for the runs of the dataset and keeps only numeric,
   six-digit entries (i.e. runs between 100000 and 999999).  Returns an
   empty list when the dataset name is malformed.
   """
   if not checkDatasetStructure(dataset):
      print("FATAL ERROR, bad dataset pattern")
      return([])
   result = dasQuery("run dataset=%s"%dataset,conf)
   runs=[]
   for line in result:
      # Bug fix: isdigit is a method and must be CALLED -- the bare
      # attribute 'line.isdigit' is always truthy, so any non-numeric
      # six-character line would have crashed int() below.
      if line.isdigit():
         if len(line)==6: #We want the run number to be btw 100.000 and 999.999
            runs.append(int(line))
   runs.sort()
   return runs
0067 
def getNumberOfEvents(run,dataset,conf):
   """Return the event count DAS reports for (run, dataset).

   Returns 0 when the run number is outside [100000, 999999], when the
   dataset string is malformed, or when DAS does not answer with a
   plain number.
   """
   if not (99999 < int(run) < 1000000):
      print("Invalid run number")
      return 0
   if not checkDatasetStructure(dataset):
      print("Invalid dataset")
      return 0
   # Take the last line of the DAS answer and strip blanks before parsing.
   query = "summary run=%s dataset=%s |grep summary.nevents"%(run,dataset)
   answer = dasQuery(query,conf)[-1].replace(" ","")
   if answer.isdigit():
      return int(answer)
   print("Invalid number of events:")
   print("__%s__"%answer)
   return 0
0085 
def getNumberOfFiles (run,dataset,conf):
   """Return the file count DAS reports for (run, dataset).

   Returns 0 on an out-of-range run number, a malformed dataset, or a
   non-numeric DAS answer.
   """
   if not (99999 < int(run) < 1000000):
      print("Invalid run number")
      return 0
   if not checkDatasetStructure(dataset):
      print("Invalid dataset")
      return 0
   # Last line of the DAS summary, blanks stripped, should be the count.
   answer = dasQuery('summary dataset=%s run=%s | grep summary.nfiles'%(dataset,run),conf)[-1].replace(" ","")
   if answer.isdigit():
      return int(answer)
   print("Invalid number of files.")
   return 0
0099 
0100 
def reSubmitJob(run, dataset, conf, first, last):
   """Log a request to re-submit jobs for one run (not implemented).

   Only prints the request; *conf* and the file range *first*/*last*
   are accepted for interface compatibility but currently unused.
   """
   message = "Re-submitting jobs for run = %s, dataset = %s"%(run, dataset)
   print(message)
0103 
0104 
0105 
def submitJobs(run, dataset, nFiles, conf):
   """Split the files of (run, dataset) into batches and submit one job each.

   Fetches the file list from DAS, keeps only /store paths, groups them
   into batches of conf.nFilesPerJob, and hands each batch to sendJob
   together with the index of the batch's first file.  Returns -1 on an
   invalid dataset or run number.  *nFiles* is informational only.
   """
   print("Submitting jobs for run = %s, dataset = %s"%(run, dataset))

   #GET THE LIST OF FILE FROM THE DATABASE
   files = ''
   # Bug fix: the second positional parameter of checkDatasetStructure is
   # 'silent'; passing conf there silently suppressed the diagnostics.
   if not checkDatasetStructure(dataset):
      print("FATAL ERROR, bad dataset")
      return -1
   if not run > 99999 or not run<1000000:
      print("FATAL ERROR, bad run number")
      return -1
   filesList = dasQuery('file dataset=%s run=%s'%(dataset,run),conf)
   filesInJob = 0
   firstFile = 0
   for f in filesList:
      if(not f.startswith('/store')):continue
      if filesInJob<conf.nFilesPerJob:
         files+="'"+f+"',"
         filesInJob+=1
      else:
         # Bug fix: submit the accumulated batch with the index of ITS
         # first file, then advance firstFile for the new batch.  The
         # original advanced firstFile before submitting, labelling every
         # batch with the start index of the NEXT batch and giving the
         # last two jobs the same index (colliding bsub job names).
         sendJob(dataset,run,files,conf,firstFile)
         firstFile = firstFile+filesInJob
         files="'"+f+"',"
         filesInJob=1
   # Flush the final (possibly partial) batch.
   sendJob(dataset,run,files,conf,firstFile)
0131 
def sendJob(dataset,run,files,conf,firstFile):
   """Build the runJob.py command for one file batch and submit it via bsub.

   Records every (run, firstFile) pair in conf.launchedRuns.  When
   conf.submit is False, the command is only printed with stage-out
   disabled instead of being sent to the batch system.
   """
   cmd = "python %s/submitCalibTree/runJob.py -f %s --firstFile %s -d %s -r %s "%(conf.RUNDIR, files,firstFile,dataset,run)
   if conf.AAG:
      cmd+=" -a "
   flavour = "Aag" if conf.AAG else 'Std'
   jobName = 'calibTree_' + str(run) + '_' + str(firstFile)+ '_' + '_%s'%(flavour)
   bsub = 'bsub -q 2nd -J ' + jobName + ' -R "type == SLC6_64 && pool > 30000" ' + ' "'+cmd+'"'
   conf.launchedRuns.append([run,firstFile])
   if conf.submit:
      os.system(bsub)
   else:
      print(cmd + " --stageout False")
0142 
def generateJobs(conf):
   """Scan DAS for runs to process and submit (or re-launch) their jobs.

   For every dataset matching conf.datasetPat: runs strictly between
   conf.firstRun and conf.lastRun with at least 250 events and at least
   one file are submitted; runs outside that window are compared against
   conf.relaunchList and re-launched when listed.  Returns the highest
   run number submitted (never below conf.firstRun).
   """
   print("Gathering jobs to launch.")
   print(conf)
   lastRunProcessed = conf.firstRun
   for dataset in getDatasetFromPattern(conf.datasetPat,conf):
      runList = getRunsFromDataset(dataset,conf)
      print(runList)
      for run in runList:
         if not (conf.firstRun < int(run) < conf.lastRun):
            # Outside the processing window: maybe a re-launch request.
            for failled in conf.relaunchList:
               if int(failled[0]) == int(run):
                  print("Relaunching job %s "% failled)
                  if len(failled)==3:
                     # Entry carries a file range -> targeted re-submit.
                     reSubmitJob(int(failled[0]),dataset,conf,failled[1],failled[2])
                  else:
                     submitJobs(int(failled[0]),dataset,25,conf)
            continue
         print("Checking run %s"%run)
         nEvents = getNumberOfEvents(run,dataset,conf)
         if nEvents < 250:
            print("Skipped. (%s evt)"%nEvents)
            continue
         nFiles = getNumberOfFiles(run,dataset,conf)
         if nFiles > 0:
            print("Will be processed ! (%s evt, %s files)"%(nEvents,nFiles))
            if run > lastRunProcessed:
               lastRunProcessed = run
            submitJobs(run,dataset,nFiles,conf)
         else:
            print("Skipped. (%s evt,%s files)"%(nEvents,nFiles))
   return lastRunProcessed
0175 
0176 
def cleanUp():
   """Remove core-dump files (core.*) from the current working directory.

   Uses glob + os.remove instead of shelling out to 'rm core.*', which
   printed an error to stderr whenever no core files existed.  Removal
   is best effort: files that cannot be deleted are skipped silently.
   """
   import glob
   for coreFile in glob.glob('core.*'):
      try:
         os.remove(coreFile)
      except OSError:
         pass
0179 
0180 
def checkCorrupted(lastGood, config):
   """Cross-check calibTree files on EOS against runs known to DAS.

   Lists the calibTree_*.root files under config.CASTORDIR, decodes each
   file name into its [run] or [run, firstFile] components, then walks
   every run above *lastGood* of every dataset matching config.datasetPat
   and prints whether the expected output file(s) appear in that listing.
   Purely diagnostic: nothing is submitted or deleted.
   """
   calibTreeList = ""
   print("Get the list of calibTree from" + config.CASTORDIR + ")")
   # 'eos ls' long-listing output, one entry per line; only lines with at
   # least 4 whitespace-separated fields are kept (the file name is last).
   calibTreeInfo = subprocess.getstatusoutput(config.eosLs +config.CASTORDIR)[1].split('\n');
   NTotalEvents = 0;
   runList = []

   for info in calibTreeInfo:
      subParts = info.split();
      if(len(subParts)<4):  continue

      # "calibTree_274500.root"    -> ['274500']
      # "calibTree_274500_25.root" -> ['274500', '25']
      runList.append(subParts[-1].replace("calibTree_","").replace(".root","").split("_"))
   print(runList)
   datasets = getDatasetFromPattern(config.datasetPat,config)
   for d in datasets:
      datasetRuns = getRunsFromDataset(d,config)
      print(datasetRuns)
      for r in datasetRuns:
         if int(r) > lastGood:
            print("Checking run %s"%r)
            n=getNumberOfEvents(r,d,config)
            if n < 250:
               print("Skipped. (%s evt)"%n)
            else:
               nFiles = getNumberOfFiles(r,d,config)
               if nFiles < 25:
                  # Single-job run: expect one file named after the run only.
                  print("Found run %s ? %s"%(r,[str(r)] in runList))
               else:
                  # Multi-job run: one file per 25-file batch, from 25 upward.
                  x=25
                  while x<nFiles:
                     print("Found run %s , %s ? %s "%(r,x, [str(r),str(x)] in runList))
                     x+=25




   # NOTE(review): leftover experiment below -- opened each file with ROOT
   # to detect corruption and appended bad runs to FailledRun*.txt.
   #for line in runList:
   #   print line
      #file = "root://eoscms/"+config.CASTORDIR+"/"+subParts[-1]
      #print("Checking " + file)
      #results = subprocess.getstatusoutput(config.initEnv+'root -l -b -q ' + file)
      #if(len(results[1].splitlines())>3):
      #   print(results[1]);
      #   print("add " + str(run) + " to the list of failled runs")
#         os.system('echo ' + str(run) + ' >> FailledRun%s.txt'%('_Aag' if AAG else ''))
0227 
0228 
0229 
if __name__ == "__main__":
   # Ad-hoc debug driver: builds a configuration and runs only the
   # checkCorrupted() scan for the 274500-275000 run window.
   print("DEBUG DEBUG DEBUG")
   c = Config.configuration(False)
   c.runNumber = 1
   c.firstRun  = 274500
   c.lastRun   = 275000
   c.debug     = True
   #generateJobs(c)
   checkCorrupted(0,c)
0239 
# NOTE(review): dead code -- permanently disabled by 'if False' and
# referencing names that do not exist at module level (CASTORDIR, initEnv,
# AAG, opt, system, RUNDIR).  Superseded by checkCorrupted(); kept
# verbatim as a candidate for deletion.
if False:
#elif(checkCorrupted):
   #### FIND ALL CORRUPTED FILES ON CASTOR AND MARK THEM AS FAILLED RUN

   calibTreeList = ""
   print("Get the list of calibTree from" + CASTORDIR + ")")
   calibTreeInfo = subprocess.getstatusoutput(initEnv+"eos ls " + CASTORDIR)[1].split('\n');
   NTotalEvents = 0;
   run = 0
   for info in calibTreeInfo:
      subParts = info.split();
      if(len(subParts)<4):continue

      run = int(subParts[4].replace("/calibTree_","").replace(".root","").replace(CASTORDIR,""))
      file = "root://eoscms//eos/cms"+subParts[4]
      print("Checking " + file)
      results = subprocess.getstatusoutput(initEnv+'root -l -b -q ' + file)
      if(len(results[1].splitlines())>3):
         print(results[1]);
         print("add " + str(run) + " to the list of failled runs")
         os.system('echo ' + str(run) + ' >> FailledRun%s.txt'%('_Aag' if AAG else ''))

   #### If mode = All, relaunch with mode = Aag
   if opt.datasetType.lower()=="all":
      system("cd "+RUNDIR+"; python SubmitJobs.py -c -d Aag")

#else:
   #### UNKNOWN CASE
#   print "unknown argument: make sure you know what you are doing?"