from __future__ import absolute_import
from __future__ import print_function
import urllib
from . import Config
import string
import os
import sys
import subprocess
import time
import optparse

def dasQuery(query, config):
    cmd = config.dasClient + " --limit=9999 --query=\"%s\"" % query
    print(cmd)
    output = subprocess.getstatusoutput(config.initEnv + cmd)
    if output[0] != 0:
        print("DAS CLIENT ERROR %s" % output[0])
        print(output[1])
        sys.exit()
    return output[1].splitlines()

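# Illustrative usage (assumes config.dasClient points at the DAS command-line
# client, e.g. dasgoclient, and config.initEnv sets up the shell environment;
# both come from Config):
#   runs = dasQuery("run dataset=/ZeroBias/Run2016B-v1/RAW", conf)
# The call returns the raw output lines of the DAS query.
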
def checkDatasetStructure(dataset, silent=False):
    goodDataset = True
    d = dataset.split("/")
    if not len(d) == 4:
        if not silent:
            print("Bad dataset. Expecting 3 '/' (4 fields once split)")
        return False
    if not d[0] == '':
        if not silent:
            print("Bad dataset. Expecting nothing before first '/'")
        goodDataset = False
    if not len(d[1]) > 0 or not len(d[2]) > 0 or not len(d[3]) > 0:
        if not silent:
            print("Bad dataset. Expecting text between '/'")
        goodDataset = False
    if os.path.isdir(dataset):
        if not silent:
            print("Bad dataset. Can't be an existing directory")
        goodDataset = False
    return goodDataset

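# For reference, a well-formed dataset name has the usual CMS three-level form
# (illustrative example, not taken from a real configuration):
#   /ZeroBias/Run2016B-PromptReco-v2/ALCARECO
# i.e. an empty leading field, then primary dataset, processing string, data tier.
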
def getDatasetFromPattern(pattern, conf):
    if not checkDatasetStructure(pattern):
        print("FATAL ERROR, bad dataset pattern")
        return []
    result = dasQuery("dataset dataset=%s" % pattern, conf)
    datasets = []
    for line in result:
        print(line)
        if checkDatasetStructure(line, silent=False):
            datasets.append(line)
    return datasets

def getRunsFromDataset(dataset, conf):
    if not checkDatasetStructure(dataset):
        print("FATAL ERROR, bad dataset pattern")
        return []
    result = dasQuery("run dataset=%s" % dataset, conf)
    runs = []
    for line in result:
        # Keep only 6-digit run numbers.
        if line.isdigit() and len(line) == 6:
            runs.append(int(line))
    runs.sort()
    return runs

def getNumberOfEvents(run, dataset, conf):
    if not 99999 < int(run) < 1000000:
        print("Invalid run number")
        return 0
    if not checkDatasetStructure(dataset):
        print("Invalid dataset")
        return 0
    NEventsDasOut = dasQuery("summary run=%s dataset=%s | grep summary.nevents" % (run, dataset), conf)[-1].replace(" ", "")
    if not NEventsDasOut.isdigit():
        print("Invalid number of events:")
        print("__%s__" % NEventsDasOut)
        return 0
    return int(NEventsDasOut)

def getNumberOfFiles(run, dataset, conf):
    if not 99999 < int(run) < 1000000:
        print("Invalid run number")
        return 0
    if not checkDatasetStructure(dataset):
        print("Invalid dataset")
        return 0
    NFilesDasOut = dasQuery('summary dataset=%s run=%s | grep summary.nfiles' % (dataset, run), conf)[-1].replace(" ", "")
    if not NFilesDasOut.isdigit():
        print("Invalid number of files.")
        return 0
    return int(NFilesDasOut)

def reSubmitJob(run, dataset, conf, first, last):
    # Placeholder: resubmission of the file range [first, last] is not implemented yet.
    print("Re-submitting jobs for run = %s, dataset = %s" % (run, dataset))

def submitJobs(run, dataset, nFiles, conf):
    print("Submitting jobs for run = %s, dataset = %s" % (run, dataset))
    files = ''
    if not checkDatasetStructure(dataset):
        print("FATAL ERROR, bad dataset")
        return -1
    if not 99999 < run < 1000000:
        print("FATAL ERROR, bad run number")
        return -1
    filesList = dasQuery('file dataset=%s run=%s' % (dataset, run), conf)
    filesInJob = 0
    firstFile = 0
    # Group the input files into chunks of conf.nFilesPerJob, one job per chunk.
    for f in filesList:
        if not f.startswith('/store'):
            continue
        if filesInJob < conf.nFilesPerJob:
            files += "'" + f + "',"
            filesInJob += 1
        else:
            firstFile = firstFile + filesInJob
            sendJob(dataset, run, files, conf, firstFile)
            files = "'" + f + "',"
            filesInJob = 1
    # Send the last (possibly partial) chunk.
    sendJob(dataset, run, files, conf, firstFile)

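# Illustrative walk-through of the chunking above: with conf.nFilesPerJob = 25
# and 60 files for a run, the loop dispatches a job each time a 25-file chunk
# overflows, and the final sendJob call after the loop picks up the remaining
# partial chunk of 10 files.
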
def sendJob(dataset, run, files, conf, firstFile):
    cmd = "python %s/submitCalibTree/runJob.py -f %s --firstFile %s -d %s -r %s " % (conf.RUNDIR, files, firstFile, dataset, run)
    if conf.AAG:
        cmd += " -a "
    bsub = ('bsub -q 2nd -J calibTree_' + str(run) + '_' + str(firstFile)
            + '_%s' % ("Aag" if conf.AAG else "Std")
            + ' -R "type == SLC6_64 && pool > 30000" "' + cmd + '"')
    conf.launchedRuns.append([run, firstFile])
    if conf.submit:
        os.system(bsub)
    else:
        print(cmd + " --stageout False")

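# For illustration, with run 274500, firstFile 0 and AAG disabled, the generated
# batch submission looks roughly like (<RUNDIR> and <dataset> are placeholders):
#   bsub -q 2nd -J calibTree_274500_0_Std -R "type == SLC6_64 && pool > 30000" \
#        "python <RUNDIR>/submitCalibTree/runJob.py -f '...' --firstFile 0 -d <dataset> -r 274500 "
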
def generateJobs(conf):
    print("Gathering jobs to launch.")
    print(conf)
    lastRunProcessed = conf.firstRun
    datasets = getDatasetFromPattern(conf.datasetPat, conf)
    for d in datasets:
        datasetRuns = getRunsFromDataset(d, conf)
        print(datasetRuns)
        for r in datasetRuns:
            if conf.firstRun < int(r) < conf.lastRun:
                print("Checking run %s" % r)
                n = getNumberOfEvents(r, d, conf)
                if n < 250:
                    print("Skipped. (%s evt)" % n)
                else:
                    nFiles = getNumberOfFiles(r, d, conf)
                    if nFiles > 0:
                        print("Will be processed! (%s evt, %s files)" % (n, nFiles))
                        if r > lastRunProcessed:
                            lastRunProcessed = r
                        submitJobs(r, d, nFiles, conf)
                    else:
                        print("Skipped. (%s evt, %s files)" % (n, nFiles))
            else:
                # Runs outside the [firstRun, lastRun] window are only relaunched
                # if they appear in the list of previously failed jobs.
                for failed in conf.relaunchList:
                    if int(failed[0]) == int(r):
                        print("Relaunching job %s " % failed)
                        if len(failed) == 3:
                            reSubmitJob(int(failed[0]), d, conf, failed[1], failed[2])
                        else:
                            submitJobs(int(failed[0]), d, 25, conf)
    return lastRunProcessed

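# Assumed format of conf.relaunchList entries, as used above: either [run]
# (resubmit the whole run in default 25-file chunks) or [run, first, last]
# (resubmit only the given file range via reSubmitJob).
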
def cleanUp():
    # Remove core dumps left behind in the working directory.
    os.system('rm core.*')

def checkCorrupted(lastGood, config):
    calibTreeList = ""
    print("Get the list of calibTrees from " + config.CASTORDIR)
    calibTreeInfo = subprocess.getstatusoutput(config.eosLs + config.CASTORDIR)[1].split('\n')
    NTotalEvents = 0
    runList = []

    for info in calibTreeInfo:
        subParts = info.split()
        if len(subParts) < 4:
            continue
        # Turn "calibTree_<run>[_<firstFile>].root" into [run] or [run, firstFile].
        runList.append(subParts[-1].replace("calibTree_", "").replace(".root", "").split("_"))
    print(runList)
    datasets = getDatasetFromPattern(config.datasetPat, config)
    for d in datasets:
        datasetRuns = getRunsFromDataset(d, config)
        print(datasetRuns)
        for r in datasetRuns:
            if int(r) > lastGood:
                print("Checking run %s" % r)
                n = getNumberOfEvents(r, d, config)
                if n < 250:
                    print("Skipped. (%s evt)" % n)
                else:
                    nFiles = getNumberOfFiles(r, d, config)
                    if nFiles < 25:
                        print("Found run %s ? %s" % (r, [str(r)] in runList))
                    else:
                        x = 25
                        while x < nFiles:
                            print("Found run %s , %s ? %s " % (r, x, [str(r), str(x)] in runList))
                            x += 25

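# Naming convention assumed by the check above: runs with fewer than 25 files
# produce a single calibTree_<run>.root, while larger runs are split into
# 25-file chunks named calibTree_<run>_<firstFile>.root (firstFile = 25, 50, ...).
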
if __name__ == "__main__":
    # Standalone debug entry point: scan all calibTrees with a test configuration.
    print("DEBUG DEBUG DEBUG")
    c = Config.configuration(False)
    c.runNumber = 1
    c.firstRun = 274500
    c.lastRun = 275000
    c.debug = True

    checkCorrupted(0, c)

if False:
    # Disabled legacy snippet, kept for reference: it opened every calibTree on
    # EOS with ROOT and recorded runs whose files failed to load. It relies on
    # globals (CASTORDIR, initEnv, AAG, RUNDIR, opt) that are not defined in
    # this module.
    calibTreeList = ""
    print("Get the list of calibTrees from " + CASTORDIR)
    calibTreeInfo = subprocess.getstatusoutput(initEnv + "eos ls " + CASTORDIR)[1].split('\n')
    NTotalEvents = 0
    run = 0
    for info in calibTreeInfo:
        subParts = info.split()
        if len(subParts) < 4:
            continue
        run = int(subParts[4].replace("/calibTree_", "").replace(".root", "").replace(CASTORDIR, ""))
        file = "root://eoscms//eos/cms" + subParts[4]
        print("Checking " + file)
        results = subprocess.getstatusoutput(initEnv + 'root -l -b -q ' + file)
        if len(results[1].splitlines()) > 3:
            print(results[1])
            print("add " + str(run) + " to the list of failed runs")
            os.system('echo ' + str(run) + ' >> FailledRun%s.txt' % ('_Aag' if AAG else ''))

    if opt.datasetType.lower() == "all":
        os.system("cd " + RUNDIR + "; python SubmitJobs.py -c -d Aag")
0270