# File indexing completed on 2024-04-06 12:23:30
0001 from __future__ import print_function
0002 from __future__ import absolute_import
0003
0004 from builtins import range
0005 import copy, datetime, inspect, fnmatch, os, re, subprocess, sys, tempfile, time
0006 import glob
0007 import gzip
0008 import errno
0009 from .edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
0010 from .addToDatasets import addToDatasets
0011
0012 from . import eostools as castortools
0013 from . import das as Das
0014
0015 from .dataset import Dataset
0016 from .datasetToSource import createDataset
0017 from .castorBaseDir import castorBaseDir
0018
def mkdir_p(path):
    """Create *path* and any missing parents, like `mkdir -p`.

    Silently succeeds when the directory already exists; re-raises any
    other OSError (permissions, a regular file in the way, ...).
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        # EEXIST is only benign when the existing entry really is a
        # directory; if a plain file occupies the path, propagate the error.
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise
0026
class Task(object):
    """Base class for Task API"""

    def __init__(self, name, dataset, user, options, instance=None):
        self.name = name
        self.instance = instance
        self.dataset = dataset
        self.user = user
        self.options = options

    def getname(self):
        """The name of the object, using the instance if needed"""
        if self.instance is None:
            return self.name
        return '%s_%s' % (self.name, self.instance)

    def addOption(self, parser):
        """A hook for adding things to the parser"""
        pass

    def run(self, input):
        """Basic API for a task. input and output are dictionaries"""
        return {}
0047
class ParseOptions(Task):
    """Common options for the script __main__: used by all production tasks"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'ParseOptions', dataset, user, options)

        usage = """%prog [options] <dataset>

The %prog script aims to take a list of samples and process them on the batch system. Submission
may be done serially (by setting --max_threads to 1), or in parallel (the default).

The basic flow is:

1) Check that the sample to run on exists
2) Generate a source CFG
3) Run locally and check everything works with a small number of events
4) Submit to the batch system
5) Wait until the jobs are finished
6) Check the jobs ran OK and that the files are good

Example:

ProductionTasks.py -u cbern -w 'PFAOD*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2

It is often useful to store the sample names in a file, in which case you could instead do:

ProductionTasks.py -w '*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py `cat samples_mc.txt`

An example file might contain:

palencia%/Tbar_TuneZ2_tW-channel-DR_7TeV-powheg-tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
benitezj%/ZZ_TuneZ2_7TeV_pythia6_tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2

The CASTOR username for each sample is given before the '%'.

Each step in the flow has a task associated with it, which may set options. The options for each task are
documented below.

"""
        # Das.DASOptionParser carries the common DAS query options
        # (host, verbose, idx, limit are read from it in BaseDataset.query)
        self.das = Das.DASOptionParser(usage=usage)
    def addOption(self, parser):
        # NOTE(review): os.getlogin() can raise OSError when there is no
        # controlling terminal (e.g. cron) — presumably fine for interactive
        # use; confirm before running unattended.
        parser.add_option("-u", "--user", dest="user", default=os.getlogin(),help='The username to use when looking at mass storage devices. Your login username is used by default.')
        parser.add_option("-w", "--wildcard", dest="wildcard", default='*.root',help='A UNIX style wildcard to specify which input files to check before submitting the jobs')
        parser.add_option("--max_threads", dest="max_threads", default=None,help='The maximum number of threads to use in the production')
    def run(self, input):
        """Parse the command line; returns {'Options': ..., 'Dataset': [...]}."""
        self.options, self.dataset = self.das.get_opt()
        # drop samples that were commented out with a leading '#'
        self.dataset = [d for d in self.dataset if not d.startswith('#')]
        self.user = self.options.user
        if not self.dataset:
            raise Exception('TaskError: No dataset specified')
        return {'Options':self.options, 'Dataset':self.dataset}
0099
class CheckDatasetExists(Task):
    """Use 'datasets.py' to check that the dataset exists in the production system.
    """
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckDatasetExists', dataset, user, options)

    def run(self, input):
        """Raise unless the dataset contains at least one good root file."""
        # translate the UNIX wildcard into the regex the dataset tools expect
        pattern = fnmatch.translate(self.options.wildcard)
        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range=run_range)
        if not data.listOfGoodFiles():
            raise Exception('no good root file in dataset %s | %s | %s | %s' % (self.user,
                                                                                self.dataset,
                                                                                self.options.wildcard,
                                                                                run_range))
        return {'Dataset': self.dataset}
