Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-25 09:38:30

0001 from __future__ import print_function
0002 from threading import Thread
0003 from Configuration.PyReleaseValidation import WorkFlow
0004 import os,time
0005 import shutil
0006 from subprocess import Popen 
0007 from os.path import exists, basename, join
0008 from datetime import datetime
0009 
class WorkFlowRunner(Thread):
    """Run the steps of one workflow sequentially in a background thread.

    Each entry of ``wf.cmds`` is processed in order inside the directory
    ``<numId>_<nameId>`` (commands are prefixed with ``cd <wfDir>;``):

    * a plain string is treated as a command line to which input/output file
      options (and optional thread/stream/event overrides) are appended;
    * any other object is treated as an input description whose DAS query
      (and optional lumi-range / parent-dataset commands) is executed, and
      whose resulting file list is passed to the next string step.

    After ``run()`` the per-step outcome is available in the parallel lists
    ``npass``, ``nfail``, ``stat`` and ``retStep``, and a one-line summary
    in ``report``.
    """

    def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
        """Store the run options for workflow *wf*.

        Args:
            wf: workflow object; must provide ``numId``, ``nameId`` and ``cmds``.
            noRun: append ``--no_exec`` to every command-line step.
            dryRun: only log commands, never execute them.
            cafVeto: skip input steps whose ``location`` is 'CAF' when not on CAF.
            dasOptions: extra options forwarded to the input object's ``das()``.
            jobReport: add a ``-j JobReport<step>.xml`` suffix to each step.
            nThreads: if > 1, pass ``--nThreads`` (not for harvesting steps).
            nStreams: if > 0, pass ``--nStreams`` (not for harvesting steps).
            maxSteps: steps with a 1-based index above this are written to
                ``wf_steps.txt`` instead of being executed.
            nEvents: if > 0, replace the value of the step's ``-n`` option.
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # Counters here; run() replaces them with per-step lists.
        self.nfail = 0
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.dasOptions = dasOptions
        self.jobReport = jobReport
        self.nThreads = nThreads
        self.nStreams = nStreams
        self.maxSteps = maxSteps
        self.nEvents = nEvents

        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId
        return

    def doCmd(self, cmd):
        """Log *cmd* and, unless in dry-run mode, execute it through the shell.

        The log message is printed and appended to ``<wfDir>/cmdLog``.

        Returns:
            The raw status from ``os.waitpid`` (0 on success, and always 0 in
            dry-run mode). For a normal exit the exit code is in the high
            byte, so e.g. 32000 corresponds to exit code 125.
        """
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';', '\n')
        print(msg)

        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            # Keep the raw wait status rather than p.wait()'s return code:
            # run() compares it against 32000 to detect timeouts.
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ", cmd, 'ret=', ret)

        return ret

    @staticmethod
    def _override_nevents(cmd, nEvents):
        """Return *cmd* with the value of its first `` -n `` option set to *nEvents*.

        The word immediately following the first " -n " is replaced; all
        remaining text (including any later " -n ") is kept. A command
        without " -n " is returned unchanged.
        """
        event_token = " -n "
        head, sep, tail = cmd.partition(event_token)
        if not sep:
            return cmd
        # Drop the old value: the first space-separated word after the token.
        rest = " ".join(tail.split(" ")[1:])
        return head + event_token + '%s ' % nEvents + rest

    def _appendNotRun(self):
        """Record one step that was not executed (status NOTRUN, exit 0)."""
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    def run(self):
        """Execute all workflow steps in order and fill the per-step results.

        The workflow directory is created, or wiped and recreated when it
        already exists and this is not a dry run. Once a step fails, times
        out, or is vetoed, all remaining steps are recorded as NOTRUN.
        Finally ``self.report`` is set to a one-line summary.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            # clean up to allow re-running in the same overall devel area,
            # then recreate the dir to make sure it exists
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        preamble = 'cd ' + self.wfDir + '; '

        startime = 'date %s' % time.asctime()

        # Check where we are running; an unset CMS_PATH means "not on CAF".
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i, ID):
            # Redirect stdout+stderr of step i to step<i>_<ID>.log
            return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

        inFile = None
        lumiRangeFile = None
        aborted = False
        for istepmone, com in enumerate(self.wf.cmds):
            # isInputOk tracks the DAS result; when False a failed step is
            # reported as DAS_ERROR instead of FAILED.
            isInputOk = True
            istep = istepmone + 1
            cmd = preamble
            if aborted:
                self._appendNotRun()
                continue
            if not isinstance(com, str):
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run", self.wf.numId)
                    self._appendNotRun()
                    aborted = True
                    continue
                # create lumiRange file first so if das fails we get its error code
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 = cmd + cmd2 + closeCmd(istep, 'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    retStep = self.doCmd(cmd2)
                if com.dataSetParent:
                    cmd3 = cmd + com.das(self.dasOptions, com.dataSetParent) + closeCmd(istep, 'dasparentquery')
                    retStep = self.doCmd(cmd3)
                cmd += com.das(self.dasOptions, com.dataSet)
                cmd += closeCmd(istep, 'dasquery')
                retStep = self.doCmd(cmd)
                # Don't use the executed file list directly; the next step
                # reads the DAS output as a filelist. A missing or empty DAS
                # output is an issue of this step, not of the next one.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
                # Check created das output in no-dryRun mode only
                if not self.dryRun:
                    if not exists(dasOutputPath):
                        retStep = 1
                        dasOutput = None
                    else:
                        # Only lines that look like logical filenames count:
                        # das sometimes fails and still prints out junk.
                        with open(dasOutputPath) as dasFile:
                            dasOutput = [l for l in dasFile.read().split("\n") if l.startswith("/")]
                    if not dasOutput:
                        retStep = 1
                        isInputOk = False

                inFile = 'filelist:' + basename(dasOutputPath)
                print("---")
            else:
                # chaining IO, which should be done in WF object already and
                # not using stepX.root but <stepName>.root
                cmd += com
                if self.noRun:
                    cmd += ' --no_exec'
                # in case previous step used DAS query (either filelist or das:);
                # not applied for premixing stage1 to allow combined stage1+stage2 workflow
                if inFile and 'premix_stage1' not in cmd:
                    cmd += ' --filein ' + inFile
                    inFile = None
                if lumiRangeFile:  # DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess ' + lumiRangeFile
                    lumiRangeFile = None
                # 134 is an existing workflow where harvesting has to operate on
                # AlcaReco and NOT on DQM; hard-coded.
                if 'HARVESTING' in cmd and not 134 == self.wf.numId and '--filein' not in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
                else:
                    # Disable input for premix stage1 to allow combined stage1+stage2 workflow.
                    # Disable input for premix stage2 in FastSim to allow combined
                    # stage1+stage2 workflow (in FS, stage2 does also GEN).
                    if istep != 1 and '--filein' not in cmd and 'premix_stage1' not in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                        cmd += ' --filein  file:step%s.root ' % (istep - 1,)
                    if '--fileout' not in com:
                        cmd += ' --fileout file:step%s.root ' % (istep,)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                if self.nEvents > 0:
                    cmd = self._override_nevents(cmd, self.nEvents)
                cmd += closeCmd(istep, self.wf.nameId)
                retStep = 0
                if istep > self.maxSteps:
                    # Beyond maxSteps: record the command instead of running it.
                    with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            self.retStep.append(retStep)
            if retStep == 32000:
                # A timeout occurred (raw wait status 32000 == exit code 125)
                self.npass.append(0)
                self.nfail.append(1)
                self.stat.append('TIMEOUT')
                aborted = True
            elif retStep != 0:
                # error occurred
                self.npass.append(0)
                self.nfail.append(1)
                if not isInputOk:
                    self.stat.append("DAS_ERROR")
                else:
                    self.stat.append('FAILED')
                # skip processing of the remaining steps
                aborted = True
            else:
                # things went fine
                self.npass.append(1)
                self.nfail.append(0)
                self.stat.append('PASSED')

        os.chdir(startDir)
        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        #### wrap up ####

        # Step indices in the report are 0-based (unlike the 1-based stepN
        # file names); kept as-is so the report format does not change.
        logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))
        self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime) + ' '.join(map(str, self.retStep)) + '\n'

        return
0220