# File indexing completed on 2024-11-25 02:29:11
0001
import glob
import optparse
import os
import string
import subprocess
import sys
import time
import urllib

from . import Config
0010
def dasQuery(query,config):
    """Run the DAS client for *query* and return its output as a list of lines.

    Builds the client command from config.dasClient, prefixes it with
    config.initEnv, and exits the whole process if the client reports a
    non-zero status.
    """
    cmd = config.dasClient + " --limit=9999 --query=\"%s\"" % query
    print(cmd)
    status, text = subprocess.getstatusoutput(config.initEnv + cmd)
    if status != 0:
        print("DAS CLIENT ERROR %s" % status)
        print(text)
        sys.exit()
    return text.splitlines()
0020
def checkDatasetStructure(dataset,silent=False):
    """Return True when *dataset* looks like a valid '/A/B/C' DAS dataset path.

    Unless *silent* is set, each failed check prints a diagnostic message.
    """
    parts = dataset.split("/")
    # A well-formed dataset splits into exactly four pieces, the first empty.
    if len(parts) != 4:
        if not silent:
            print("Bad dataset. Expecting 4 '/'")
        return False
    ok = True
    if parts[0] != '':
        if not silent:
            print("Bad dataset. Expecting nothing before first '/'")
        ok = False
    if not (parts[1] and parts[2] and parts[3]):
        if not silent:
            print("Bad dataset. Expecting text between '/'")
        ok = False
    # Guard against accidentally passing a local directory path.
    if os.path.isdir(dataset):
        if not silent:
            print("Bad dataset. Can't be an existing directory")
        ok = False
    return ok
0042
def getDatasetFromPattern(pattern,conf):
    """Query DAS for datasets matching *pattern*; return only well-formed ones."""
    if not checkDatasetStructure(pattern):
        print("FATAL ERROR, bad dataset pattern")
        return []
    datasets = []
    for candidate in dasQuery("dataset dataset=%s" % pattern, conf):
        print(candidate)
        # Keep only lines that themselves look like dataset paths.
        if checkDatasetStructure(candidate, silent=False):
            datasets.append(candidate)
    return datasets
0054
def getRunsFromDataset(dataset,conf):
    """Return the sorted list of 6-digit run numbers DAS reports for *dataset*.

    Lines that are not purely numeric, or not exactly six digits long,
    are skipped. Returns [] on a malformed dataset name.
    """
    if not checkDatasetStructure(dataset):
        print("FATAL ERROR, bad dataset pattern")
        return([])
    result = dasQuery("run dataset=%s"%dataset,conf)
    runs=[]
    for line in result:
        # BUG FIX: original tested `line.isdigit` (the bound method object,
        # always truthy) instead of calling it, so any 6-character
        # non-numeric line reached int() and could raise ValueError.
        if line.isdigit():
            if len(line)==6:
                runs.append(int(line))
    runs.sort()
    return runs
0067
def getNumberOfEvents(run,dataset,conf):
    """Return the event count DAS reports for *run* in *dataset*.

    Returns 0 when the run number is not a 6-digit value, the dataset is
    malformed, or the DAS answer is not a plain integer.
    """
    if not (99999 < int(run) < 1000000):
        print("Invalid run number")
        return 0
    if not checkDatasetStructure(dataset):
        print("Invalid dataset")
        return 0
    query = "summary run=%s dataset=%s |grep summary.nevents" % (run, dataset)
    # Take the last output line and squeeze out any whitespace.
    answer = dasQuery(query, conf)[-1].replace(" ", "")
    if answer.isdigit():
        return int(answer)
    print("Invalid number of events:")
    print("__%s__" % answer)
    return 0
0085
def getNumberOfFiles(run,dataset,conf):
    """Return the file count DAS reports for *run* in *dataset* (0 on error)."""
    if not (99999 < int(run) < 1000000):
        print("Invalid run number")
        return 0
    if not checkDatasetStructure(dataset):
        print("Invalid dataset")
        return 0
    answer = dasQuery('summary dataset=%s run=%s | grep summary.nfiles'%(dataset,run),conf)[-1].replace(" ","")
    if not answer.isdigit():
        print("Invalid number of files.")
        return 0
    return int(answer)
0099
0100
def reSubmitJob(run, dataset, conf, first, last):
    """Announce re-submission of the job covering files [first, last] of *run*.

    NOTE(review): this is a stub -- it only prints a message; no job is
    actually re-submitted here. Confirm whether the implementation is
    missing or handled elsewhere.
    """
    print("Re-submitting jobs for run = %s, dataset = %s"%(run, dataset))