0115
class BaseDataset(Task):
    """Query DAS to find dataset name in DBS - see https://cmsweb.cern.ch/das/"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'BaseDataset', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-n", "--name", dest="name", default=None,help='The name of the dataset in DAS. Will be guessed if not specified')
    def query(self, dataset):
        """Query DAS to find out how many events are in the dataset"""
        import json

        host = self.options.host
        debug = self.options.verbose
        idx = self.options.idx
        limit = self.options.limit

        def check(ds):
            """Run one DAS query; return (payload, full record) or raise."""
            query = 'dataset=%s' % ds
            result = Das.get_data(host, query, idx, limit, debug)
            # DAS answers with JSON; parse it properly. The previous
            # replace('null','None') + eval() approach executed remote data
            # and corrupted any payload containing the literal substrings
            # 'null'/'true'/'false'.
            data = json.loads(result)
            if data['status'] != 'ok':
                raise Exception("Das query failed: Output is '%s'" % data)
            return (data['data'], data)

        data = None
        exists = False

        if self.options.name is None:
            # guess the DBS name from the first three path tokens of the sample
            tokens = [t for t in dataset.split(os.sep) if t]
            if len(tokens) >= 3:
                ds = os.sep + os.sep.join(tokens[0:3])
                if ds:
                    exists, data = check(ds)
                    self.options.name = ds
        else:
            exists, data = check(self.options.name)
            if not exists:
                raise Exception("Specified dataset '%s' not found in Das. Please check." % self.options.name)

        if data is None:
            raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
        return data

    def run(self, input):
        """Query DAS unless checking was explicitly disabled via --check."""
        output = {}
        # run the query when options.check is true, or when the option is absent
        if (hasattr(self.options,'check') and self.options.check) or not hasattr(self.options,'check'):
            output = self.query(self.dataset)
        return {'Name':self.options.name,'Das':output}
0167
class GZipFiles(Task):
    """GZip a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'GZipFiles', dataset, user, options)

    def gzip(self, fileName):
        """Compress fileName to fileName.gz, remove the original, return the new name."""
        output = '%s.gz' % fileName
        # context managers so both handles are closed even if writing fails
        # (the original leaked them on error)
        with open(fileName, 'rb') as f_in:
            with gzip.open(output, 'wb') as f_out:
                f_out.writelines(f_in)

        os.remove(fileName)
        return output

    def run(self, input):
        """Compress every existing file listed in input['FilesToCompress']['Files']."""
        files = input['FilesToCompress']['Files']

        compressed = []
        for f in files:
            if not f:
                continue
            if os.path.exists(f):
                compressed.append(self.gzip(f))
        return {'CompressedFiles': compressed}
0194
class CleanFiles(Task):
    """Remove a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CleanFiles', dataset, user, options)

    def run(self, input):
        """Delete each file under input['FilesToClean']['Files'].

        Every non-empty entry is reported in 'CleanedFiles', whether or not
        it still existed on disk.
        """
        removed = []
        for fname in input['FilesToClean']['Files']:
            if not fname:
                continue
            if os.path.exists(fname):
                os.remove(fname)
            removed.append(fname)
        return {'CleanedFiles': removed}
0207
class FindOnCastor(Task):
    """Checks that the sample specified exists in the CASTOR area of the user specified. The directory must exist."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'FindOnCastor', dataset, user, options)

    def run(self, input):
        # central production samples come from DBS: no user area to inspect
        if self.user == 'CMS':
            return {'Topdir': None, 'Directory': None}

        topdir = castortools.lfnToCastor(castorBaseDir(user=self.user))
        directory = '%s/%s' % (topdir, self.dataset)

        if not castortools.fileExists(directory):
            # callers (e.g. RunCMSBatch) may set 'create' on the task to
            # request creation of a missing directory
            if getattr(self, 'create', False):
                castortools.createCastorDir(directory)

        if not castortools.isDirectory(directory):
            raise Exception("Dataset directory '%s' does not exist or could not be created" % directory)
        return {'Topdir': topdir, 'Directory': directory}
