Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-02 00:53:32

0001 from __future__ import print_function
0002 from threading import Thread
0003 from Configuration.PyReleaseValidation import WorkFlow
0004 import os,time
0005 import shutil
0006 from subprocess import Popen 
0007 from os.path import exists, basename, join
0008 from datetime import datetime
0009 
class WorkFlowRunner(Thread):
    """Thread that executes the commands of one workflow, one step after another.

    Every entry of ``wf.cmds`` is one step.  A plain string is treated as a
    command line that gets completed with input/output options and executed.
    Any other entry is treated as an input-query object; the code below uses
    its ``location``, ``lumiRanges()``, ``dataSetParent``, ``dataSet`` and
    ``das()`` members.

    After ``run()`` the per-step results are available as lists, with one
    entry per step:

    * ``npass``   -- 1 if the step passed, else 0
    * ``nfail``   -- 1 if the step failed or timed out, else 0
    * ``stat``    -- 'PASSED', 'FAILED', 'DAS_ERROR', 'TIMEOUT' or 'NOTRUN'
    * ``retStep`` -- status value returned by ``doCmd`` for the step

    ``report`` holds a one-line textual summary of the whole workflow.
    """

    def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
        """Store the workflow and the run options.

        wf         -- workflow object; ``numId``, ``nameId`` and ``cmds`` are used
        noRun      -- append ``--no_exec`` to every string command
        dryRun     -- only print/log the commands, do not execute them
        cafVeto    -- refuse input steps located on 'CAF' when not running on CAF
        dasOptions -- passed through to the ``das()`` call of input steps
        jobReport  -- append a ``--suffix "-j JobReport<step>.xml "`` option
        nThreads   -- if > 1, appended as ``--nThreads`` (not for harvesting steps)
        nStreams   -- if > 0, appended as ``--nStreams`` (not for harvesting steps)
        maxSteps   -- steps beyond this number are only written to wf_steps.txt
        nEvents    -- if > 0, replaces the value of an existing `` -n `` option
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # NOTE: npass/nfail start as integers here but are replaced by
        # per-step lists as soon as run() starts.
        self.nfail = 0
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.dasOptions = dasOptions
        self.jobReport = jobReport
        self.nThreads = nThreads
        self.nStreams = nStreams
        self.maxSteps = maxSteps
        self.nEvents = nEvents
        self.recoOutput = ''

        # Working directory of this workflow, relative to the current directory.
        self.wfDir = str(self.wf.numId)+'_'+self.wf.nameId
        return

    def doCmd(self, cmd):
        """Log ``cmd`` and, unless in dryRun mode, execute it through the shell.

        The command is printed and appended to ``<wfDir>/cmdLog`` (with ';'
        replaced by newlines for readability).

        Returns 0 in dryRun mode, otherwise the raw status reported by
        ``os.waitpid`` for the child process.  This is the encoded wait status,
        not the plain exit code; ``run()`` relies on that encoding when it
        compares against 32000.
        """
        msg = "\n# in: " +os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';','\n')
        print(msg)

        with open(self.wfDir+'/cmdLog','a') as cmdLog:
            cmdLog.write(msg+'\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            # Deliberately not p.wait(): callers expect the raw wait status.
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ",cmd,'ret=', ret)

        return ret

    def _appendNotRun(self):
        """Record the current step as not executed."""
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    @staticmethod
    def _readDasFiles(path):
        """Return the lines of ``path`` that start with '/', or None if the file is missing."""
        if not exists(path):
            return None
        # We consider only the files which have at least one logical filename
        # in it. This is because sometimes das fails and still prints out junk.
        with open(path) as dasFile:
            return [l for l in dasFile.read().split("\n") if l.startswith("/")]

    @staticmethod
    def _overrideNEvents(cmd, nEvents):
        """Replace the value following the first `` -n `` in ``cmd`` by ``nEvents``.

        A command without a `` -n `` option is returned unchanged, and anything
        after the replaced value (including further `` -n `` occurrences) is kept.
        """
        event_token = " -n "
        head, found, tail = cmd.partition(event_token)
        if not found:
            return cmd
        # Drop the old value, i.e. the first space-separated word of the tail.
        pieces = tail.split(" ", 1)
        pos_cmd = pieces[1] if len(pieces) > 1 else ''
        return head + event_token + '%s ' % nEvents + pos_cmd

    def run(self):
        """Execute all steps of the workflow and fill the result attributes.

        The workflow directory is created (and, unless in dryRun mode, wiped
        first if it already exists).  Steps are processed in order; once a
        step fails, times out or is vetoed, all remaining steps are recorded
        as 'NOTRUN'.  At the end ``self.report`` is set to the summary line.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        # Every command is executed from inside the workflow directory.
        preamble = 'cd '+self.wfDir+'; '

        startime = 'date %s' %time.asctime()

        # check where we are running (a missing CMS_PATH simply means "not on CAF")
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i,ID):
            # Redirect stdout+stderr of the step into step<i>_<ID>.log
            return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)

        inFile = None
        lumiRangeFile = None
        aborted = False
        for (istepmone,com) in enumerate(self.wf.cmds):
            # isInputOk is used to keep track of the das result. In case this
            # is False we use a different error message to indicate the failed
            # das query.
            isInputOk = True
            istep = istepmone+1
            cmd = preamble
            if aborted:
                self._appendNotRun()
                continue
            if not isinstance(com,str):
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run",self.wf.numId)
                    self._appendNotRun()
                    aborted = True
                    continue
                #create lumiRange file first so if das fails we get its error code
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 = cmd+cmd2+closeCmd(istep,'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log'%(istep,)
                    retStep = self.doCmd(cmd2)
                if (com.dataSetParent):
                    cmd3 = cmd+com.das(self.dasOptions,com.dataSetParent)+closeCmd(istep,'dasparentquery')
                    retStep = self.doCmd(cmd3)
                cmd += com.das(self.dasOptions,com.dataSet)
                cmd += closeCmd(istep,'dasquery')
                retStep = self.doCmd(cmd)
                #don't use the file list executed, but use the das command of cmsDriver for next step
                # If the das output is not there or it's empty, consider it an
                # issue of this step, not of the next one.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
                # Check created das output in no-dryRun mode only
                if not self.dryRun:
                    if not self._readDasFiles(dasOutputPath):
                        retStep = 1
                        isInputOk = False

                inFile = 'filelist:' + basename(dasOutputPath)
                print("---")
            else:
                #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
                cmd += com
                if self.noRun:
                    cmd += ' --no_exec'
                # in case previous step used DAS query (either filelist of das:)
                # not to be applied for premixing stage1 to allow combined stage1+stage2 workflow
                if inFile and not 'premix_stage1' in cmd:
                    cmd += ' --filein '+inFile
                    inFile = None
                if lumiRangeFile: #DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess '+lumiRangeFile
                    lumiRangeFile = None
                # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
                if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
                else:
                    # Disable input for premix stage1 to allow combined stage1+stage2 workflow
                    # Disable input for premix stage2 in FastSim to allow combined stage1+stage2 workflow (in FS, stage2 does also GEN)
                    # Ugly hack but works
                    if istep!=1 and not '--filein' in cmd and not 'premix_stage1' in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                        steps = cmd.split("-s ")[1].split(" ")[0] ## relying on the syntax: cmsDriver -s STEPS --otherFlags
                        # Only a step with ALCA but without RECO reads the
                        # remembered RECO output (if any); every other case
                        # reads the output of the previous step.
                        if "ALCA" in steps and "RECO" not in steps and self.recoOutput:
                            cmd += ' --filein %s'%(self.recoOutput)
                        else:
                            cmd += ' --filein  file:step%s.root '%(istep-1,)
                    if not '--fileout' in com:
                        cmd += ' --fileout file:step%s.root '%(istep,)
                        if "RECO" in cmd:
                            self.recoOutput = "file:step%d.root"%(istep)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                if (self.nEvents > 0):
                    cmd = self._overrideNEvents(cmd, self.nEvents)
                cmd += closeCmd(istep,self.wf.nameId)
                retStep = 0
                if istep>self.maxSteps:
                    # Beyond maxSteps the command is only recorded, not run.
                    with open("%s/wf_steps.txt" % self.wfDir,"a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            self.retStep.append(retStep)
            if retStep == 32000:
                # A timeout occurred
                # (32000 == 125 << 8 in the raw wait status returned by doCmd)
                self.npass.append(0)
                self.nfail.append(1)
                self.stat.append('TIMEOUT')
                aborted = True
            elif (retStep!=0):
                #error occurred
                self.npass.append(0)
                self.nfail.append(1)
                if not isInputOk:
                    self.stat.append("DAS_ERROR")
                else:
                    self.stat.append('FAILED')
                #to skip processing
                aborted = True
            else:
                #things went fine
                self.npass.append(1)
                self.nfail.append(0)
                self.stat.append('PASSED')

        os.chdir(startDir)
        endtime = 'date %s' %time.asctime()
        tottime = '%s-%s'%(endtime,startime)

        #### wrap up ####

        # Step labels in the report are zero-based (Step0 is the first step).
        logStat = ''.join('Step%d-%s '%(i,s) for i,s in enumerate(self.stat))
        self.report = '%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'

        return
0231