0103
0104
0105
def submitJobs(run, dataset, nFiles, conf):
    """Split the '/store/...' files of *run* in *dataset* into batches of
    conf.nFilesPerJob and hand each batch to sendJob().

    Returns -1 on an invalid dataset or run number, None otherwise.
    """
    print("Submitting jobs for run = %s, dataset = %s"%(run, dataset))
    # BUG FIX: the original called checkDatasetStructure(dataset, conf),
    # passing the config object as the `silent` flag, which suppressed the
    # diagnostic printout of the structure checks.
    if not checkDatasetStructure(dataset):
        print("FATAL ERROR, bad dataset")
        return -1
    if not run > 99999 or not run<1000000:
        print("FATAL ERROR, bad run number")
        return -1
    filesList = dasQuery('file dataset=%s run=%s'%(dataset,run),conf)
    files = ''
    filesInJob = 0
    firstFile = 0
    for f in filesList:
        if not f.startswith('/store'):
            continue
        if filesInJob < conf.nFilesPerJob:
            files += "'" + f + "',"
            filesInJob += 1
        else:
            # Current batch is full: submit it and start a new one with f.
            firstFile += filesInJob
            sendJob(dataset,run,files,conf,firstFile)
            files = "'" + f + "',"
            filesInJob = 1
    # Robustness: skip the trailing sendJob when DAS returned no
    # '/store' files at all (the original submitted an empty batch).
    if files:
        sendJob(dataset,run,files,conf,firstFile)
0131
def sendJob(dataset,run,files,conf,firstFile):
    """Build the runJob.py command line for one file batch and submit it.

    Records (run, firstFile) in conf.launchedRuns. When conf.submit is
    False, the command is only printed (with '--stageout False' appended)
    instead of being sent to bsub.
    """
    cmd = ("python %s/submitCalibTree/runJob.py -f %s --firstFile %s -d %s -r %s "
           % (conf.RUNDIR, files, firstFile, dataset, run))
    if conf.AAG:
        cmd += " -a "
    flavour = "Aag" if conf.AAG else 'Std'
    jobName = 'calibTree_' + str(run) + '_' + str(firstFile) + '_' + '_%s' % flavour
    bsub = ('bsub -q 2nd -J ' + jobName
            + ' -R "type == SLC6_64 && pool > 30000" ' + ' "' + cmd + '"')
    conf.launchedRuns.append([run,firstFile])
    if conf.submit:
        os.system(bsub)
    else:
        print(cmd + " --stageout False")
0142
def generateJobs(conf):
    """Scan all datasets matching conf.datasetPat and submit calibration jobs.

    For each run strictly inside (conf.firstRun, conf.lastRun) with at
    least 250 events and at least one file, jobs are submitted via
    submitJobs(). Runs outside that window are checked against
    conf.relaunchList and re-launched when listed there.

    Returns the highest run number for which jobs were submitted (or
    conf.firstRun if none were).
    """
    print("Gathering jobs to launch.")
    print(conf)
    lastRunProcessed = conf.firstRun
    datasets = getDatasetFromPattern(conf.datasetPat,conf)
    for d in datasets:
        datasetRuns = getRunsFromDataset(d,conf)
        print(datasetRuns)
        for r in datasetRuns:
            if int(r) > conf.firstRun and int(r)<conf.lastRun:
                print("Checking run %s"%r)
                n=getNumberOfEvents(r,d,conf)
                # Runs with fewer than 250 events are not worth a job.
                if n < 250:
                    print("Skipped. (%s evt)"%n)
                else:
                    nFiles = getNumberOfFiles(r,d,conf)
                    if nFiles > 0:
                        print("Will be processed ! (%s evt, %s files)"%(n,nFiles))
                        if r > lastRunProcessed:
                            lastRunProcessed = r
                        submitJobs(r,d,nFiles,conf)
                    else:
                        print("Skipped. (%s evt,%s files)"%(n,nFiles))
            else:
                # Run is outside the window: re-launch it only if it appears
                # in the failed-jobs list. Entries of length 3 carry a file
                # range (first, last); others are resubmitted whole.
                for failled in conf.relaunchList:
                    if int(failled[0]) == int(r):
                        print("Relaunching job %s "% failled)
                        if len(failled)==3:
                            reSubmitJob(int(failled[0]),d,conf,failled[1],failled[2])
                        else:
                            # 25 is presumably the default files-per-job count
                            # -- TODO confirm against submitJobs/conf.nFilesPerJob.
                            submitJobs(int(failled[0]),d,25,conf)
    return lastRunProcessed