0225
class CheckForMask(Task):
    """Tests if a file mask, created by edmIntegrityCheck.py, is present already and reads it if so."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckForMask', dataset, user, options)

    def addOption(self, parser):
        parser.add_option("-c", "--check", dest="check", default=False, action='store_true',help='Check filemask if available')

    def run(self, input):
        # central production: files come straight from DBS, no mask needed
        if self.user == 'CMS':
            return {'MaskPresent':True,'Report':'Files taken from DBS'}

        dir = input['FindOnCastor']['Directory']
        mask = "IntegrityCheck"

        report = None
        # look for mask files unless checking was explicitly disabled;
        # an absent 'check' option counts as enabled
        if getattr(self.options, 'check', True):
            file_mask = castortools.matchingFiles(dir, '^%s_.*\.txt$' % mask)
            if file_mask:
                report = PublishToFileSystem(mask).get(dir)
        return {'MaskPresent':report is not None,'Report':report}
0249
class CheckForWrite(Task):
    """Checks whether you have write access to the CASTOR directory specified"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckForWrite', dataset, user, options)
    def run(self, input):
        """Check that the directory is writable"""
        if self.user == 'CMS':
            return {'Directory':None,'WriteAccess':True}
        dir = input['FindOnCastor']['Directory']
        if self.options.check:
            # create a small local file and try to stage it out
            _, name = tempfile.mkstemp('.txt', text=True)
            # open() replaces the Python2-only file() builtin; 'with'
            # guarantees the handle is closed before cmsStage reads the file
            with open(name, 'w') as testFile:
                testFile.write('Test file')

            store = castortools.castorToLFN(dir)

            # os.system returns 0 on success
            if not os.system('cmsStage %s %s' % (name, store)):
                fname = '%s/%s' % (dir, os.path.basename(name))
                write = castortools.fileExists(fname)
                if write:
                    # clean up the probe file on mass storage
                    castortools.rm(fname)
                else:
                    raise Exception("Failed to write to directory '%s'" % dir)
            os.remove(name)
        return {'Directory':dir,'WriteAccess':True}
0277
class GenerateMask(Task):
    """Uses edmIntegrityCheck.py to generate a file mask for the sample if one is not already present."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'GenerateMask', dataset, user, options)
    def addOption(self, parser):
        # NOTE(review): dest="resursive" looks like a typo for "recursive";
        # left unchanged because code outside this file may read
        # options.resursive — confirm before renaming.
        parser.add_option("-r", "--recursive", dest="resursive", default=False, action='store_true',help='Walk the mass storage device recursively')
        parser.add_option("-p", "--printout", dest="printout", default=False, action='store_true',help='Print a report to stdout')
    def run(self, input):
        """Run the integrity check and publish its report, unless a mask already exists.

        Returns {'MaskPresent': bool, 'Report': structured report or None}.
        """
        report = None
        # only do the (slow) integrity check when requested and no mask is present
        if self.options.check and not input['CheckForMask']['MaskPresent']:

            options = copy.deepcopy(self.options)
            options.user = self.user

            # forward the DBS name found by BaseDataset, if that task ran
            if 'BaseDataset' in input:
                options.name = input['BaseDataset']['Name']
            else:
                options.name = None

            check = IntegrityCheck(self.dataset,options)
            check.test()
            report = check.structured()
            # publish the report next to the sample on the file system
            pub = PublishToFileSystem(check)
            pub.publish(report)
        elif input['CheckForMask']['MaskPresent']:
            # reuse the report already read by CheckForMask
            report = input['CheckForMask']['Report']

        return {'MaskPresent':report is not None,'Report':report}
0307
class CreateJobDirectory(Task):
    """Generates a job directory on your local drive"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CreateJobDirectory', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-o","--output", dest="output", default=None,help='The directory to use locally for job files')
    def run(self, input):
        """Create (if needed) the local job directory and return its path."""
        if self.options.output is not None:
            output = self.options.output
        else:
            # default name: <dataset>_<epoch seconds>. int(time.time())
            # replaces strftime("%s"), which is a non-portable glibc
            # extension (not part of the C standard strftime).
            output = '%s_%i' % (self.dataset, int(time.time()))
            # dataset paths start with '/': strip it to keep the dir local
            output = output.lstrip('/')
        if not os.path.exists(output):
            mkdir_p(output)
        return {'JobDir':output,'PWD':os.getcwd()}
