# Thread-based runner that executes the command steps of a single workflow.
from threading import Thread
from Configuration.PyReleaseValidation import WorkFlow
import os,time
import shutil
import re
from subprocess import Popen 
from os.path import exists, basename, join
from datetime import datetime

class WorkFlowRunner(Thread):
    """Thread that executes the steps of one workflow in a dedicated directory.

    Each entry of ``wf.cmds`` is either a command string or an input
    description object (anything that is not a ``str``).  For the latter the
    object's ``das()`` / ``lumiRanges()`` commands are executed and the
    resulting file list is fed to the next command string via ``--filein``.

    After ``run()`` has finished the per-step results are available as lists
    (one entry per recorded step):

    * ``npass`` / ``nfail`` -- 1/0 flags,
    * ``retStep``           -- raw return status of the step,
    * ``stat``              -- 'PASSED', 'FAILED', 'DAS_ERROR', 'TIMEOUT' or 'NOTRUN',

    and ``report`` holds a one-line textual summary.
    """

    def __init__(self, wf, opt, noRun=False, dryRun=False, cafVeto=True, jobNumber=None, gpu = None):
        """Store the workflow and the options needed to run it.

        wf        -- workflow object; ``numId``, ``nameId`` and ``cmds`` are read
        opt       -- options object; ``dasOptions``, ``jobReports``, ``nThreads``,
                     ``nStreams``, ``maxSteps``, ``nEvents``, ``startFrom`` and
                     ``recycle`` are read
        noRun     -- append ``--no_exec`` to every command string
        dryRun    -- only print/log the commands, do not execute them
        cafVeto   -- refuse to run inputs located on 'CAF' when not on CAF
        jobNumber -- optional suffix for the working directory name
        gpu       -- optional string inserted in front of every command string
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report  =''
        # NOTE: npass/nfail start as plain counters and are replaced by
        # per-step lists as soon as run() is called.
        self.nfail = 0
        self.npass = 0
        # Defined here so the attributes exist even before run() is called.
        self.stat = []
        self.retStep = []
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.gpu = gpu

        self.dasOptions = opt.dasOptions
        self.jobReport = opt.jobReports
        self.nThreads = opt.nThreads
        self.nStreams = opt.nStreams
        self.maxSteps = opt.maxSteps
        self.nEvents = opt.nEvents
        self.recoOutput = ''
        self.startFrom = opt.startFrom
        self.recycle = opt.recycle

        self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
        if jobNumber is not None:
            self.wfDir = self.wfDir + '_job' + str(jobNumber)

        return

    def doCmd(self, cmd):
        """Log ``cmd`` to stdout and ``<wfDir>/cmdLog`` and, unless in dryRun, execute it.

        Returns 0 in dryRun mode, otherwise the raw wait status reported by
        ``os.waitpid`` for the shell that ran ``cmd``.
        """
        msg = "\n# in: " +os.getcwd()
        if self.dryRun: msg += " dryRun for '"
        else:      msg += " going to execute "
        msg += cmd.replace(';','\n')
        print(msg)

        # context manager: the log is closed even if the write fails
        with open(self.wfDir+'/cmdLog','a') as cmdLog:
            cmdLog.write(msg+'\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            # Keep the *raw* wait status (not the decoded exit code): run()
            # compares it with 32000, which is 125 << 8.
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ",cmd,'ret=', ret)

        return ret

    def _recordStep(self, retStep, stat, npass=0, nfail=0):
        """Append the outcome of one step to the per-step result lists."""
        self.npass.append(npass)
        self.nfail.append(nfail)
        self.retStep.append(retStep)
        self.stat.append(stat)

    def run(self):
        """Execute all steps of the workflow and fill the result lists and ``report``.

        Once a step fails (or times out, or is vetoed) all remaining steps are
        recorded as 'NOTRUN'.  Steps skipped because of ``recycle`` or
        ``startFrom`` are not recorded at all.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        # every command is run in a sub-shell that first enters the workflow dir
        preamble = 'cd '+self.wfDir+'; '

        startime='date %s' %time.asctime()

        # check where we are running; a missing CMS_PATH simply means "not on CAF"
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        # per-step results (replace the scalar placeholders set in __init__)
        self.npass  = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i,ID):
            # redirect stdout+stderr of the step into step<i>_<ID>.log
            return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)

        inFile=None
        lumiRangeFile=None
        aborted=False
        outputExtensionForStep = {}
        for (istepmone,com) in enumerate(self.wf.cmds):
            # isInputOk is used to keep track of the das result. In case this
            # is False we use a different error message to indicate the failed
            # das query.
            isInputOk=True
            istep=istepmone+1
            cmd = preamble
            outputExtensionForStep[istep]=''
            if aborted:
                self._recordStep(0, 'NOTRUN')
                continue
            if not isinstance(com,str):
                # input description step: query DAS (and lumi ranges) for the file list
                if self.recycle:
                    inFile = self.recycle
                    continue
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run",self.wf.numId)
                    self._recordStep(0, 'NOTRUN')
                    aborted=True
                    continue
                #create lumiRange file first so if das fails we get its error code
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 =cmd+cmd2+closeCmd(istep,'lumiRanges')
                    lumiRangeFile='step%d_lumiRanges.log'%(istep,)
                    retStep = self.doCmd(cmd2)
                if (com.dataSetParent):
                    cmd3=cmd+com.das(self.dasOptions,com.dataSetParent)+closeCmd(istep,'dasparentquery')
                    retStep = self.doCmd(cmd3)
                cmd+=com.das(self.dasOptions,com.dataSet)
                cmd+=closeCmd(istep,'dasquery')
                retStep = self.doCmd(cmd)
                #don't use the file list executed, but use the das command of cmsDriver for next step
                # If the das output is not there or it's empty, consider it an
                # issue of this step, not of the next one.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
                # Check created das output in no-dryRun mode only
                if not self.dryRun:
                    dasOutput = None
                    if not exists(dasOutputPath):
                        retStep = 1
                    else:
                        # We consider only the files which have at least one logical or physical filename
                        # in it. This is because sometimes das fails and still prints out junk.
                        with open(dasOutputPath) as dasFile:
                            dasOutput = [line for line in dasFile.read().split("\n")
                                         if line.startswith(("/", "root://eoscms.cern.ch"))]
                    if not dasOutput:
                        retStep = 1
                        isInputOk = False

                inFile = 'filelist:' + basename(dasOutputPath)

                if com.skimEvents:
                    lumiRangeFile='step%d_lumiRanges.log'%(istep,)
                    cmd2 = preamble + "mv lumi_ranges.txt " + lumiRangeFile
                    mvRet = self.doCmd(cmd2)
                    # do not let a successful mv mask an earlier DAS failure
                    if retStep == 0:
                        retStep = mvRet

                print("---")

            else:
                #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
                if self.gpu is not None:
                    cmd = cmd + self.gpu

                cmd += com

                if self.startFrom:
                    # skip (without recording) every step before the requested one
                    steps = cmd.split("-s ")[1].split(" ")[0] ## relying on the syntax: cmsDriver -s STEPS --otherFlags
                    if self.startFrom not in steps:
                        continue
                    else:
                        self.startFrom = False
                        inFile = self.recycle

                if self.noRun:
                    cmd +=' --no_exec'
                # in case previous step used DAS query (either filelist of das:)
                # not to be applied for premixing stage1 to allow combiend stage1+stage2 workflow
                if inFile and not 'premix_stage1' in cmd:
                    cmd += ' --filein '+inFile
                    inFile=None
                if lumiRangeFile: #DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess '+lumiRangeFile
                    lumiRangeFile=None
                # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
                if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
                    cmd+=' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
                    outputExtensionForStep[istep] = '.root'
                else:
                    # Disable input for premix stage1 to allow combined stage1+stage2 workflow
                    # Disable input for premix stage2 in FastSim to allow combined stage1+stage2 workflow (in FS, stage2 does also GEN)
                    # Ugly hack but works
                    extension = '.root'
                    if '--rntuple_out' in cmd:
                        extension = '.rntpl'
                    outputExtensionForStep[istep] = extension
                    if istep!=1 and not '--filein' in cmd and not 'premix_stage1' in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                        steps = cmd.split("-s ")[1].split(" ")[0] ## relying on the syntax: cmsDriver -s STEPS --otherFlags
                        # Only an ALCA step without RECO reads the remembered RECO
                        # output (if any); everything else reads the previous step.
                        if "ALCA" in steps and "RECO" not in steps and self.recoOutput:
                            cmd+=' --filein %s'%(self.recoOutput)
                        else:
                            cmd+=' --filein  file:step%s%s '%(istep-1,extension)
                    elif istep!=1 and '--filein' in cmd and '--filetype' not in cmd:
                        #make sure correct extension is being used
                        #find the previous state index
                        expression = r'--filein\s+file:step([1-9])(_[a-zA-Z]+)*\.[a-z]+'
                        m = re.search(expression, cmd)
                        if m:
                            cmd = re.sub(expression,r'--filein file:step\1\2'+outputExtensionForStep[int(m.group(1))],cmd)
                        elif extension == '.rntpl':
                            #some ALCA steps use special file names without step_ prefix and these are also force to use RNTuple
                            expression = r'--filein\s+file:([a-zA-Z0-9_]+)*\.[a-z]+'
                            m = re.search(expression, cmd)
                            if m:
                                cmd = re.sub(expression,r'--filein file:\1.rntpl',cmd)
                    if not '--fileout' in com:
                        cmd+=' --fileout file:step%s%s '%(istep,extension)
                        if "RECO" in cmd:
                            self.recoOutput = "file:step%d%s"%(istep,extension)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                if (self.nEvents > 0):
                    # Replace the value following the first " -n " by nEvents.
                    # A command without that token is left untouched, and
                    # anything after a later " -n " is preserved.
                    event_token = " -n "
                    head, found, tail = cmd.partition(event_token)
                    if found:
                        pos_cmd = " ".join(tail.split(" ")[1:])
                        cmd = head + event_token + '%s ' % self.nEvents + pos_cmd
                cmd+=closeCmd(istep,self.wf.nameId)
                retStep = 0

                if istep>self.maxSteps:
                    # beyond maxSteps the command is only written down, not executed
                    with open("%s/wf_steps.txt" % self.wfDir,"a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            if retStep == 32000:
                # A timeout occurred
                self._recordStep(retStep, 'TIMEOUT', nfail=1)
                aborted = True
            elif (retStep!=0):
                #error occured; skip processing of the remaining steps
                self._recordStep(retStep, 'FAILED' if isInputOk else "DAS_ERROR", nfail=1)
                aborted=True
            else:
                #things went fine
                self._recordStep(retStep, 'PASSED', npass=1)

        os.chdir(startDir)
        endtime='date %s' %time.asctime()
        tottime='%s-%s'%(endtime,startime)


        #### wrap up ####

        # NOTE: the step labels in the report are 0-based (enumerate index)
        logStat=''
        for i,s in enumerate(self.stat):
            logStat+='Step%d-%s '%(i,s)
        self.report='%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'

        return