0175
0176
def cleanUp():
    """Remove core-dump files ('core.*') from the current working directory.

    Uses glob + os.remove instead of shelling out to `rm core.*`, which
    printed a shell error whenever no core file existed.
    """
    for core in glob.glob('core.*'):
        try:
            os.remove(core)
        except OSError:
            # Best effort: ignore files that vanished or cannot be removed.
            pass
0179
0180
def checkCorrupted(lastGood, config):
    """Cross-check calibTree files on EOS against the runs DAS knows about.

    Lists calibTree_*.root files under config.CASTORDIR, parses each name
    into [run] or [run, firstFile] tokens, then for every DAS run above
    *lastGood* with >= 250 events prints whether the expected file(s) are
    present in that list. Results are only printed, nothing is returned.
    """
    calibTreeList = ""
    print("Get the list of calibTree from" + config.CASTORDIR + ")")
    calibTreeInfo = subprocess.getstatusoutput(config.eosLs +config.CASTORDIR)[1].split('\n');
    NTotalEvents = 0;
    runList = []

    for info in calibTreeInfo:
        subParts = info.split();
        # Skip lines that do not look like a full `eos ls -l` entry.
        if(len(subParts)<4): continue

        # "calibTree_<run>[_<firstFile>].root" -> ["<run>"] or ["<run>", "<firstFile>"]
        runList.append(subParts[-1].replace("calibTree_","").replace(".root","").split("_"))
    print(runList)
    datasets = getDatasetFromPattern(config.datasetPat,config)
    for d in datasets:
        datasetRuns = getRunsFromDataset(d,config)
        print(datasetRuns)
        for r in datasetRuns:
            if int(r) > lastGood:
                print("Checking run %s"%r)
                n=getNumberOfEvents(r,d,config)
                if n < 250:
                    print("Skipped. (%s evt)"%n)
                else:
                    nFiles = getNumberOfFiles(r,d,config)
                    if nFiles < 25:
                        # Small run: expect a single calibTree_<run>.root file.
                        print("Found run %s ? %s"%(r,[str(r)] in runList))
                    else:
                        # Large run: expect one file per 25-file batch,
                        # named calibTree_<run>_<firstFile>.root.
                        x=25
                        while x<nFiles:
                            print("Found run %s , %s ? %s "%(r,x, [str(r),str(x)] in runList))
                            x+=25
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229
if __name__ == "__main__":
    # Debug entry point: build a configuration and run the corruption check
    # over a fixed run window.
    print("DEBUG DEBUG DEBUG")
    c = Config.configuration(False)
    c.runNumber = 1
    c.firstRun = 274500
    c.lastRun = 275000
    c.debug = True

    checkCorrupted(0,c)

    # NOTE(review): everything below is dead legacy code (guarded by
    # `if False:`). It references names that are not defined in this file
    # (CASTORDIR, initEnv, AAG, opt, RUNDIR, system) and would raise
    # NameError if ever enabled. Kept as-is; consider deleting.
    if False:


        calibTreeList = ""
        print("Get the list of calibTree from" + CASTORDIR + ")")
        calibTreeInfo = subprocess.getstatusoutput(initEnv+"eos ls " + CASTORDIR)[1].split('\n');
        NTotalEvents = 0;
        run = 0
        for info in calibTreeInfo:
            subParts = info.split();
            if(len(subParts)<4):continue

            run = int(subParts[4].replace("/calibTree_","").replace(".root","").replace(CASTORDIR,""))
            file = "root://eoscms//eos/cms"+subParts[4]
            print("Checking " + file)
            results = subprocess.getstatusoutput(initEnv+'root -l -b -q ' + file)
            if(len(results[1].splitlines())>3):
                print(results[1]);
                print("add " + str(run) + " to the list of failled runs")
                os.system('echo ' + str(run) + ' >> FailledRun%s.txt'%('_Aag' if AAG else ''))


        if opt.datasetType.lower()=="all":
            system("cd "+RUNDIR+"; python SubmitJobs.py -c -d Aag")
0265
0266
0267
0268