0325
class SourceCFG(Task):
    """Generate a source CFG using 'sourceFileList.py' by listing the CASTOR directory specified. Applies the file wildcard, '--wildcard'"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'SourceCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--min-run", dest="min_run", default=-1, type=int, help='When querying DBS, require runs >= than this run')
        parser.add_option("--max-run", dest="max_run", default=-1, type=int, help='When querying DBS, require runs <= than this run')
        parser.add_option("--input-prescale", dest="prescale", default=1, type=int, help='Randomly prescale the number of good files by this factor.')
    def run(self, input):
        """Write source_cfg.py (the good-file list) into the job directory."""
        jobdir = input['CreateJobDirectory']['JobDir']
        pattern = fnmatch.translate(self.options.wildcard)

        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        good_files = data.listOfGoodFilesWithPrescale(self.options.prescale)
        # files known to the mask but not selected are recorded for bookkeeping
        bad_files = [fname for fname in data.listOfFiles() if not fname in good_files]

        source = os.path.join(jobdir,'source_cfg.py')
        # open() replaces the Python2-only file() builtin; 'with' guarantees
        # the CFG is flushed and closed even if a write fails
        with open(source, 'w') as output:
            output.write('###SourceCFG:\t%d GoodFiles; %d BadFiles found in mask; Input prescale factor %d\n' % (len(good_files),len(bad_files),self.options.prescale) )
            output.write('files = ' + str(good_files) + '\n')
            for bad_file in bad_files:
                output.write("###SourceCFG:\tBadInMask '%s'\n" % bad_file)
        return {'SourceCFG':source}
0353
0354
def insertLines( insertedTo, toInsert ):
    '''Insert one sequence into another.

    If an element of insertedTo contains the hook marker
    '###ProductionTaskHook$$$', a NEW list is returned with toInsert
    placed just before that element. Otherwise toInsert is appended to
    insertedTo in place and the same (mutated) sequence is returned.
    '''
    HOOK = '###ProductionTaskHook$$$'
    # find the index of the first element carrying the hook marker, if any
    hookIndex = next(
        (pos for pos, elem in enumerate(insertedTo) if HOOK in str(elem)),
        None)
    if hookIndex is None:
        # no hook: extend in place and hand back the same sequence
        insertedTo.extend( toInsert )
        return insertedTo
    # hook found: splice the insertion in front of it (new list)
    return insertedTo[:hookIndex] + toInsert + insertedTo[hookIndex:]
0379
0380
class FullCFG(Task):
    """Generate the full CFG needed to run the job and writes it to the job directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'FullCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--cfg", dest="cfg", default=None, help='The top level CFG to run')
        parser.add_option("--nEventsPerJob", dest="nEventsPerJob", default=None, help='Number of events per job (for testing)')
    def run(self, input):
        """Splice the source CFG import and job settings into the user CFG."""
        jobdir = input['CreateJobDirectory']['JobDir']

        if self.options.cfg is None or not os.path.exists(self.options.cfg):
            raise Exception("The file '%s' does not exist. Please check." % self.options.cfg)

        # open() replaces the Python2-only file() builtin
        with open(self.options.cfg) as cfg_file:
            config = cfg_file.readlines()
        sourceFile = os.path.basename(input['SourceCFG']['SourceCFG'])
        # the source CFG is imported as a module below, so drop the extension
        if sourceFile.lower().endswith('.py'):
            sourceFile = sourceFile[:-3]

        source = os.path.join(jobdir,'full_cfg.py')

        # -1 means 'process all events' in CMSSW
        nEventsPerJob = -1
        if self.options.nEventsPerJob:
            nEventsPerJob = int(self.options.nEventsPerJob)

        toInsert = ['\nfrom %s import *\n' % sourceFile,
                    'process.source.fileNames = files\n',
                    'if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32({nEvents})\n'.format(nEvents=nEventsPerJob),
                    # NOTE: the original was missing the comma after the next
                    # line, implicitly concatenating the last two literals
                    # into a single element; the written output is identical
                    # either way, but this is the intended structure.
                    'if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)\n',
                    'datasetInfo = ("%s","%s","%s")\n' % (self.user, self.dataset, fnmatch.translate(self.options.wildcard) )
                    ]
        config = insertLines( config, toInsert )
        with open(source, 'w') as output:
            output.writelines(config)
        return {'FullCFG':source}
