File indexing completed on 2024-07-02 00:53:32
0001 from __future__ import print_function
0002 from threading import Thread
0003 from Configuration.PyReleaseValidation import WorkFlow
0004 import os,time
0005 import shutil
0006 from subprocess import Popen
0007 from os.path import exists, basename, join
0008 from datetime import datetime
0009
class WorkFlowRunner(Thread):
    """Thread that executes, in order, every step of a single workflow.

    The workflow object ``wf`` must provide ``numId``, ``nameId`` and ``cmds``.
    Each entry of ``wf.cmds`` is either a ``str`` (a command line that is
    completed with I/O options and executed) or an input-description object
    providing ``location``, ``lumiRanges()``, ``dataSetParent``, ``dataSet``
    and ``das(options, dataset)``, from which DAS query commands are built.

    After ``run()`` finishes, the per-step results are available in the
    parallel lists ``npass``, ``nfail``, ``stat`` and ``retStep`` and a
    one-line summary is stored in ``report``.
    """

    def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
        """Store the workflow and the options steering its execution.

        wf          -- workflow object (see class docstring)
        noRun       -- append ' --no_exec' to every step command
        dryRun      -- only print/log the commands, do not execute them
        cafVeto     -- refuse inputs located on 'CAF' when not running on CAF
        dasOptions  -- passed through to the input object's das() method
        jobReport   -- add a '-j JobReport<N>.xml' suffix to each step
        nThreads    -- if > 1, passed as --nThreads (not for harvesting steps)
        nStreams    -- if > 0, passed as --nStreams (not for harvesting steps)
        maxSteps    -- steps beyond this index are only written to
                       wf_steps.txt instead of being executed
        nEvents     -- if > 0, overrides the ' -n ' value of each step
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # NOTE: npass/nfail start as ints here but are replaced by per-step
        # lists as soon as run() starts.
        self.nfail = 0
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.dasOptions = dasOptions
        self.jobReport = jobReport
        self.nThreads = nThreads
        self.nStreams = nStreams
        self.maxSteps = maxSteps
        self.nEvents = nEvents
        self.recoOutput = ''

        # Working directory of this workflow, relative to the current directory.
        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId

    def doCmd(self, cmd):
        """Log ``cmd`` and, unless in dry-run mode, execute it in a shell.

        The command is printed and appended to ``<wfDir>/cmdLog``.
        Returns 0 in dry-run mode, otherwise the raw status word reported by
        ``os.waitpid`` for the shell process (not the plain exit code).
        """
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';', '\n')
        print(msg)

        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            # The commands rely on shell features ('cd dir; ... > log 2>&1').
            p = Popen(cmd, shell=True)
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ", cmd, 'ret=', ret)

        return ret

    def _markNotRun(self):
        """Record the current step as skipped (NOTRUN) in all result lists."""
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    def _recordResult(self, retStep, isInputOk):
        """Record the outcome of an executed step; return True if it failed."""
        self.retStep.append(retStep)
        if retStep == 0:
            self.npass.append(1)
            self.nfail.append(0)
            self.stat.append('PASSED')
            return False

        self.npass.append(0)
        self.nfail.append(1)
        if retStep == 32000:
            # This particular status value is reported as a timeout.
            self.stat.append('TIMEOUT')
        elif not isInputOk:
            # The failure came from the DAS query, not from the step itself.
            self.stat.append("DAS_ERROR")
        else:
            self.stat.append('FAILED')
        return True

    def _overrideEvents(self, cmd):
        """Return ``cmd`` with its ' -n <value>' replaced by ``self.nEvents``.

        If the command carries no ' -n ' option, the option is appended
        instead of failing.
        """
        event_token = " -n "
        split = cmd.split(event_token)
        if len(split) < 2:
            return cmd + event_token + '%s ' % self.nEvents
        # Drop the old value (first word after the token), keep the rest.
        pos_cmd = " ".join(split[1].split(" ")[1:])
        return split[0] + event_token + '%s ' % self.nEvents + pos_cmd

    def run(self):
        """Execute all steps of the workflow and fill the result attributes.

        The working directory ``wfDir`` is created (and, unless in dry-run
        mode, wiped first if it already exists). Once a step fails, all
        following steps are recorded as NOTRUN.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            # Start from a clean directory when re-running in the same area.
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        preamble = 'cd ' + self.wfDir + '; '

        startime = 'date %s' % time.asctime()

        # An unset CMS_PATH is treated as "not on CAF" rather than an error.
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i, ID):
            # Redirect stdout+stderr of a step into step<i>_<ID>.log
            return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

        inFile = None
        lumiRangeFile = None
        aborted = False
        for (istepmone, com) in enumerate(self.wf.cmds):
            # isInputOk distinguishes a failed DAS query from a failed step.
            isInputOk = True
            istep = istepmone + 1
            cmd = preamble
            if aborted:
                self._markNotRun()
                continue

            if not isinstance(com, str):
                # Input-description step: query DAS for the list of input files.
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run", self.wf.numId)
                    self._markNotRun()
                    aborted = True
                    continue

                # Lumi ranges first, so that the status of the DAS query
                # (run last) is the one that is finally kept in retStep.
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 = cmd + cmd2 + closeCmd(istep, 'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    retStep = self.doCmd(cmd2)
                if (com.dataSetParent):
                    cmd3 = cmd + com.das(self.dasOptions, com.dataSetParent) + closeCmd(istep, 'dasparentquery')
                    retStep = self.doCmd(cmd3)
                cmd += com.das(self.dasOptions, com.dataSet)
                cmd += closeCmd(istep, 'dasquery')
                retStep = self.doCmd(cmd)

                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))

                if not self.dryRun:
                    # A missing log, or one without any line starting with
                    # '/', counts as a failure of the DAS query.
                    dasOutput = None
                    if exists(dasOutputPath):
                        with open(dasOutputPath) as dasLog:
                            dasOutput = [l for l in dasLog.read().split("\n") if l.startswith("/")]
                    if not dasOutput:
                        retStep = 1
                        isInputOk = False

                inFile = 'filelist:' + basename(dasOutputPath)
                print("---")
            else:
                # Plain command step: complete it with chained input/output.
                cmd += com
                if self.noRun:
                    cmd += ' --no_exec'

                # Feed the file list of a preceding DAS step, except for
                # premix stage1 commands.
                if inFile and not 'premix_stage1' in cmd:
                    cmd += ' --filein ' + inFile
                    inFile = None
                if lumiRangeFile:
                    cmd += ' --lumiToProcess ' + lumiRangeFile
                    lumiRangeFile = None

                # Workflow 134 is hard-coded as exempt from the DQM harvesting input.
                if 'HARVESTING' in cmd and not 134 == self.wf.numId and not '--filein' in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
                else:
                    if istep != 1 and not '--filein' in cmd and not 'premix_stage1' in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                        # Relies on the '-s STEPS' syntax; a command without
                        # '-s ' yields an empty step list (default input).
                        pieces = cmd.split("-s ")
                        steps = pieces[1].split(" ")[0] if len(pieces) > 1 else ""
                        if "ALCA" in steps and "RECO" not in steps and self.recoOutput:
                            # Pure ALCA steps read the last RECO output.
                            cmd += ' --filein %s' % (self.recoOutput)
                        else:
                            cmd += ' --filein file:step%s.root ' % (istep - 1,)
                    if not '--fileout' in com:
                        cmd += ' --fileout file:step%s.root ' % (istep,)
                        if "RECO" in cmd:
                            self.recoOutput = "file:step%d.root" % (istep)

                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                if (self.nEvents > 0):
                    cmd = self._overrideEvents(cmd)
                cmd += closeCmd(istep, self.wf.nameId)

                retStep = 0
                if istep > self.maxSteps:
                    # Beyond maxSteps the command is only recorded, not run.
                    with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            if self._recordResult(retStep, isInputOk):
                # Skip all remaining steps after a failure.
                aborted = True

        os.chdir(startDir)

        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        # Step labels in the report are 0-based (index into self.stat).
        logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))

        self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime) + ' '.join(map(str, self.retStep)) + '\n'
0231