1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
|
from threading import Thread
from Configuration.PyReleaseValidation import WorkFlow
import os,time
import shutil
import re
from subprocess import Popen
from os.path import exists, basename, join
from datetime import datetime
class WorkFlowRunner(Thread):
    """Thread that executes every step of one relval workflow.

    Each entry of ``self.wf.cmds`` is either a cmsDriver-style command string
    or an input-source object (providing ``das()``, ``lumiRanges()``, etc.).
    The thread runs the steps sequentially inside a per-workflow directory
    and records per-step PASSED/FAILED/NOTRUN/TIMEOUT/DAS_ERROR status in
    ``self.stat`` plus counters in ``self.npass``/``self.nfail``.
    """

    def __init__(self, wf, opt, noRun=False, dryRun=False, cafVeto=True, jobNumber=None, gpu = None):
        """Store the workflow and the run options.

        wf        -- WorkFlow object whose .cmds will be executed
        opt       -- parsed options object; the attributes read here are
                     dasOptions, jobReports, nThreads, nStreams, maxSteps,
                     nEvents, startFrom and recycle
        noRun     -- if True, append --no_exec so cmsDriver only creates configs
        dryRun    -- if True, commands are logged but never executed
        cafVeto   -- if True, abort workflows whose input is on CAF when not on CAF
        jobNumber -- optional job index appended to the working-directory name
        gpu       -- optional string prepended to each command (GPU customisation)
        """
        Thread.__init__(self)
        self.wf = wf
        self.status = -1          # overall status; stays -1 until evaluated externally
        self.report =''
        self.nfail = 0            # replaced by per-step lists once run() starts
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.gpu = gpu

        self.dasOptions = opt.dasOptions
        self.jobReport = opt.jobReports
        self.nThreads = opt.nThreads
        self.nStreams = opt.nStreams
        self.maxSteps = opt.maxSteps
        self.nEvents = opt.nEvents
        self.recoOutput = ''      # remembered output file of the last RECO step
        self.startFrom = opt.startFrom
        self.recycle = opt.recycle

        # working directory name: <numId>_<nameId>[_job<N>]
        self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
        if jobNumber is not None:
            self.wfDir = self.wfDir + '_job' + str(jobNumber)

        return

    def doCmd(self, cmd):
        """Append *cmd* to <wfDir>/cmdLog and execute it through a shell.

        Returns 0 in dryRun mode, otherwise the raw ``os.waitpid`` status of
        the spawned shell (0 on success).
        """
        msg = "\n# in: " +os.getcwd()
        if self.dryRun: msg += " dryRun for '"
        else: msg += " going to execute "
        msg += cmd.replace(';','\n')
        print(msg)

        cmdLog = open(self.wfDir+'/cmdLog','a')
        cmdLog.write(msg+'\n')
        cmdLog.close()

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            # NOTE(review): this is the 16-bit wait status, not the plain exit
            # code; callers compare it against 0 and the sentinel 32000.
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ",cmd,'ret=', ret)
        return ret

    def run(self):
        """Execute all workflow steps and fill the per-step bookkeeping lists."""
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        preamble = 'cd '+self.wfDir+'; '

        # NOTE(review): realstarttime is assigned but not used below
        realstarttime = datetime.now()
        startime='date %s' %time.asctime()

        # check where we are running:
        onCAF = False
        if 'cms/caf/cms' in os.environ['CMS_PATH']:
            onCAF = True

        ##needs to set
        #self.report
        # per-step bookkeeping (one entry per workflow step)
        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i,ID):
            # shell suffix redirecting stdout+stderr of step *i* to step<i>_<ID>.log
            return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)

        inFile=None
        lumiRangeFile=None
        aborted=False
        # remembers each step's output-file extension ('.root' or '.rntpl')
        # so later --filein rewrites can match the producing step
        outputExtensionForStep = {}
        for (istepmone,com) in enumerate(self.wf.cmds):
            # isInputOk is used to keep track of the das result. In case this
            # is False we use a different error message to indicate the failed
            # das query.
            isInputOk=True
            istep=istepmone+1
            cmd = preamble
            outputExtensionForStep[istep]=''
            if aborted:
                # a previous step failed: mark the remaining ones NOTRUN
                self.npass.append(0)
                self.nfail.append(0)
                self.retStep.append(0)
                self.stat.append('NOTRUN')
                continue
            if not isinstance(com,str):
                # non-string step: an input source (DAS dataset) description
                if self.recycle:
                    # recycle a pre-existing input instead of querying DAS
                    inFile = self.recycle
                    continue
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be no CAF to run",self.wf.numId)
                    self.npass.append(0)
                    self.nfail.append(0)
                    self.retStep.append(0)
                    self.stat.append('NOTRUN')
                    aborted=True
                    continue
                #create lumiRange file first so if das fails we get its error code
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 =cmd+cmd2+closeCmd(istep,'lumiRanges')
                    lumiRangeFile='step%d_lumiRanges.log'%(istep,)
                    retStep = self.doCmd(cmd2)
                if (com.dataSetParent):
                    # also query the parent dataset (e.g. for pileup premixing inputs)
                    cmd3=cmd+com.das(self.dasOptions,com.dataSetParent)+closeCmd(istep,'dasparentquery')
                    retStep = self.doCmd(cmd3)
                cmd+=com.das(self.dasOptions,com.dataSet)
                cmd+=closeCmd(istep,'dasquery')
                retStep = self.doCmd(cmd)
                #don't use the file list executed, but use the das command of cmsDriver for next step
                # If the das output is not there or it's empty, consider it an
                # issue of this step, not of the next one.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
                # Check created das output in no-dryRun mode only
                if not self.dryRun:
                    if not exists(dasOutputPath):
                        retStep = 1
                        dasOutput = None
                    else:
                        # We consider only the files which have at least one logical or physical filename
                        # in it. This is because sometimes das fails and still prints out junk.
                        # NOTE(review): file handle from open() is not closed explicitly
                        dasOutput = [l for l in open(dasOutputPath).read().split("\n") if l.startswith("/") or l.startswith("root://eoscms.cern.ch")]
                    if not dasOutput:
                        retStep = 1
                        isInputOk = False

                # next step will read the file list produced by the das query
                inFile = 'filelist:' + basename(dasOutputPath)
                if com.skimEvents:
                    # the das helper wrote lumi_ranges.txt; rename it to the step log name
                    lumiRangeFile='step%d_lumiRanges.log'%(istep,)
                    cmd2 = preamble + "mv lumi_ranges.txt " + lumiRangeFile
                    retStep = self.doCmd(cmd2)
                    print("---")
            else:
                #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
                if self.gpu is not None:
                    cmd = cmd + self.gpu
                cmd += com
                if self.startFrom:
                    # skip steps until the requested one appears in the '-s' list
                    steps = cmd.split("-s ")[1].split(" ")[0]
                    if self.startFrom not in steps:
                        continue
                    else:
                        self.startFrom = False
                        inFile = self.recycle
                if self.noRun:
                    cmd +=' --no_exec'
                # in case previous step used DAS query (either filelist of das:)
                # not to be applied for premixing stage1 to allow combiend stage1+stage2 workflow
                if inFile and not 'premix_stage1' in cmd:
                    cmd += ' --filein '+inFile
                    inFile=None
                if lumiRangeFile: #DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess '+lumiRangeFile
                    lumiRangeFile=None
                # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
                if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
                    cmd+=' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
                    outputExtensionForStep[istep] = '.root'
                else:
                    # Disable input for premix stage1 to allow combined stage1+stage2 workflow
                    # Disable input for premix stage2 in FastSim to allow combined stage1+stage2 workflow (in FS, stage2 does also GEN)
                    # Ugly hack but works
                    extension = '.root'
                    if '--rntuple_out' in cmd:
                        extension = '.rntpl'
                    outputExtensionForStep[istep] = extension
                    if istep!=1 and not '--filein' in cmd and not 'premix_stage1' in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                        steps = cmd.split("-s ")[1].split(" ")[0] ## relying on the syntax: cmsDriver -s STEPS --otherFlags
                        if "ALCA" not in steps:
                            cmd+=' --filein file:step%s%s '%(istep-1,extension)
                        elif "ALCA" in steps and "RECO" in steps:
                            cmd+=' --filein file:step%s%s '%(istep-1,extension)
                        elif self.recoOutput:
                            # ALCA-only step: read the remembered RECO output instead of the previous step
                            cmd+=' --filein %s'%(self.recoOutput)
                        else:
                            cmd+=' --filein file:step%s%s '%(istep-1,extension)
                    elif istep!=1 and '--filein' in cmd and '--filetype' not in cmd:
                        #make sure correct extension is being used
                        #find the previous state index
                        # NOTE(review): non-raw string with '\s'/'\.' escapes — works,
                        # but emits invalid-escape warnings on newer Pythons
                        expression = '--filein\s+file:step([1-9])(_[a-zA-Z]+)*\.[a-z]+'
                        m = re.search(expression, cmd)
                        if m:
                            # rewrite the extension to match what the producing step actually wrote
                            cmd = re.sub(expression,r'--filein file:step\1\2'+outputExtensionForStep[int(m.group(1))],cmd)
                        elif extension == '.rntpl':
                            #some ALCA steps use special file names without step_ prefix and these are also force to use RNTuple
                            expression = '--filein\s+file:([a-zA-Z0-9_]+)*\.[a-z]+'
                            m = re.search(expression, cmd)
                            if m:
                                cmd = re.sub(expression,r'--filein file:\1.rntpl',cmd)
                    if not '--fileout' in com:
                        cmd+=' --fileout file:step%s%s '%(istep,extension)
                    if "RECO" in cmd:
                        # remember the RECO output for later ALCA-only steps
                        self.recoOutput = "file:step%d%s"%(istep,extension)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                # threads/streams do not apply to (ALCA)HARVESTING steps
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nStreams %s' % self.nStreams
                if (self.nEvents > 0):
                    # override the event count of the existing ' -n ' token in place
                    event_token = " -n "
                    split = cmd.split(event_token)
                    pos_cmd = " ".join(split[1].split(" ")[1:])
                    cmd = split[0] + event_token + '%s ' % self.nEvents + pos_cmd

                cmd+=closeCmd(istep,self.wf.nameId)
                retStep = 0
                if istep>self.maxSteps:
                    # beyond maxSteps: only record the command, do not run it
                    wf_stats = open("%s/wf_steps.txt" % self.wfDir,"a")
                    wf_stats.write('step%s:%s\n' % (istep, cmd))
                    wf_stats.close()
                else: retStep = self.doCmd(cmd)

            self.retStep.append(retStep)
            if retStep == 32000:
                # A timeout occurred
                self.npass.append(0)
                self.nfail.append(1)
                self.stat.append('TIMEOUT')
                aborted = True
            elif (retStep!=0):
                #error occured
                self.npass.append(0)
                self.nfail.append(1)
                if not isInputOk:
                    self.stat.append("DAS_ERROR")
                else:
                    self.stat.append('FAILED')
                #to skip processing
                aborted=True
            else:
                #things went fine
                self.npass.append(1)
                self.nfail.append(0)
                self.stat.append('PASSED')

        os.chdir(startDir)

        endtime='date %s' %time.asctime()
        # NOTE(review): this is string concatenation of the two timestamps,
        # not an arithmetic time difference
        tottime='%s-%s'%(endtime,startime)

        #### wrap up ####

        logStat=''
        for i,s in enumerate(self.stat):
            logStat+='Step%d-%s '%(i,s)
        #self.report='%s_%s+%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,'+'.join(self.wf.stepList),logStat,tottime)+' '.join(map(str,self.retStep))+'\n'
        self.report='%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'

        return
|