0417
class CheckConfig(Task):
    """Check the basic syntax of a CFG file by running python on it."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckConfig', dataset, user, options)

    def run(self, input):
        """Run 'python <cfg>' and raise if the interpreter reports an error."""
        full = input['FullCFG']['FullCFG']

        proc = subprocess.Popen(['python', full],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise Exception("Syntax check of cfg failed. Error was '%s'. (%i)" % (stderr, proc.returncode))
        return {'Status':'VALID'}
0431
class RunTestEvents(Task):
    """Run cmsRun but with a small number of events on the job CFG."""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'RunTestEvents', dataset, user, options)
    def run(self, input):
        """Write test_cfg.py (5 events, first 10 files) and cmsRun it."""
        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        # open() replaces the Python2-only file() builtin
        with open(full) as cfg:
            config = cfg.readlines()
        source = os.path.join(jobdir,'test_cfg.py')
        toInsert = ['\n',
                    'process.maxEvents.input = cms.untracked.int32(5)\n',
                    'if hasattr(process,"source"): process.source.fileNames = process.source.fileNames[:10]\n'
                    ]
        config = insertLines( config, toInsert )
        with open(source, 'w') as output:
            output.writelines(config)

        pwd = os.getcwd()

        error = None
        try:
            # run from inside the job directory so relative paths resolve
            os.chdir(jobdir)

            child = subprocess.Popen(['cmsRun',os.path.basename(source)], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to cmsRun with a few events. Error was '%s' (%i)." % (stderr,child.returncode)
        finally:
            # always restore the working directory
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'Status':'VALID','TestCFG':source}
0471
class ExpandConfig(Task):
    """Runs edmConfigDump to produce an expanded cfg file"""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'ExpandConfig', dataset, user, options)
    def run(self, input):
        """Append a dumpPython() call to the full CFG, run it, record the expanded CFG path."""
        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        # open() replaces the Python2-only file() builtin, both here and in
        # the generated snippet below (which is executed by the child python)
        with open(full) as cfg:
            config = cfg.read()
        source = os.path.join(jobdir,'test_cfg.py')
        expanded = 'Expanded%s' % os.path.basename(full)
        with open(source, 'w') as output:
            output.write(config)
            output.write("open('%s','w').write(process.dumpPython())\n" % expanded)

        pwd = os.getcwd()

        result = {}
        error = None
        try:
            # run from inside the job directory so the dump lands there
            os.chdir(jobdir)

            child = subprocess.Popen(['python',os.path.basename(source)], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to edmConfigDump. Error was '%s' (%i)." % (stderr,child.returncode)
            result['ExpandedFullCFG'] = os.path.join(jobdir,expanded)

        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return result
0511
class WriteToDatasets(Task):
    """Publish the sample to 'Datasets.txt' if required"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'WriteToDatasets', dataset, user, options)

    def run(self, input):
        """Register <dataset>/<tier> in the datasets file for the batch user."""
        name = ("%s/%s" % (self.dataset, self.options.tier)).replace('//', '/')
        user = self.options.batch_user
        added = addToDatasets(name, user=user)
        return {'Added':added, 'Name':name, 'User':user}
