File indexing completed on 2023-10-25 09:38:30
0001 from __future__ import print_function
0002 from threading import Thread
0003 from Configuration.PyReleaseValidation import WorkFlow
0004 import os,time
0005 import shutil
0006 from subprocess import Popen
0007 from os.path import exists, basename, join
0008 from datetime import datetime
0009
class WorkFlowRunner(Thread):
    """Thread that runs every step of one workflow inside its own directory.

    Each entry of ``wf.cmds`` is either a command string, which gets
    --filein/--fileout and the threading/event options appended, or an
    object describing a DAS input (``location``, ``dataSet``,
    ``dataSetParent``, ``das()``, ``lumiRanges()``) whose queries are run and
    whose file list is fed to the next command step.

    Per-step outcomes end up in the parallel lists ``npass``, ``nfail``,
    ``retStep`` and ``stat``; a one-line summary is stored in ``report`` once
    ``run()`` finishes.
    """

    def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
        """Store the run options; nothing is executed until ``run()``.

        :param wf: workflow object; ``numId``, ``nameId`` and ``cmds`` are read.
        :param noRun: append ``--no_exec`` to every command step.
        :param dryRun: only log commands, never execute them, and do not wipe
            an existing workflow directory.
        :param cafVeto: refuse DAS inputs with ``location == 'CAF'`` when
            ``CMS_PATH`` does not contain ``cms/caf/cms``.
        :param dasOptions: forwarded to the input object's ``das()`` method.
        :param jobReport: add ``--suffix "-j JobReport<step>.xml "``.
        :param nThreads: if > 1, add ``--nThreads`` (not for harvesting steps).
        :param nStreams: if > 0, add ``--nStreams`` (not for harvesting steps).
        :param maxSteps: steps with a higher (1-based) index are not executed;
            their command is appended to ``<wfDir>/wf_steps.txt`` instead.
        :param nEvents: if > 0, force ``-n <nEvents>`` on every command step.
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # Replaced by per-step lists at the start of run().
        self.nfail = 0
        self.npass = 0
        # Per-step results filled by run(); defined here so they exist
        # even before run() has been called.
        self.stat = []
        self.retStep = []
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.dasOptions = dasOptions
        self.jobReport = jobReport
        self.nThreads = nThreads
        self.nStreams = nStreams
        self.maxSteps = maxSteps
        self.nEvents = nEvents

        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId

    def doCmd(self, cmd):
        """Log *cmd* and, unless in dry-run mode, run it through the shell.

        The message is printed and appended to ``<wfDir>/cmdLog``.

        :returns: the status word reported by ``os.waitpid`` (an encoded
            wait status, not the plain exit code); always 0 in dry-run mode.
        """
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';', '\n')
        print(msg)

        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ", cmd, 'ret=', ret)

        return ret

    @staticmethod
    def _withNEvents(cmd, nEvents):
        """Return *cmd* with the value of its first `` -n `` option set to *nEvents*.

        Only the token right after the first `` -n `` is replaced; everything
        after it (including later `` -n `` occurrences) is kept. When *cmd*
        has no `` -n `` option, `` -n <nEvents> `` is appended.
        """
        event_token = " -n "
        head, found, tail = cmd.partition(event_token)
        if not found:
            return cmd + event_token + '%s ' % nEvents
        rest = " ".join(tail.split(" ")[1:])
        return head + event_token + '%s ' % nEvents + rest

    def _recordStep(self, npass, nfail, retStep, stat):
        """Append one step's outcome to the four parallel result lists."""
        self.npass.append(npass)
        self.nfail.append(nfail)
        self.retStep.append(retStep)
        self.stat.append(stat)

    def run(self):
        """Execute the workflow steps in order and build ``self.report``.

        After the first failing (or CAF-vetoed) step every remaining step is
        recorded as NOTRUN. Each step's stdout/stderr goes to
        ``<wfDir>/step<N>_<tag>.log``.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        # Every shell command is run from inside the workflow directory.
        preamble = 'cd ' + self.wfDir + '; '

        startime = 'date %s' % time.asctime()

        # An unset CMS_PATH means "not on CAF" rather than a KeyError.
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i, ID):
            # Redirect stdout and stderr of step i into step<i>_<ID>.log.
            return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

        inFile = None
        lumiRangeFile = None
        aborted = False
        for (istepmone, com) in enumerate(self.wf.cmds):
            isInputOk = True
            istep = istepmone + 1
            cmd = preamble
            if aborted:
                self._recordStep(0, 0, 0, 'NOTRUN')
                continue
            if not isinstance(com, str):
                # DAS input step.
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run", self.wf.numId)
                    self._recordStep(0, 0, 0, 'NOTRUN')
                    aborted = True
                    continue

                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 = cmd + cmd2 + closeCmd(istep, 'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    retStep = self.doCmd(cmd2)
                if com.dataSetParent:
                    cmd3 = cmd + com.das(self.dasOptions, com.dataSetParent) + closeCmd(istep, 'dasparentquery')
                    retStep = self.doCmd(cmd3)
                cmd += com.das(self.dasOptions, com.dataSet)
                cmd += closeCmd(istep, 'dasquery')
                # Only the main query's status is kept: the lumi-range and
                # parent-query return codes above are overwritten here.
                retStep = self.doCmd(cmd)

                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))

                if not self.dryRun:
                    if not exists(dasOutputPath):
                        retStep = 1
                        dasOutput = None
                    else:
                        # Only lines starting with '/' are taken as file names.
                        with open(dasOutputPath) as dasLog:
                            dasOutput = [l for l in dasLog.read().split("\n") if l.startswith("/")]
                    if not dasOutput:
                        retStep = 1
                        isInputOk = False

                # The next command step reads its input from this file list.
                inFile = 'filelist:' + basename(dasOutputPath)
                print("---")
            else:
                # Command step.
                cmd += com
                if self.noRun:
                    cmd += ' --no_exec'

                # Consume the file list / lumi ranges of a preceding DAS step.
                if inFile and not 'premix_stage1' in cmd:
                    cmd += ' --filein ' + inFile
                    inFile = None
                if lumiRangeFile:
                    cmd += ' --lumiToProcess ' + lumiRangeFile
                    lumiRangeFile = None

                # Workflow 134 is excluded from the automatic DQM harvesting input.
                if 'HARVESTING' in cmd and not 134 == self.wf.numId and not '--filein' in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
                else:
                    # Chain from the previous step's output, except for the
                    # first step and the premix stage1 / FastSim stage2 cases.
                    if istep != 1 and not '--filein' in cmd and not 'premix_stage1' in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                        cmd += ' --filein file:step%s.root ' % (istep - 1,)
                    if not '--fileout' in com:
                        cmd += ' --fileout file:step%s.root ' % (istep,)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                if self.nEvents > 0:
                    cmd = self._withNEvents(cmd, self.nEvents)
                cmd += closeCmd(istep, self.wf.nameId)
                retStep = 0
                if istep > self.maxSteps:
                    # Beyond maxSteps: record the command instead of running it.
                    with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            if retStep == 32000:
                # NOTE(review): 32000 == 125 << 8 as a waitpid status; treated
                # as a timeout here - confirm which wrapper produces it.
                self._recordStep(0, 1, retStep, 'TIMEOUT')
                aborted = True
            elif retStep != 0:
                self._recordStep(0, 1, retStep, 'FAILED' if isInputOk else "DAS_ERROR")
                aborted = True
            else:
                self._recordStep(1, 0, retStep, 'PASSED')

        os.chdir(startDir)
        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))

        self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime) + ' '.join(map(str, self.retStep)) + '\n'
0220