File indexing completed on 2025-04-11 03:30:59
0001 from threading import Thread
0002 from Configuration.PyReleaseValidation import WorkFlow
0003 import os,time
0004 import shutil
0005 import re
0006 from subprocess import Popen
0007 from os.path import exists, basename, join
0008 from datetime import datetime
0009
class WorkFlowRunner(Thread):
    """Thread that executes the steps of one workflow in its own directory.

    Each entry of ``wf.cmds`` is either a command string or an input
    descriptor object (anything that is not a ``str``).  Command strings are
    completed with input/output file options and run through the shell;
    descriptor objects are turned into query commands whose output file list
    feeds the next step.  After ``run()`` the per-step results are available
    in ``npass``, ``nfail``, ``stat`` and ``retStep`` and a one-line summary
    in ``report``.
    """

    def __init__(self, wf, opt, noRun=False, dryRun=False, cafVeto=True, jobNumber=None, gpu = None):
        """Store the workflow and the run options.

        wf        -- workflow object; ``numId``, ``nameId`` and ``cmds`` are read
        opt       -- options object; ``dasOptions``, ``jobReports``, ``nThreads``,
                     ``nStreams``, ``maxSteps``, ``nEvents``, ``startFrom`` and
                     ``recycle`` are read
        noRun     -- append ``--no_exec`` to every command string
        dryRun    -- log the commands without executing them
        cafVeto   -- abort when an input is located on CAF but we are not
        jobNumber -- optional suffix for the working directory name
        gpu       -- optional string inserted in front of every command string
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # Plain counters until run() replaces them with per-step lists.
        self.nfail = 0
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.gpu = gpu

        self.dasOptions = opt.dasOptions
        self.jobReport = opt.jobReports
        self.nThreads = opt.nThreads
        self.nStreams = opt.nStreams
        self.maxSteps = opt.maxSteps
        self.nEvents = opt.nEvents
        self.recoOutput = ''
        self.startFrom = opt.startFrom
        self.recycle = opt.recycle

        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId
        if jobNumber is not None:
            self.wfDir = self.wfDir + '_job' + str(jobNumber)

    def doCmd(self, cmd):
        """Log ``cmd`` and, unless in dry-run mode, execute it through the shell.

        The message is printed and appended to ``<wfDir>/cmdLog``.  Returns 0
        in dry-run mode, otherwise the raw wait status reported by
        ``os.waitpid`` (not the plain exit code); ``run`` compares this value
        against 32000, so it must not be converted here.
        """
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';', '\n')
        print(msg)

        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ", cmd, 'ret=', ret)

        return ret

    def _recordOutcome(self, passed, failed, status):
        """Append one step's pass/fail flags and status label."""
        self.npass.append(passed)
        self.nfail.append(failed)
        self.stat.append(status)

    def _markNotRun(self):
        """Record a step that was skipped because the workflow was aborted."""
        self._recordOutcome(0, 0, 'NOTRUN')
        self.retStep.append(0)

    @staticmethod
    def _closeCmd(i, ID):
        """Return the shell redirection that sends a step's output to its log."""
        return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

    @staticmethod
    def _readDasFiles(dasOutputPath):
        """Return the lines of the query log that look like file names."""
        with open(dasOutputPath) as dasLog:
            return [l for l in dasLog.read().split("\n")
                    if l.startswith(("/", "root://eoscms.cern.ch"))]

    @staticmethod
    def _overrideEventCount(cmd, nEvents):
        """Replace the value following the first `` -n `` in ``cmd`` by ``nEvents``.

        When the command carries no `` -n `` option the option is appended
        instead of failing, so the requested event count is still honoured.
        """
        event_token = " -n "
        head, found, tail = cmd.partition(event_token)
        if not found:
            return cmd + event_token + '%s ' % nEvents
        # Drop the old count (first token after the option), keep the rest.
        pos_cmd = " ".join(tail.split(" ")[1:])
        return head + event_token + '%s ' % nEvents + pos_cmd

    def run(self):
        """Run every step of the workflow and fill in the result attributes.

        The working directory is created (or wiped and recreated unless in
        dry-run mode).  Once a step fails or times out, all remaining steps
        are recorded as 'NOTRUN'.  Steps beyond ``maxSteps`` are not executed;
        their command is written to ``<wfDir>/wf_steps.txt`` instead.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        # Every shell command first changes into the workflow directory.
        preamble = 'cd ' + self.wfDir + '; '

        startime = 'date %s' % time.asctime()

        # A missing CMS_PATH is treated as "not on CAF" rather than an error.
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        inFile = None
        lumiRangeFile = None
        aborted = False
        outputExtensionForStep = {}
        for istep, com in enumerate(self.wf.cmds, start=1):
            isInputOk = True
            cmd = preamble
            outputExtensionForStep[istep] = ''
            if aborted:
                self._markNotRun()
                continue

            if not isinstance(com, str):
                # Input descriptor: build and run the query commands.
                if self.recycle:
                    inFile = self.recycle
                    continue
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run", self.wf.numId)
                    self._markNotRun()
                    aborted = True
                    continue

                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 = cmd + cmd2 + self._closeCmd(istep, 'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    retStep = self.doCmd(cmd2)
                if com.dataSetParent:
                    cmd3 = (cmd + com.das(self.dasOptions, com.dataSetParent)
                            + self._closeCmd(istep, 'dasparentquery'))
                    retStep = self.doCmd(cmd3)
                cmd += com.das(self.dasOptions, com.dataSet)
                cmd += self._closeCmd(istep, 'dasquery')
                retStep = self.doCmd(cmd)

                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))

                # The query log only exists when the command was really run.
                if not self.dryRun:
                    if not exists(dasOutputPath):
                        retStep = 1
                        dasOutput = None
                    else:
                        dasOutput = self._readDasFiles(dasOutputPath)
                    if not dasOutput:
                        # Missing or empty file list: flag as an input error.
                        retStep = 1
                        isInputOk = False

                inFile = 'filelist:' + basename(dasOutputPath)

                if com.skimEvents:
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    cmd2 = preamble + "mv lumi_ranges.txt " + lumiRangeFile
                    retStep = self.doCmd(cmd2)

                print("---")

            else:
                if self.gpu is not None:
                    cmd = cmd + self.gpu

                cmd += com

                if self.startFrom:
                    # Skip steps until the one whose "-s" list names startFrom.
                    steps = cmd.split("-s ")[1].split(" ")[0]
                    if self.startFrom not in steps:
                        continue
                    self.startFrom = False
                    inFile = self.recycle

                if self.noRun:
                    cmd += ' --no_exec'

                # Input produced by a preceding query step, consumed once.
                if inFile and 'premix_stage1' not in cmd:
                    cmd += ' --filein ' + inFile
                    inFile = None
                if lumiRangeFile:
                    cmd += ' --lumiToProcess ' + lumiRangeFile
                    lumiRangeFile = None

                # Workflow 134 is excluded from this branch (hard-coded).
                if 'HARVESTING' in cmd and not 134 == self.wf.numId and '--filein' not in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
                    outputExtensionForStep[istep] = '.root'
                else:
                    extension = '.root'
                    if '--rntuple_out' in cmd:
                        extension = '.rntpl'
                    outputExtensionForStep[istep] = extension
                    if (istep != 1 and '--filein' not in cmd and 'premix_stage1' not in cmd
                            and not ("--fast" in cmd and "premix_stage2" in cmd)):
                        steps = cmd.split("-s ")[1].split(" ")[0]
                        if "ALCA" not in steps:
                            cmd += ' --filein file:step%s%s ' % (istep - 1, extension)
                        elif "ALCA" in steps and "RECO" in steps:
                            cmd += ' --filein file:step%s%s ' % (istep - 1, extension)
                        elif self.recoOutput:
                            cmd += ' --filein %s' % (self.recoOutput)
                        else:
                            cmd += ' --filein file:step%s%s ' % (istep - 1, extension)
                    elif istep != 1 and '--filein' in cmd and '--filetype' not in cmd:
                        # Make an explicit stepN input carry the extension
                        # that step N was recorded to produce.
                        expression = r'--filein\s+file:step([1-9])(_[a-zA-Z]+)*\.[a-z]+'
                        m = re.search(expression, cmd)
                        if m:
                            cmd = re.sub(expression,
                                         r'--filein file:step\1\2' + outputExtensionForStep[int(m.group(1))],
                                         cmd)
                        elif extension == '.rntpl':
                            # Input names without the "step" prefix.
                            expression = r'--filein\s+file:([a-zA-Z0-9_]+)*\.[a-z]+'
                            m = re.search(expression, cmd)
                            if m:
                                cmd = re.sub(expression, r'--filein file:\1.rntpl', cmd)
                    if '--fileout' not in com:
                        cmd += ' --fileout file:step%s%s ' % (istep, extension)
                        if "RECO" in cmd:
                            self.recoOutput = "file:step%d%s" % (istep, extension)

                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                if self.nEvents > 0:
                    cmd = self._overrideEventCount(cmd, self.nEvents)
                cmd += self._closeCmd(istep, self.wf.nameId)
                retStep = 0

                if istep > self.maxSteps:
                    # Beyond the step limit: record the command, do not run it.
                    with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            self.retStep.append(retStep)
            if retStep == 32000:
                # This status value is reported as a timeout.
                # NOTE(review): 32000 == 125 << 8, presumably exit code 125 - confirm.
                self._recordOutcome(0, 1, 'TIMEOUT')
                aborted = True
            elif retStep != 0:
                self._recordOutcome(0, 1, 'FAILED' if isInputOk else "DAS_ERROR")
                aborted = True
            else:
                self._recordOutcome(1, 0, 'PASSED')

        os.chdir(startDir)
        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        # Labels use the 0-based position in self.stat.
        logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))

        self.report = ('%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime)
                       + ' '.join(map(str, self.retStep)) + '\n')

        return
0281