0522
class RunCMSBatch(Task):
    """Run the 'cmsBatch.py' command on your CFG, submitting to the CERN batch system"""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'RunCMSBatch', dataset, user, options)
    def addOption(self, parser):
        # NOTE(review): os.getlogin() can raise when there is no controlling
        # terminal — presumably fine for interactive use; confirm.
        parser.add_option("--batch_user", dest="batch_user", help="The user for LSF", default=os.getlogin())
        parser.add_option("--run_batch", dest="run_batch", default=True, action='store_true',help='Run on the batch system')
        parser.add_option("-N", "--numberOfInputFiles", dest="nInput",help="Number of input files per job",default=5,type=int)
        parser.add_option("-q", "--queue", dest="queue", help="The LSF queue to use", default="1nh")
        parser.add_option("-t", "--tier", dest="tier",
                          help="Tier: extension you can give to specify you are doing a new production. If you give a Tier, your new files will appear in sampleName/tierName, which will constitute a new dataset.",
                          default="")
        parser.add_option("-G", "--group", dest="group", help="The LSF user group to use, e.g. 'u_zh'", default=None)

    def run(self, input):
        """Assemble and execute the cmsBatch.py submission command.

        Returns the sample dataset name, the batch user, the mass-storage
        output directory and the local top directory holding the LSF jobs.
        """
        # make sure the destination directory exists on mass storage for the
        # batch user, creating it if needed (FindOnCastor honours 'create')
        find = FindOnCastor(self.dataset,self.options.batch_user,self.options)
        find.create = True
        out = find.run({})

        full = input['ExpandConfig']['ExpandedFullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        sampleDir = os.path.join(out['Directory'],self.options.tier)
        sampleDir = castortools.castorToLFN(sampleDir)

        # build the cmsBatch.py command line; '-o' names the local jobs dir,
        # '-r' the remote output directory
        cmd = ['cmsBatch.py',str(self.options.nInput),os.path.basename(full),'-o','%s_Jobs' % self.options.tier,'--force']
        cmd.extend(['-r',sampleDir])
        if self.options.run_batch:
            jname = "%s/%s" % (self.dataset,self.options.tier)
            jname = jname.replace("//","/")
            user_group = ''
            if self.options.group is not None:
                user_group = '-G %s' % self.options.group
            # '-b' hands cmsBatch.py the bsub command used for each job;
            # job_id.txt captures the submission line parsed later by MonitorJobs
            cmd.extend(['-b',"'bsub -q %s -J %s -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id.txt'" % (self.options.queue,jname,user_group)])
        print(" ".join(cmd))

        pwd = os.getcwd()

        error = None
        try:
            # cmsBatch.py must run from inside the job directory
            os.chdir(jobdir)
            returncode = os.system(" ".join(cmd))

            if returncode != 0:
                error = "Running cmsBatch failed. Return code was %i." % returncode
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'SampleDataset':"%s/%s" % (self.dataset,self.options.tier),'BatchUser':self.options.batch_user,
                'SampleOutputDir':sampleDir,'LSFJobsTopDir':os.path.join(jobdir,'%s_Jobs' % self.options.tier)}
0577
class MonitorJobs(Task):
    """Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'MonitorJobs', dataset, user, options)

    def getjobid(self, job_dir):
        """Parse the LSF output to find the job id"""
        input = os.path.join(job_dir,'job_id.txt')
        result = None
        if os.path.exists(input):
            # open() replaces the Python2-only file() builtin; 'with' closes the handle
            with open(input) as handle:
                contents = handle.read()
            for c in contents.split('\n'):
                if c and re.match('^Job <\\d*> is submitted to queue <.*>',c) is not None:
                    try:
                        # the id is the number between the first '<' and '>'
                        result = c.split('<')[1].split('>')[0]
                    except Exception as e:
                        print('Job ID parsing error',str(e),c, file=sys.stderr)
        return result

    def monitor(self, jobs, previous):
        """Run bjobs for the known job IDs and return {job_id: status}.

        Jobs reported 'not found' on stderr that were never seen before are
        marked 'FORGOTTEN'; jobs missing from this round keep their status
        from 'previous'.
        """
        # NOTE(review): assumes Popen output is str; under Python 3 it is
        # bytes unless universal_newlines=True — confirm before porting.
        cmd = ['bjobs','-u',self.options.batch_user]
        cmd.extend([v for v in jobs.values() if v is not None])
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()

        def parseHeader(header):
            """Parse the header from bjobs"""
            tokens = [t for t in header.split(' ') if t]
            result = {}
            for i in range(len(tokens)):
                result[tokens[i]] = i

            return result

        result = {}
        if stdout:
            lines = stdout.split('\n')
            if lines:
                header = parseHeader(lines[0])
                if not 'STAT' in header or not 'JOBID' in header:
                    print('Problem parsing bjobs header\n',lines, file=sys.stderr)
                    return result
                for line in lines[1:]:
                    # pick the ID and status out of the columns named in the header
                    tokens = [t for t in line.split(' ') if t]
                    if len(tokens) < len(header): continue
                    id = tokens[header['JOBID']]
                    user = tokens[header['USER']]
                    status = tokens[header['STAT']]

                    result[id] = status

        if stderr:
            lines = stderr.split('\n')
            if lines:
                for line in lines:
                    if line and re.match('^Job <\\d*> is not found',line) is not None:
                        try:
                            id = line.split('<')[1].split('>')[0]
                            if id not in result and id not in previous:
                                result[id] = 'FORGOTTEN'
                        except Exception as e:
                            print('Job ID parsing error in STDERR',str(e),line, file=sys.stderr)

        # carry forward the last known status for jobs bjobs no longer reports
        if result:
            for id in jobs.values():
                if id not in result and id in previous:
                    result[id] = previous[id]
        return result

    def run(self, input):
        """Poll the batch system until no monitorable job remains.

        Returns {'LSFJobStatus': {job_dir: final status or path to its
        gzipped STDOUT}, 'LSFJobIDs': {job_dir: LSF job id}}.
        """
        jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
        if not os.path.exists(jobsdir):
            raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)

        subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
        jobs = {}
        for s in subjobs:
            jobs[s] = self.getjobid(s)

        def checkStatus(stat):
            """Map each job dir to a status; gzip the logs of finished jobs."""
            actions = {'FilesToCompress':{'Files':[]}}

            result = {}
            for j, id in jobs.items():
                if id is None:
                    result[j] = 'UNKNOWN'
                else:
                    if id in stat:
                        result[j] = stat[id]
                        if result[j] in ['DONE','EXIT','FORGOTTEN']:
                            stdout = os.path.join(j,'LSFJOB_%s' % id,'STDOUT')
                            if os.path.exists(stdout):
                                # compress this file and report the gzipped name
                                actions['FilesToCompress']['Files'].append(stdout)
                                result[j] = '%s.gz' % stdout
                            elif os.path.exists('%s.gz' % stdout):
                                result[j] = '%s.gz' % stdout
                            else:
                                result[j] = 'NOSTDOUT'

                            # compress the stderr too, although it is not reported
                            stderr = os.path.join(j,'LSFJOB_%s' % id,'STDERR')
                            if os.path.exists(stderr):
                                actions['FilesToCompress']['Files'].append(stderr)

            compress = GZipFiles(self.dataset,self.user,self.options)
            compress.run(actions)
            return result

        def countJobs(stat):
            """Count jobs that are monitorable - i.e. not in a final state"""
            result = []
            for j, id in jobs.items():
                if id is not None and id in stat:
                    st = stat[id]
                    if st in ['PEND','PSUSP','RUN','USUSP','SSUSP','WAIT']:
                        result.append(id)
            return result

        def writeKillScript(mon):
            """Write a shell script to kill the jobs we know about"""
            kill = os.path.join(jobsdir,'kill_jobs.sh')
            script = """
