Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-11 03:30:59

0001 from threading import Thread
0002 from Configuration.PyReleaseValidation import WorkFlow
0003 import os,time
0004 import shutil
0005 import re
0006 from subprocess import Popen 
0007 from os.path import exists, basename, join
0008 from datetime import datetime
0009 
class WorkFlowRunner(Thread):
    """Thread that executes the steps of a single workflow through the shell.

    Every step listed in ``wf.cmds`` is run from a dedicated working
    directory (``<numId>_<nameId>[_job<N>]``).  A step is either a plain
    command string, or an input-description object that is turned into DAS
    queries (it must provide ``location``, ``lumiRanges()``,
    ``dataSetParent``, ``dataSet``, ``das()`` and ``skimEvents``).

    After ``run()`` the per-step outcome is available in the parallel lists
    ``npass``, ``nfail``, ``stat`` and ``retStep``, and a one-line summary
    is stored in ``report``.
    """

    def __init__(self, wf, opt, noRun=False, dryRun=False, cafVeto=True, jobNumber=None, gpu = None):
        """Store the workflow and the run options.

        Parameters:
            wf        -- workflow object; ``numId``, ``nameId`` and ``cmds`` are read.
            opt       -- options object; ``dasOptions``, ``jobReports``,
                         ``nThreads``, ``nStreams``, ``maxSteps``, ``nEvents``,
                         ``startFrom`` and ``recycle`` are read.
            noRun     -- append ``--no_exec`` to every command string.
            dryRun    -- only print/log the commands, do not execute them.
            cafVeto   -- skip inputs located on CAF when not running on CAF.
            jobNumber -- optional suffix appended to the working directory name.
            gpu       -- optional string inserted in front of each command string.
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report  =''
        # NOTE: these two start as integers but are replaced by per-step
        # lists as soon as run() is entered.
        self.nfail = 0
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.gpu = gpu

        self.dasOptions = opt.dasOptions
        self.jobReport = opt.jobReports
        self.nThreads = opt.nThreads
        self.nStreams = opt.nStreams
        self.maxSteps = opt.maxSteps
        self.nEvents = opt.nEvents
        self.recoOutput = ''
        self.startFrom = opt.startFrom
        self.recycle = opt.recycle

        self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
        if jobNumber is not None:
            self.wfDir = self.wfDir + '_job' + str(jobNumber)

    def doCmd(self, cmd):
        """Log ``cmd`` and, unless in dry-run mode, execute it through the shell.

        The command is printed and appended to ``<wfDir>/cmdLog``.

        Returns the raw wait status reported by ``os.waitpid`` (0 on success,
        and always 0 in dry-run mode).  This is *not* the plain exit code:
        callers compare it against raw status values such as 32000.
        """
        msg = "\n# in: " +os.getcwd()
        if self.dryRun: msg += " dryRun for '"
        else:      msg += " going to execute "
        msg += cmd.replace(';','\n')
        print(msg)

        # Context manager guarantees the log handle is released even if the
        # write fails.
        with open(self.wfDir+'/cmdLog','a') as cmdLog:
            cmdLog.write(msg+'\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            # Keep os.waitpid (rather than p.wait()) on purpose: run() relies
            # on the raw status encoding.
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ",cmd,'ret=', ret)

        return ret

    def run(self):
        """Run all workflow steps in order and fill the per-step bookkeeping.

        Once a step fails (or times out) every remaining step is recorded as
        ``NOTRUN``.  On return ``self.report`` holds the summary line.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        # Every shell command first enters the workflow directory.
        preamble = 'cd '+self.wfDir+'; '

        startime='date %s' %time.asctime()

        # check where we are running; a missing CMS_PATH simply means "not on CAF"
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        # Parallel per-step result lists.
        self.npass  = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i,ID):
            # Redirect stdout+stderr of the step into step<i>_<ID>.log
            return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)

        def markNotRun():
            # Bookkeeping for a step that is skipped without being executed.
            self.npass.append(0)
            self.nfail.append(0)
            self.retStep.append(0)
            self.stat.append('NOTRUN')

        inFile=None
        lumiRangeFile=None
        aborted=False
        # step index -> file extension of that step's output ('' if unknown)
        outputExtensionForStep = {}
        for (istepmone,com) in enumerate(self.wf.cmds):
            # isInputOk is used to keep track of the das result. In case this
            # is False we use a different error message to indicate the failed
            # das query.
            isInputOk=True
            istep=istepmone+1
            cmd = preamble
            outputExtensionForStep[istep]=''
            if aborted:
                markNotRun()
                continue
            if not isinstance(com,str):
                # Input-description step: resolved through DAS queries.
                if self.recycle:
                    # Recycled input replaces the DAS lookup; note that no
                    # bookkeeping entry is recorded for this step.
                    inFile = self.recycle
                    continue
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run",self.wf.numId)
                    markNotRun()
                    aborted=True
                    continue
                #create lumiRange file first so if das fails we get its error code
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 =cmd+cmd2+closeCmd(istep,'lumiRanges')
                    lumiRangeFile='step%d_lumiRanges.log'%(istep,)
                    retStep = self.doCmd(cmd2)
                if (com.dataSetParent):
                    cmd3=cmd+com.das(self.dasOptions,com.dataSetParent)+closeCmd(istep,'dasparentquery')
                    retStep = self.doCmd(cmd3)
                cmd+=com.das(self.dasOptions,com.dataSet)
                cmd+=closeCmd(istep,'dasquery')
                # The status of this main query overrides the ones above.
                retStep = self.doCmd(cmd)
                #don't use the file list executed, but use the das command of cmsDriver for next step
                # If the das output is not there or it's empty, consider it an
                # issue of this step, not of the next one.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
                # Check created das output in no-dryRun mode only
                if not self.dryRun:
                    if not exists(dasOutputPath):
                        retStep = 1
                        dasOutput = None
                    else:
                        # We consider only the files which have at least one logical or physical filename
                        # in it. This is because sometimes das fails and still prints out junk.
                        with open(dasOutputPath) as dasLog:
                            dasOutput = [l for l in dasLog.read().split("\n") if l.startswith("/") or l.startswith("root://eoscms.cern.ch")]
                    if not dasOutput:
                        retStep = 1
                        isInputOk = False

                inFile = 'filelist:' + basename(dasOutputPath)

                if com.skimEvents:
                    lumiRangeFile='step%d_lumiRanges.log'%(istep,)
                    cmd2 = preamble + "mv lumi_ranges.txt " + lumiRangeFile
                    retStep = self.doCmd(cmd2)

                print("---")

            else:
                #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
                if self.gpu is not None:
                    cmd = cmd + self.gpu

                cmd += com

                if self.startFrom:
                    # Skip every step until the one whose "-s" list mentions
                    # the requested starting step.
                    steps = cmd.split("-s ")[1].split(" ")[0]
                    if self.startFrom not in steps:
                        continue
                    else:
                        self.startFrom = False
                        inFile = self.recycle

                if self.noRun:
                    cmd +=' --no_exec'
                # in case previous step used DAS query (either filelist of das:)
                # not to be applied for premixing stage1 to allow combined stage1+stage2 workflow
                if inFile and not 'premix_stage1' in cmd:
                    cmd += ' --filein '+inFile
                    inFile=None
                if lumiRangeFile: #DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess '+lumiRangeFile
                    lumiRangeFile=None
                # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
                if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
                    cmd+=' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
                    outputExtensionForStep[istep] = '.root'
                else:
                    # Disable input for premix stage1 to allow combined stage1+stage2 workflow
                    # Disable input for premix stage2 in FastSim to allow combined stage1+stage2 workflow (in FS, stage2 does also GEN)
                    # Ugly hack but works
                    extension = '.root'
                    if '--rntuple_out' in cmd:
                        extension = '.rntpl'
                    outputExtensionForStep[istep] = extension
                    if istep!=1 and not '--filein' in cmd and not 'premix_stage1' in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                        steps = cmd.split("-s ")[1].split(" ")[0] ## relying on the syntax: cmsDriver -s STEPS --otherFlags
                        # Only an ALCA step without RECO reads the remembered
                        # RECO output (when there is one); every other case
                        # chains to the previous step's output file.
                        if "ALCA" in steps and "RECO" not in steps and self.recoOutput:
                            cmd+=' --filein %s'%(self.recoOutput)
                        else:
                            cmd+=' --filein  file:step%s%s '%(istep-1,extension)
                    elif istep!=1 and '--filein' in cmd and '--filetype' not in cmd:
                        #make sure correct extension is being used
                        #find the previous state index
                        expression = r'--filein\s+file:step([1-9])(_[a-zA-Z]+)*\.[a-z]+'
                        m = re.search(expression, cmd)
                        if m:
                            cmd = re.sub(expression,r'--filein file:step\1\2'+outputExtensionForStep[int(m.group(1))],cmd)
                        elif extension == '.rntpl':
                            #some ALCA steps use special file names without step_ prefix and these are also force to use RNTuple
                            expression = r'--filein\s+file:([a-zA-Z0-9_]+)*\.[a-z]+'
                            m = re.search(expression, cmd)
                            if m:
                                cmd = re.sub(expression,r'--filein file:\1.rntpl',cmd)
                    if not '--fileout' in com:
                        cmd+=' --fileout file:step%s%s '%(istep,extension)
                        if "RECO" in cmd:
                            self.recoOutput = "file:step%d%s"%(istep,extension)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                if (self.nEvents > 0):
                    # Replace the value following the first " -n " by the
                    # requested number of events.  Splitting only once keeps
                    # the remainder of the command intact even if " -n "
                    # appears again later on.
                    event_token = " -n "
                    split = cmd.split(event_token, 1)
                    pos_cmd = " ".join(split[1].split(" ")[1:])
                    cmd = split[0] + event_token + '%s ' % self.nEvents + pos_cmd
                cmd+=closeCmd(istep,self.wf.nameId)
                retStep = 0

                if istep>self.maxSteps:
                    # Beyond maxSteps the command is only recorded, not run.
                    with open("%s/wf_steps.txt" % self.wfDir,"a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else: retStep = self.doCmd(cmd)

            self.retStep.append(retStep)
            if retStep == 32000:
                # A timeout occurred
                self.npass.append(0)
                self.nfail.append(1)
                self.stat.append('TIMEOUT')
                aborted = True
            elif (retStep!=0):
                #error occured
                self.npass.append(0)
                self.nfail.append(1)
                if not isInputOk:
                    self.stat.append("DAS_ERROR")
                else:
                    self.stat.append('FAILED')
                #to skip processing
                aborted=True
            else:
                #things went fine
                self.npass.append(1)
                self.nfail.append(0)
                self.stat.append('PASSED')

        os.chdir(startDir)
        endtime='date %s' %time.asctime()
        tottime='%s-%s'%(endtime,startime)

        #### wrap up ####

        # Step labels in the summary are zero-based.
        logStat=''.join('Step%d-%s '%(i,s) for i,s in enumerate(self.stat))
        self.report='%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'
0281