Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-02-14 12:52:45

0001 from __future__ import print_function
0002 from threading import Thread
0003 from Configuration.PyReleaseValidation import WorkFlow
0004 import os,time
0005 import shutil
0006 from subprocess import Popen 
0007 from os.path import exists, basename, join
0008 from datetime import datetime
0009 
class WorkFlowRunner(Thread):
    """Thread that executes the steps of one workflow as shell commands.

    Each entry of ``wf.cmds`` is either a command string (run inside the
    workflow directory with chained ``--filein``/``--fileout`` options) or an
    input-step object that provides DAS queries (``das``, ``lumiRanges``,
    ``dataSet``, ``dataSetParent``, ``location``).  After ``run()`` the
    per-step results are in the parallel lists ``npass``, ``nfail``,
    ``retStep`` and ``stat``, and a one-line summary is in ``report``.
    """

    def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, nStreams=0, maxSteps=9999):
        Thread.__init__(self)
        self.wf = wf

        self.status=-1
        self.report=''            # summary line, filled by run()
        self.nfail=0              # replaced by a per-step list in run()
        self.npass=0              # replaced by a per-step list in run()
        self.noRun=noRun          # append --no_exec to command-string steps
        self.dryRun=dryRun        # only log commands, never execute them
        self.cafVeto=cafVeto      # refuse CAF-only input steps when not on CAF
        self.dasOptions=dasOptions
        self.jobReport=jobReport
        self.nThreads=nThreads
        self.nStreams=nStreams
        self.maxSteps=maxSteps    # steps beyond this are only written to wf_steps.txt

        self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
        return

    def doCmd(self, cmd):
        """Log ``cmd`` to stdout and ``<wfDir>/cmdLog``, then run it unless dryRun.

        Returns the raw wait status from ``os.waitpid``, not the decoded exit
        code; ``run()`` compares this raw value against 32000 to detect a
        timeout.  Returns 0 in dryRun mode.
        """
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';', '\n')
        print(msg)

        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ", cmd, 'ret=', ret)

        return ret

    def _recordStep(self, npass, nfail, retStep, stat):
        """Append one step's outcome to the four parallel result lists."""
        self.npass.append(npass)
        self.nfail.append(nfail)
        self.retStep.append(retStep)
        self.stat.append(stat)

    def run(self):
        """Run all steps of the workflow in order and build ``self.report``.

        The first failing (or timed-out, or vetoed) step aborts the workflow;
        every later step is recorded as ``NOTRUN``.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            # clean up to allow re-running in the same overall devel area,
            # then recreate the dir to make sure it exists
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        # every command is executed from inside the workflow directory
        preamble = 'cd ' + self.wfDir + '; '

        startime = 'date %s' % time.asctime()

        # Check where we are running.  An unset CMS_PATH simply means "not on
        # CAF" instead of aborting the thread with a KeyError.
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i, ID):
            # redirect stdout+stderr of step i into step<i>_<ID>.log
            return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

        inFile = None          # filelist produced by a preceding DAS step
        lumiRangeFile = None   # lumi-range file produced by a preceding DAS step
        aborted = False
        for istep, com in enumerate(self.wf.cmds, 1):
            # isInputOk tracks the DAS result; when False a failure is
            # reported as DAS_ERROR instead of FAILED.
            isInputOk = True
            cmd = preamble
            if aborted:
                self._recordStep(0, 0, 0, 'NOTRUN')
                continue

            if not isinstance(com, str):
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run", self.wf.numId)
                    self._recordStep(0, 0, 0, 'NOTRUN')
                    aborted = True
                    continue
                # create lumiRange file first so if das fails we get its error code
                # NOTE(review): the return codes of the lumiRanges and parent
                # queries are overwritten by the main DAS query below.
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 = cmd + cmd2 + closeCmd(istep, 'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    retStep = self.doCmd(cmd2)
                if com.dataSetParent:
                    cmd3 = cmd + com.das(self.dasOptions, com.dataSetParent) + closeCmd(istep, 'dasparentquery')
                    retStep = self.doCmd(cmd3)
                cmd += com.das(self.dasOptions, com.dataSet)
                cmd += closeCmd(istep, 'dasquery')
                retStep = self.doCmd(cmd)
                # Don't use the file list executed, but use the das command of
                # cmsDriver for the next step.  If the das output is missing or
                # empty, consider it an issue of this step, not of the next one.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
                # Check created das output in no-dryRun mode only
                if not self.dryRun:
                    if not exists(dasOutputPath):
                        retStep = 1
                        dasOutput = None
                    else:
                        # Keep only lines that look like logical file names;
                        # das sometimes fails and still prints out junk.
                        with open(dasOutputPath) as dasFile:
                            dasOutput = [l for l in dasFile.read().split("\n") if l.startswith("/")]
                    if not dasOutput:
                        retStep = 1
                        isInputOk = False

                inFile = 'filelist:' + basename(dasOutputPath)
                print("---")
            else:
                # chaining IO, which should be done in WF object already and
                # not using stepX.root but <stepName>.root
                cmd += com
                if self.noRun:
                    cmd += ' --no_exec'
                # in case previous step used DAS query (either filelist or das:);
                # not applied for premixing stage1 to allow combined stage1+stage2 workflow
                if inFile and 'premix_stage1' not in cmd:
                    cmd += ' --filein ' + inFile
                    inFile = None
                if lumiRangeFile:  # DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess ' + lumiRangeFile
                    lumiRangeFile = None
                # 134 is an existing workflow where harvesting has to operate on
                # AlcaReco and NOT on DQM; hard-coded..
                if 'HARVESTING' in cmd and not 134 == self.wf.numId and '--filein' not in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
                else:
                    # Disable input for premix stage1 to allow combined stage1+stage2 workflow.
                    # Disable input for premix stage2 in FastSim to allow combined
                    # stage1+stage2 workflow (in FS, stage2 does also GEN).
                    if (istep != 1 and '--filein' not in cmd and 'premix_stage1' not in cmd
                            and not ("--fast" in cmd and "premix_stage2" in cmd)):
                        cmd += ' --filein  file:step%s.root ' % (istep - 1,)
                    # checked against the step's own command, not the assembled one
                    if '--fileout' not in com:
                        cmd += ' --fileout file:step%s.root ' % (istep,)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                cmd += closeCmd(istep, self.wf.nameId)
                retStep = 0
                if istep > self.maxSteps:
                    # beyond maxSteps: record the command instead of running it
                    with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            if retStep == 32000:
                # A timeout occurred
                self._recordStep(0, 1, retStep, 'TIMEOUT')
                aborted = True
            elif retStep != 0:
                # error occurred; skip processing of the remaining steps
                self._recordStep(0, 1, retStep, 'FAILED' if isInputOk else 'DAS_ERROR')
                aborted = True
            else:
                # things went fine
                self._recordStep(1, 0, retStep, 'PASSED')

        os.chdir(startDir)
        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        #### wrap up ####

        logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))
        self.report = ('%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime)
                       + ' '.join(map(str, self.retStep)) + '\n')

        return
0213