#!/usr/bin/env bash
echo "Killing jobs"
bkill -u %s %s
""" % (self.options.batch_user," ".join(mon))
            # open() replaces the Python2-only file() builtin
            with open(kill, 'w') as output:
                output.write(script)
            return mon

        status = self.monitor(jobs,{})
        monitorable = writeKillScript(countJobs(status))
        count = 0

        while monitorable:
            # compress the logs of jobs that finished during this round
            job_status = checkStatus(status)
            time.sleep(60)
            status = self.monitor(jobs,status)
            monitorable = writeKillScript(countJobs(status))
            # print a heartbeat every third iteration (~3 minutes)
            if not (count % 3):
                print('%s: Monitoring %i jobs (%s)' % (self.name,len(monitorable),self.dataset))
            count += 1

        return {'LSFJobStatus':checkStatus(status),'LSFJobIDs':jobs}
0734
class CheckJobStatus(Task):
    """Checks the job STDOUT to catch common problems like exceptions, CPU time exceeded. Sets the job status in the report accordingly."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckJobStatus', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--output_wildcard", dest="output_wildcard", help="The wildcard to use when testing the output of this production (defaults to same as -w)", default=None)
    def run(self, input):
        """Scan each job's (possibly gzipped) STDOUT for failure patterns, then regenerate the integrity mask."""
        job_status = input['MonitorJobs']['LSFJobStatus']

        result = {}
        for j, status in job_status.items():
            valid = True
            if os.path.exists(status):
                # MonitorJobs gzips finished logs, so handle both forms;
                # open() replaces the Python2-only file() builtin and the
                # try/finally closes the handle (the original leaked it)
                if status.endswith('.gz') or status.endswith('.GZ'):
                    fileHandle = gzip.GzipFile(status)
                else:
                    fileHandle = open(status)

                open_count = 0
                close_count = 0
                try:
                    for line in fileHandle:
                        # 'pened file'/'losed file' match both upper- and
                        # lower-case first letters of Opened/Closed reports
                        if 'pened file' in line:
                            open_count += 1
                        if 'losed file' in line:
                            close_count += 1

                        if 'Exception' in line:
                            result[j] = 'Exception'
                            valid = False
                            break
                        elif 'CPU time limit exceeded' in line:
                            result[j] = 'CPUTimeExceeded'
                            valid = False
                            break
                        elif 'Killed' in line:
                            result[j] = 'JobKilled'
                            valid = False
                            break
                        elif 'A fatal system signal has occurred' in line:
                            result[j] = 'SegFault'
                            valid = False
                            break
                finally:
                    fileHandle.close()

                # every input file that was opened should also have been closed
                if valid and open_count != close_count:
                    result[j] = 'FileOpenCloseMismatch'
                    valid = False
                if valid:
                    result[j] = 'VALID'
            else:
                # no log on disk: propagate the raw status string
                result[j] = status

        # regenerate the integrity mask for the production output, matching
        # the output wildcard if one was given
        options = copy.deepcopy(self.options)
        if self.options.output_wildcard is not None:
            options.wildcard = self.options.output_wildcard

        mask = GenerateMask(input['RunCMSBatch']['SampleDataset'],self.options.batch_user,options)
        report = mask.run({'CheckForMask':{'MaskPresent':False}})
        report['LSFJobStatusCheck'] = result
        return report
