
from builtins import range
import copy, datetime, inspect, fnmatch, os, re, subprocess, sys, tempfile, time
import glob
import gzip
import errno
from .edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
from .addToDatasets import addToDatasets

from . import eostools as castortools
from . import das as Das

from .dataset import Dataset
from .datasetToSource import createDataset
from .castorBaseDir import castorBaseDir

def mkdir_p(path):
    """Create a directory, ignoring the error if it already exists."""
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            pass
        else:
            raise
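
# Example: mkdir_p is safe to call when the directory already exists.
#   mkdir_p('Jobs/test')
#   mkdir_p('Jobs/test')   # second call is a no-op, the EEXIST error is swallowed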

class Task(object):
    """Base class for the Task API"""
    def __init__(self, name, dataset, user, options, instance = None):
        self.name = name
        self.instance = instance
        self.dataset = dataset
        self.user = user
        self.options = options
    def getname(self):
        """The name of the object, including the instance suffix if one is set"""
        if self.instance is not None:
            return '%s_%s' % (self.name, self.instance)
        else:
            return self.name
    def addOption(self, parser):
        """A hook for adding options to the command line parser"""
        pass
    def run(self, input):
        """Basic API for a task. 'input' and the return value are dictionaries"""
        return {}
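
# A minimal sketch of how tasks are chained (hypothetical driver code, not part
# of this module): each task reads what it needs from the accumulated outputs
# of the earlier tasks, keyed by task name.
#
#   results = {}
#   for task in tasks:                        # 'tasks' is a list of Task objects
#       results[task.getname()] = task.run(results)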

class ParseOptions(Task):
    """Common options for the script __main__: used by all production tasks"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'ParseOptions', dataset, user, options)

        usage = """%prog [options] <dataset>

The %prog script takes a list of samples and processes them on the batch system. Submission
may be done serially (by setting --max_threads to 1), or in parallel (the default).

The basic flow is:

1) Check that the sample to run on exists
2) Generate a source CFG
3) Run locally on a small number of events to check that everything works
4) Submit to the batch system
5) Wait until the jobs are finished
6) Check that the jobs ran OK and that the output files are good

Example:

ProductionTasks.py -u cbern -w 'PFAOD*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2

It is often useful to store the sample names in a file, in which case you could instead do:

ProductionTasks.py -w '*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py `cat samples_mc.txt`

An example file might contain:

palencia%/Tbar_TuneZ2_tW-channel-DR_7TeV-powheg-tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
benitezj%/ZZ_TuneZ2_7TeV_pythia6_tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2

The CASTOR username for each sample is given before the '%'.

Each step in the flow has a task associated with it, which may set options. The options for each task are
documented below.

"""
        self.das = Das.DASOptionParser(usage=usage)
    def addOption(self, parser):
        parser.add_option("-u", "--user", dest="user", default=os.getlogin(), help='The username to use when looking at mass storage devices. Your login username is used by default.')
        parser.add_option("-w", "--wildcard", dest="wildcard", default='*.root', help='A UNIX style wildcard to specify which input files to check before submitting the jobs')
        parser.add_option("--max_threads", dest="max_threads", default=None, help='The maximum number of threads to use in the production')
    def run(self, input):
        self.options, self.dataset = self.das.get_opt()
        # Drop samples that are commented out in the input list
        self.dataset = [d for d in self.dataset if not d.startswith('#')]
        self.user = self.options.user
        if not self.dataset:
            raise Exception('TaskError: No dataset specified')
        return {'Options':self.options, 'Dataset':self.dataset}

class CheckDatasetExists(Task):
    """Use 'datasets.py' to check that the dataset exists in the production system."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckDatasetExists', dataset, user, options)
    def run(self, input):
        pattern = fnmatch.translate(self.options.wildcard)
        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        if len(data.listOfGoodFiles()) == 0:
            raise Exception('no good root file in dataset %s | %s | %s | %s' % (self.user,
                                                                                self.dataset,
                                                                                self.options.wildcard,
                                                                                run_range))
        return {'Dataset':self.dataset}

class BaseDataset(Task):
    """Query DAS to find the dataset name in DBS - see https://cmsweb.cern.ch/das/"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'BaseDataset', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-n", "--name", dest="name", default=None, help='The name of the dataset in DAS. Will be guessed if not specified')
    def query(self, dataset):
        """Query DAS to find out how many events are in the dataset"""

        host = self.options.host
        debug = self.options.verbose
        idx = self.options.idx
        limit = self.options.limit

        def check(ds):
            query = 'dataset=%s' % ds
            result = Das.get_data(host, query, idx, limit, debug)
            # The DAS response is JSON-like: map the JSON literals to their
            # Python equivalents so that the reply can be evaluated below
            result = result.replace('null', 'None')
            result = result.replace('true', 'True')
            result = result.replace('false', 'False')
            data = eval(result)
            if data['status'] != 'ok':
                raise Exception("DAS query failed: Output is '%s'" % data)
            return (data['data'], data)

        data = None
        exists = False

        if self.options.name is None:
            # Guess the DAS name from the first three tokens of the dataset path
            tokens = [t for t in dataset.split(os.sep) if t]
            if len(tokens) >= 3:
                ds = os.sep + os.sep.join(tokens[0:3])
                if ds:
                    exists, data = check(ds)
                    self.options.name = ds
        else:
            exists, data = check(self.options.name)
            if not exists:
                raise Exception("Specified dataset '%s' not found in DAS. Please check." % self.options.name)

        if data is None:
            raise Exception("Dataset '%s' not found in DAS. Please check." % self.dataset)
        return data
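
    # Example (sketch): for the dataset path used in the usage text above,
    #   /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2
    # the guessed DAS name is built from the first three tokens:
    #   /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM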

    def run(self, input):
        output = {}
        # Query DAS unless the 'check' option is present and set to False
        if getattr(self.options, 'check', True):
            output = self.query(self.dataset)
        return {'Name':self.options.name, 'Das':output}

class GZipFiles(Task):
    """GZip a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'GZipFiles', dataset, user, options)
    def gzip(self, fileName):
        """Compress 'fileName' to 'fileName.gz' and remove the original"""
        output = '%s.gz' % fileName

        f_in = open(fileName, 'rb')
        f_out = gzip.open(output, 'wb')
        f_out.writelines(f_in)
        f_out.close()
        f_in.close()

        # The uncompressed original is no longer needed
        os.remove(fileName)
        return output
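
    # Example (sketch): self.gzip('LSFJOB_1/STDOUT') writes 'LSFJOB_1/STDOUT.gz'
    # and deletes the uncompressed 'LSFJOB_1/STDOUT'.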

    def run(self, input):
        files = input['FilesToCompress']['Files']

        compressed = []
        for f in files:
            if not f: continue
            if os.path.exists(f):
                gz = self.gzip(f)
                compressed.append(gz)
        return {'CompressedFiles':compressed}

class CleanFiles(Task):
    """Remove a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CleanFiles', dataset, user, options)
    def run(self, input):
        files = input['FilesToClean']['Files']
        removed = []
        for f in files:
            if not f: continue
            if os.path.exists(f): os.remove(f)
            removed.append(f)
        return {'CleanedFiles':removed}

class FindOnCastor(Task):
    """Check that the specified sample exists in the CASTOR area of the specified user. The directory must exist."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'FindOnCastor', dataset, user, options)
    def run(self, input):
        if self.user == 'CMS':
            return {'Topdir':None, 'Directory':None}
        topdir = castortools.lfnToCastor(castorBaseDir(user=self.user))
        directory = '%s/%s' % (topdir, self.dataset)

        if not castortools.fileExists(directory):
            if hasattr(self, 'create') and self.create:
                castortools.createCastorDir(directory)

        if not castortools.isDirectory(directory):
            raise Exception("Dataset directory '%s' does not exist or could not be created" % directory)
        return {'Topdir':topdir, 'Directory':directory}

class CheckForMask(Task):
    """Test whether a file mask, created by edmIntegrityCheck.py, is already present, and read it if so."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckForMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-c", "--check", dest="check", default=False, action='store_true', help='Check filemask if available')
    def run(self, input):

        if self.user == 'CMS':
            return {'MaskPresent':True, 'Report':'Files taken from DBS'}

        dir = input['FindOnCastor']['Directory']
        mask = "IntegrityCheck"
        file_mask = []

        report = None
        # Look for the mask unless the 'check' option is present and set to False
        if getattr(self.options, 'check', True):
            file_mask = castortools.matchingFiles(dir, '^%s_.*\\.txt$' % mask)

        if file_mask:
            p = PublishToFileSystem(mask)
            report = p.get(dir)
        return {'MaskPresent':report is not None, 'Report':report}

class CheckForWrite(Task):
    """Check whether you have write access to the specified CASTOR directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckForWrite', dataset, user, options)
    def run(self, input):
        """Check that the directory is writable"""
        if self.user == 'CMS':
            return {'Directory':None, 'WriteAccess':True}
        dir = input['FindOnCastor']['Directory']
        if self.options.check:
            # Stage a small test file into the directory, then remove it again
            _, name = tempfile.mkstemp('.txt', text=True)
            testFile = open(name, 'w')
            testFile.write('Test file')
            testFile.close()

            store = castortools.castorToLFN(dir)
            # os.system returns 0 on success
            if not os.system('cmsStage %s %s' % (name, store)):
                fname = '%s/%s' % (dir, os.path.basename(name))
                write = castortools.fileExists(fname)
                if write:
                    castortools.rm(fname)
                else:
                    raise Exception("Failed to write to directory '%s'" % dir)
            os.remove(name)
        return {'Directory':dir, 'WriteAccess':True}

class GenerateMask(Task):
    """Use edmIntegrityCheck.py to generate a file mask for the sample if one is not already present."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'GenerateMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-r", "--recursive", dest="recursive", default=False, action='store_true', help='Walk the mass storage device recursively')
        parser.add_option("-p", "--printout", dest="printout", default=False, action='store_true', help='Print a report to stdout')
    def run(self, input):

        report = None
        if self.options.check and not input['CheckForMask']['MaskPresent']:

            options = copy.deepcopy(self.options)
            options.user = self.user

            if 'BaseDataset' in input:
                options.name = input['BaseDataset']['Name']
            else:
                options.name = None

            check = IntegrityCheck(self.dataset, options)
            check.test()
            report = check.structured()
            pub = PublishToFileSystem(check)
            pub.publish(report)
        elif input['CheckForMask']['MaskPresent']:
            report = input['CheckForMask']['Report']

        return {'MaskPresent':report is not None, 'Report':report}

class CreateJobDirectory(Task):
    """Generate a job directory on your local drive"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CreateJobDirectory', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-o", "--output", dest="output", default=None, help='The directory to use locally for job files')
    def run(self, input):
        if self.options.output is not None:
            output = self.options.output
        else:
            # Name the directory after the dataset and the current epoch time.
            # Note: '%s' is a platform-dependent strftime extension (seconds
            # since the epoch on glibc systems).
            output = '%s_%s' % (self.dataset, datetime.datetime.now().strftime("%s"))
            output = output.lstrip('/')
        if not os.path.exists(output):
            mkdir_p(output)
        return {'JobDir':output, 'PWD':os.getcwd()}
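
    # Example (sketch, hypothetical dataset name): '/MySample/MyProduction/V2'
    # yields a nested local directory like 'MySample/MyProduction/V2_1321009871'
    # (leading '/' stripped, suffix taken from the current epoch time).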

class SourceCFG(Task):
    """Generate a source CFG using 'sourceFileList.py' by listing the specified CASTOR directory. Applies the file wildcard, '--wildcard'."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'SourceCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--min-run", dest="min_run", default=-1, type=int, help='When querying DBS, require runs >= this run')
        parser.add_option("--max-run", dest="max_run", default=-1, type=int, help='When querying DBS, require runs <= this run')
        parser.add_option("--input-prescale", dest="prescale", default=1, type=int, help='Randomly prescale the number of good files by this factor.')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        pattern = fnmatch.translate(self.options.wildcard)

        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        good_files = data.listOfGoodFilesWithPrescale(self.options.prescale)
        # Files in the dataset that did not make the good list are recorded as bad
        bad_files = [fname for fname in data.listOfFiles() if not fname in good_files]

        source = os.path.join(jobdir, 'source_cfg.py')
        output = open(source, 'w')
        output.write('###SourceCFG:\t%d GoodFiles; %d BadFiles found in mask; Input prescale factor %d\n' % (len(good_files), len(bad_files), self.options.prescale))
        output.write('files = ' + str(good_files) + '\n')
        for bad_file in bad_files:
            output.write("###SourceCFG:\tBadInMask '%s'\n" % bad_file)
        output.close()
        return {'SourceCFG':source}


def insertLines(insertedTo, toInsert):
    '''Insert a sequence into another sequence.

    The sequence is inserted either at the end, or at the position
    of the HOOK, if it is found.
    The HOOK is considered to be found if
        str(elem).find('###ProductionTaskHook$$$')
    is true for one of the elements in the insertedTo sequence.
    '''
    HOOK = '###ProductionTaskHook$$$'
    hookIndex = None
    for index, line in enumerate(insertedTo):
        line = str(line)
        if line.find(HOOK) > -1:
            hookIndex = index
            break
    if hookIndex is not None:
        # Insert the new lines just before the hook
        before = insertedTo[:hookIndex]
        after = insertedTo[hookIndex:]
        result = before + toInsert + after
        return result
    else:
        insertedTo.extend(toInsert)
        return insertedTo
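
# Example: with the hook present the new lines land just before it; without
# it they are appended in place (mutating the first argument).
#   insertLines(['a\n', '###ProductionTaskHook$$$\n'], ['x\n'])
#   -> ['a\n', 'x\n', '###ProductionTaskHook$$$\n']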


class FullCFG(Task):
    """Generate the full CFG needed to run the job and write it to the job directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'FullCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--cfg", dest="cfg", default=None, help='The top level CFG to run')
        parser.add_option("--nEventsPerJob", dest="nEventsPerJob", default=None, help='Number of events per job (for testing)')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']

        if self.options.cfg is None or not os.path.exists(self.options.cfg):
            raise Exception("The file '%s' does not exist. Please check." % self.options.cfg)

        config = open(self.options.cfg).readlines()
        sourceFile = os.path.basename(input['SourceCFG']['SourceCFG'])
        if sourceFile.lower().endswith('.py'):
            sourceFile = sourceFile[:-3]

        source = os.path.join(jobdir, 'full_cfg.py')
        output = open(source, 'w')

        nEventsPerJob = -1
        if self.options.nEventsPerJob:
            nEventsPerJob = int(self.options.nEventsPerJob)

        toInsert = ['\nfrom %s import *\n' % sourceFile,
                    'process.source.fileNames = files\n',
                    'if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32({nEvents})\n'.format(nEvents=nEventsPerJob),
                    'if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)\n',
                    'datasetInfo = ("%s","%s","%s")\n' % (self.user, self.dataset, fnmatch.translate(self.options.wildcard))
                    ]
        config = insertLines(config, toInsert)
        output.writelines(config)
        output.close()
        return {'FullCFG':source}

class CheckConfig(Task):
    """Check the basic syntax of a CFG file by running python on it."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckConfig', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']

        # universal_newlines=True so that communicate() returns str, not bytes
        child = subprocess.Popen(['python', full], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = child.communicate()
        if child.returncode != 0:
            raise Exception("Syntax check of cfg failed. Error was '%s'. (%i)" % (stderr, child.returncode))
        return {'Status':'VALID'}

class RunTestEvents(Task):
    """Run cmsRun on the job CFG, but with a small number of events."""

    def __init__(self, dataset, user, options):
        Task.__init__(self, 'RunTestEvents', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = open(full).readlines()
        source = os.path.join(jobdir, 'test_cfg.py')
        output = open(source, 'w')
        toInsert = ['\n',
                    'process.maxEvents.input = cms.untracked.int32(5)\n',
                    'if hasattr(process,"source"): process.source.fileNames = process.source.fileNames[:10]\n'
                    ]
        config = insertLines(config, toInsert)
        output.writelines(config)
        output.close()

        pwd = os.getcwd()

        error = None
        try:
            os.chdir(jobdir)

            child = subprocess.Popen(['cmsRun', os.path.basename(source)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to cmsRun with a few events. Error was '%s' (%i)." % (stderr, child.returncode)
        finally:
            # Always restore the working directory
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'Status':'VALID', 'TestCFG':source}

class ExpandConfig(Task):
    """Expand the job CFG by running it and dumping it with process.dumpPython() (the equivalent of edmConfigDump)"""

    def __init__(self, dataset, user, options):
        Task.__init__(self, 'ExpandConfig', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = open(full).read()
        source = os.path.join(jobdir, 'test_cfg.py')
        expanded = 'Expanded%s' % os.path.basename(full)
        output = open(source, 'w')
        output.write(config)
        output.write("open('%s','w').write(process.dumpPython())\n" % expanded)
        output.close()

        pwd = os.getcwd()

        result = {}
        error = None
        try:
            os.chdir(jobdir)

            child = subprocess.Popen(['python', os.path.basename(source)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to expand the config. Error was '%s' (%i)." % (stderr, child.returncode)
            result['ExpandedFullCFG'] = os.path.join(jobdir, expanded)

        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return result

class WriteToDatasets(Task):
    """Publish the sample to 'Datasets.txt' if required"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'WriteToDatasets', dataset, user, options)
    def run(self, input):
        name = "%s/%s" % (self.dataset, self.options.tier)
        name = name.replace('//', '/')
        user = self.options.batch_user
        added = addToDatasets(name, user = user)
        return {'Added':added, 'Name':name, 'User':user}

class RunCMSBatch(Task):
    """Run the 'cmsBatch.py' command on your CFG, submitting to the CERN batch system"""

    def __init__(self, dataset, user, options):
        Task.__init__(self, 'RunCMSBatch', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--batch_user", dest="batch_user", help="The user for LSF", default=os.getlogin())
        parser.add_option("--run_batch", dest="run_batch", default=True, action='store_true', help='Run on the batch system')
        parser.add_option("-N", "--numberOfInputFiles", dest="nInput", help="Number of input files per job", default=5, type=int)
        parser.add_option("-q", "--queue", dest="queue", help="The LSF queue to use", default="1nh")
        parser.add_option("-t", "--tier", dest="tier",
                          help="Tier: extension you can give to specify you are doing a new production. If you give a tier, your new files will appear in sampleName/tierName, which will constitute a new dataset.",
                          default="")
        parser.add_option("-G", "--group", dest="group", help="The LSF user group to use, e.g. 'u_zh'", default=None)

    def run(self, input):
        find = FindOnCastor(self.dataset, self.options.batch_user, self.options)
        find.create = True
        out = find.run({})

        full = input['ExpandConfig']['ExpandedFullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        sampleDir = os.path.join(out['Directory'], self.options.tier)
        sampleDir = castortools.castorToLFN(sampleDir)

        cmd = ['cmsBatch.py', str(self.options.nInput), os.path.basename(full), '-o', '%s_Jobs' % self.options.tier, '--force']
        cmd.extend(['-r', sampleDir])
        if self.options.run_batch:
            jname = "%s/%s" % (self.dataset, self.options.tier)
            jname = jname.replace("//", "/")
            user_group = ''
            if self.options.group is not None:
                user_group = '-G %s' % self.options.group
            cmd.extend(['-b', "'bsub -q %s -J %s -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id.txt'" % (self.options.queue, jname, user_group)])
        print(" ".join(cmd))

        pwd = os.getcwd()

        error = None
        try:
            os.chdir(jobdir)
            returncode = os.system(" ".join(cmd))

            if returncode != 0:
                error = "Running cmsBatch failed. Return code was %i." % returncode
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'SampleDataset':"%s/%s" % (self.dataset, self.options.tier), 'BatchUser':self.options.batch_user,
                'SampleOutputDir':sampleDir, 'LSFJobsTopDir':os.path.join(jobdir, '%s_Jobs' % self.options.tier)}

class MonitorJobs(Task):
    """Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'MonitorJobs', dataset, user, options)

    def getjobid(self, job_dir):
        """Parse the LSF output in the job directory to find the job id"""
        input = os.path.join(job_dir, 'job_id.txt')
        result = None
        if os.path.exists(input):
            contents = open(input).read()
            for c in contents.split('\n'):
                if c and re.match('^Job <\\d*> is submitted to queue <.*>', c) is not None:
                    try:
                        result = c.split('<')[1].split('>')[0]
                    except Exception as e:
                        print('Job ID parsing error', str(e), c, file=sys.stderr)
        return result
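
    # Example (sketch): a 'job_id.txt' containing the LSF line
    #   Job <201347157> is submitted to queue <8nh>.
    # yields the job id '201347157'.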

    def monitor(self, jobs, previous):
        """Query bjobs for the status of each job id. 'previous' holds the
        statuses from the last call, so jobs that have dropped out of the
        bjobs output keep their last known state."""
        cmd = ['bjobs', '-u', self.options.batch_user]
        cmd.extend([v for v in jobs.values() if v is not None])
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = child.communicate()

        def parseHeader(header):
            """Parse the header from bjobs into a column-name -> index mapping"""
            tokens = [t for t in header.split(' ') if t]
            result = {}
            for i in range(len(tokens)):
                result[tokens[i]] = i

            return result
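
        # Example: parseHeader('JOBID USER STAT QUEUE') returns
        # {'JOBID': 0, 'USER': 1, 'STAT': 2, 'QUEUE': 3}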

        result = {}
        if stdout:
            lines = stdout.split('\n')
            if lines:
                header = parseHeader(lines[0])
                if not 'STAT' in header or not 'JOBID' in header:
                    print('Problem parsing bjobs header\n', lines, file=sys.stderr)
                    return result
                for line in lines[1:]:
                    # Columns are separated by (runs of) spaces
                    tokens = [t for t in line.split(' ') if t]
                    if len(tokens) < len(header): continue
                    id = tokens[header['JOBID']]
                    user = tokens[header['USER']]
                    status = tokens[header['STAT']]

                    result[id] = status

        if stderr:
            lines = stderr.split('\n')
            if lines:
                for line in lines:
                    if line and re.match('^Job <\\d*> is not found', line) is not None:
                        try:
                            id = line.split('<')[1].split('>')[0]
                            if id not in result and id not in previous:
                                result[id] = 'FORGOTTEN'
                        except Exception as e:
                            print('Job ID parsing error in STDERR', str(e), line, file=sys.stderr)

        # Keep the last known status for jobs that bjobs no longer reports
        if result:
            for id in jobs.values():
                if id not in result and id in previous:
                    result[id] = previous[id]
        return result

    def run(self, input):

        # Find the job directories created by cmsBatch.py
        jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
        if not os.path.exists(jobsdir):
            raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)

        subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
        jobs = {}
        for s in subjobs:
            jobs[s] = self.getjobid(s)

        def checkStatus(stat):
            # Collect the status of each job, compressing the log files of
            # finished jobs as we go
            actions = {'FilesToCompress':{'Files':[]}}

            result = {}
            for j, id in jobs.items():
                if id is None:
                    result[j] = 'UNKNOWN'
                else:
                    if id in stat:
                        result[j] = stat[id]
                        if result[j] in ['DONE', 'EXIT', 'FORGOTTEN']:
                            stdout = os.path.join(j, 'LSFJOB_%s' % id, 'STDOUT')
                            if os.path.exists(stdout):
                                # compress this file
                                actions['FilesToCompress']['Files'].append(stdout)
                                result[j] = '%s.gz' % stdout
                            elif os.path.exists('%s.gz' % stdout):
                                result[j] = '%s.gz' % stdout
                            else:
                                result[j] = 'NOSTDOUT'

                            # also compress the stderr, which is mostly empty
                            stderr = os.path.join(j, 'LSFJOB_%s' % id, 'STDERR')
                            if os.path.exists(stderr):
                                actions['FilesToCompress']['Files'].append(stderr)

            compress = GZipFiles(self.dataset, self.user, self.options)
            compress.run(actions)
            return result

        def countJobs(stat):
            """Count jobs that are monitorable - i.e. not in a final state"""
            result = []
            for j, id in jobs.items():
                if id is not None and id in stat:
                    st = stat[id]
                    if st in ['PEND', 'PSUSP', 'RUN', 'USUSP', 'SSUSP', 'WAIT']:
                        result.append(id)
            return result

        def writeKillScript(mon):
            """Write a shell script to kill the jobs we know about"""
            kill = os.path.join(jobsdir, 'kill_jobs.sh')
            output = open(kill, 'w')
            script = """#!/usr/bin/env bash
echo "Killing jobs"
bkill -u %s %s
""" % (self.options.batch_user, " ".join(mon))
            output.write(script)
            output.close()
            return mon

        # Poll bjobs every minute until no job is left in a monitorable state
        status = self.monitor(jobs, {})
        monitorable = writeKillScript(countJobs(status))
        count = 0

        while monitorable:
            # compress the logs of any jobs that finished since the last pass
            job_status = checkStatus(status)
            time.sleep(60)
            status = self.monitor(jobs, status)
            monitorable = writeKillScript(countJobs(status))
            # print a status line every three minutes
            if not (count % 3):
                print('%s: Monitoring %i jobs (%s)' % (self.name, len(monitorable), self.dataset))
            count += 1

        return {'LSFJobStatus':checkStatus(status), 'LSFJobIDs':jobs}

class CheckJobStatus(Task):
    """Check each job's STDOUT to catch common problems like exceptions or exceeded CPU time. Set the job status in the report accordingly."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckJobStatus', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--output_wildcard", dest="output_wildcard", help="The wildcard to use when testing the output of this production (defaults to same as -w)", default=None)
    def run(self, input):

        job_status = input['MonitorJobs']['LSFJobStatus']

        result = {}
        for j, status in job_status.items():
            valid = True
            # 'status' is the path to the (possibly gzipped) STDOUT file, if it exists
            if os.path.exists(status):

                fileHandle = None
                if status.endswith('.gz') or status.endswith('.GZ'):
                    # open in text mode so lines compare as str
                    fileHandle = gzip.open(status, 'rt')
                else:
                    fileHandle = open(status)

                open_count = 0
                close_count = 0
                for line in fileHandle:
                    # match 'Opened file'/'Closed file' regardless of the
                    # capitalisation of the first letter
                    if 'pened file' in line:
                        open_count += 1
                    if 'losed file' in line:
                        close_count += 1

                    if 'Exception' in line:
                        result[j] = 'Exception'
                        valid = False
                        break
                    elif 'CPU time limit exceeded' in line:
                        result[j] = 'CPUTimeExceeded'
                        valid = False
                        break
                    elif 'Killed' in line:
                        result[j] = 'JobKilled'
                        valid = False
                        break
                    elif 'A fatal system signal has occurred' in line:
                        result[j] = 'SegFault'
                        valid = False
                        break

                if valid and open_count != close_count:
                    result[j] = 'FileOpenCloseMismatch'
                    valid = False
                if valid:
                    result[j] = 'VALID'
            else:
                result[j] = status

        # Run the integrity check on the production output
        options = copy.deepcopy(self.options)
        if self.options.output_wildcard is not None:
            options.wildcard = self.options.output_wildcard

        mask = GenerateMask(input['RunCMSBatch']['SampleDataset'], self.options.batch_user, options)
        report = mask.run({'CheckForMask':{'MaskPresent':False}})
        report['LSFJobStatusCheck'] = result
        return report

class WriteJobReport(Task):
    """Write a summary report on each job"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'WriteJobReport', dataset, user, options)
    def run(self, input):

        report = input['CheckJobStatus']

        # Group the jobs by their status
        states = {}
        for j, status in report['LSFJobStatusCheck'].items():
            if status not in states:
                states[status] = []
            states[status].append(j)
        jobdir = input['CreateJobDirectory']['PWD']
        if not os.path.exists(jobdir):
            raise Exception("Top level job directory not found: '%s'" % jobdir)
        report_file = os.path.join(input['CreateJobDirectory']['JobDir'], 'resubmit.sh')

        output = open(report_file, 'w')
        output.write('#!/usr/bin/env bash\n')

        if report['MaskPresent']:
            mask = report['Report']
            output.write('#PrimaryDatasetFraction: %f\n' % mask['PrimaryDatasetFraction'])
            output.write('#FilesGood: %i\n' % mask['FilesGood'])
            output.write('#FilesBad: %i\n' % mask['FilesBad'])

        user_group = ''
        if self.options.group is not None:
            user_group = '-G %s' % self.options.group

        for status, jobs in states.items():
            output.write('# %d jobs found in state %s\n' % (len(jobs), status))
            if status == 'VALID':
                continue
            # Write a resubmission command for each job that did not finish cleanly
            for j in jobs:
                jdir = os.path.join(jobdir, j)
                output.write('pushd %s; bsub -q %s -J RESUB -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id_resub.txt; popd\n' % (jdir, self.options.queue, user_group))
        output.close()

        return {'SummaryFile':report_file}

class CleanJobFiles(Task):
    """Remove and compress auto-generated files from the job directory to save space."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CleanJobFiles', dataset, user, options)
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        jobs = input['MonitorJobs']['LSFJobIDs']
        job_status = input['MonitorJobs']['LSFJobStatus']

        actions = {'FilesToCompress':{'Files':[]}, 'FilesToClean':{'Files':[]}}

        actions['FilesToClean']['Files'].append(input['ExpandConfig']['ExpandedFullCFG'])
        if 'RunTestEvents' in input:
            actions['FilesToClean']['Files'].append(input['RunTestEvents']['TestCFG'])

        for rt in glob.iglob('%s/*.root' % jobdir):
            actions['FilesToClean']['Files'].append(rt)
        for pyc in glob.iglob('%s/*.pyc' % jobdir):
            actions['FilesToClean']['Files'].append(pyc)

        # Compress any job logs that have not been compressed already
        for j in jobs:
            status = job_status[j]
            if os.path.exists(status) and not status.endswith('.gz'):
                actions['FilesToCompress']['Files'].append(status)

        compress = GZipFiles(self.dataset, self.user, self.options)
        compressed = compress.run(actions)

        clean = CleanFiles(self.dataset, self.user, self.options)
        removed = clean.run(actions)
        return {'Cleaned':removed, 'Compressed':compressed}
0874