0800
class WriteJobReport(Task):
    """Write a summary report on each job"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'WriteJobReport', dataset, user, options)
    def run(self, input):
        """Write resubmit.sh summarising job states, with bsub lines for failed jobs."""
        report = input['CheckJobStatus']

        # group the job directories by their final status
        states = {}
        for j, status in report['LSFJobStatusCheck'].items():
            if status not in states:
                states[status] = []
            states[status].append(j)
        jobdir = input['CreateJobDirectory']['PWD']
        if not os.path.exists(jobdir):
            raise Exception("Top level job directory not found: '%s'" % jobdir)
        report_file = os.path.join(input['CreateJobDirectory']['JobDir'],'resubmit.sh')

        # open() replaces the Python2-only file() builtin; 'with' guarantees
        # the script is flushed and closed even if a write fails
        with open(report_file, 'w') as output:
            output.write('#!/usr/bin/env bash\n')

            # summarise the integrity-check report, if one was produced
            if report['MaskPresent']:
                mask = report['Report']
                output.write('#PrimaryDatasetFraction: %f\n' % mask['PrimaryDatasetFraction'])
                output.write('#FilesGood: %i\n' % mask['FilesGood'])
                output.write('#FilesBad: %i\n' % mask['FilesBad'])

            user_group = ''
            if self.options.group is not None:
                user_group = '-G %s' % self.options.group

            for status, jobs in states.items():
                output.write('# %d jobs found in state %s\n' % (len(jobs),status) )
                if status == 'VALID':
                    continue
                # emit a resubmission command for every non-valid job
                for j in jobs:
                    jdir = os.path.join(jobdir,j)
                    output.write('pushd %s; bsub -q %s -J RESUB -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id_resub.txt; popd\n' % (jdir,self.options.queue,user_group))

        return {'SummaryFile':report_file}
0843
class CleanJobFiles(Task):
    """Removes and compresses auto-generated files from the job directory to save space."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CleanJobFiles', dataset, user, options)

    def run(self, input):
        """Compress uncompressed job logs and delete generated CFG/root/pyc files."""
        jobdir = input['CreateJobDirectory']['JobDir']
        jobs = input['MonitorJobs']['LSFJobIDs']
        job_status = input['MonitorJobs']['LSFJobStatus']

        actions = {'FilesToCompress':{'Files':[]},'FilesToClean':{'Files':[]}}
        to_clean = actions['FilesToClean']['Files']
        to_compress = actions['FilesToCompress']['Files']

        # generated configuration files
        to_clean.append(input['ExpandConfig']['ExpandedFullCFG'])
        if 'RunTestEvents' in input:
            to_clean.append(input['RunTestEvents']['TestCFG'])

        # local root output and compiled python left behind in the job dir
        to_clean.extend(glob.iglob('%s/*.root' % jobdir))
        to_clean.extend(glob.iglob('%s/*.pyc' % jobdir))

        # compress any job log that is still uncompressed
        for j in jobs:
            status = job_status[j]
            if os.path.exists(status) and not status.endswith('.gz'):
                to_compress.append(status)

        compressed = GZipFiles(self.dataset,self.user,self.options).run(actions)
        removed = CleanFiles(self.dataset,self.user,self.options).run(actions)
        return {'Cleaned':removed,'Compressed':compressed}
0876