from __future__ import print_function
from __future__ import absolute_import

from builtins import range
import copy, datetime, inspect, fnmatch, os, re, subprocess, sys, tempfile, time
import glob
import gzip
import errno
from .edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
from .addToDatasets import addToDatasets

from . import eostools as castortools
from . import das as Das

from .dataset import Dataset
from .datasetToSource import createDataset
from .castorBaseDir import castorBaseDir

def mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError as exc: # Python >2.5
        if exc.errno == errno.EEXIST:
            pass
        else: raise

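# Note: on Python >= 3.2 the same behaviour is available directly from the
# standard library:
#
#     os.makedirs(path, exist_ok=True)
#
# mkdir_p is presumably kept for compatibility with older interpreters.
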
class Task(object):
    """Base class for Task API"""
    def __init__(self, name, dataset, user, options, instance = None):
        self.name = name
        self.instance = instance
        self.dataset = dataset
        self.user = user
        self.options = options
    def getname(self):
        """The name of the object, including the instance if set"""
        if self.instance is not None:
            return '%s_%s' % (self.name,self.instance)
        else:
            return self.name
    def addOption(self, parser):
        """A hook for adding options to the command line parser"""
        pass
    def run(self, input):
        """Basic API for a task. input and output are dictionaries"""
        return {}

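# Illustrative only: a minimal Task subclass. The driver (not in this file)
# appears to run tasks in sequence, collecting each task's return value
# under the task's name so later tasks can read it from 'input':
#
#     class PrintDataset(Task):
#         """Toy task that echoes the dataset name"""
#         def __init__(self, dataset, user, options):
#             Task.__init__(self, 'PrintDataset', dataset, user, options)
#         def run(self, input):
#             print(self.dataset)
#             return {'Printed': True}
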
class ParseOptions(Task):
    """Common options for the script __main__: used by all production tasks"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'ParseOptions', dataset, user, options)

        usage = """%prog [options] <dataset>

The %prog script takes a list of samples and processes them on the batch system. Submission
may be done serially (by setting --max_threads to 1), or in parallel (the default).

The basic flow is:

    1) Check that the sample to run on exists
    2) Generate a source CFG
    3) Run locally and check everything works with a small number of events
    4) Submit to the batch system
    5) Wait until the jobs are finished
    6) Check the jobs ran OK and that the files are good

Example:

ProductionTasks.py -u cbern -w 'PFAOD*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2

It is often useful to store the sample names in a file, in which case you could instead do:

ProductionTasks.py -w '*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py `cat samples_mc.txt`

An example file might contain:

palencia%/Tbar_TuneZ2_tW-channel-DR_7TeV-powheg-tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
benitezj%/ZZ_TuneZ2_7TeV_pythia6_tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2

The CASTOR username for each sample is given before the '%'.

Each step in the flow has a task associated with it, which may set options. The options for each task are
documented below.

"""
        self.das = Das.DASOptionParser(usage=usage)
    def addOption(self, parser):
        parser.add_option("-u", "--user", dest="user", default=os.getlogin(), help='The username to use when looking at mass storage devices. Your login username is used by default.')
        parser.add_option("-w", "--wildcard", dest="wildcard", default='*.root', help='A UNIX style wildcard to specify which input files to check before submitting the jobs')
        parser.add_option("--max_threads", dest="max_threads", default=None, help='The maximum number of threads to use in the production')
    def run(self, input):
        self.options, self.dataset = self.das.get_opt()
        self.dataset = [d for d in self.dataset if not d.startswith('#')]
        self.user = self.options.user
        if not self.dataset:
            raise Exception('TaskError: No dataset specified')
        return {'Options':self.options, 'Dataset':self.dataset}

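# The 'user%' prefix in the sample file above is stripped off outside this
# module; an illustrative parse of one entry (hypothetical snippet):
#
#     entry = 'wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2'
#     user, _, sample = entry.partition('%')
#     # -> user = 'wreece', sample = '/ZJetsToNuNu_100_HT_200_7TeV-madgraph/...'
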
class CheckDatasetExists(Task):
    """Use 'datasets.py' to check that the dataset exists in the production system.
    """
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckDatasetExists', dataset, user, options)
    def run(self, input):
        pattern = fnmatch.translate(self.options.wildcard)
        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        if len(data.listOfGoodFiles()) == 0:
            raise Exception('no good root file in dataset %s | %s | %s | %s' % (self.user,
                                                                                self.dataset,
                                                                                self.options.wildcard,
                                                                                run_range))
        return {'Dataset':self.dataset}

class BaseDataset(Task):
    """Query DAS to find dataset name in DBS - see https://cmsweb.cern.ch/das/"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'BaseDataset', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-n", "--name", dest="name", default=None, help='The name of the dataset in DAS. Will be guessed if not specified')
    def query(self, dataset):
        """Query DAS to find out how many events are in the dataset"""

        host    = self.options.host
        debug   = self.options.verbose
        idx     = self.options.idx
        limit   = self.options.limit
        
        def check(ds):
            query = 'dataset=%s' % ds
            result = Das.get_data(host, query, idx, limit, debug)
            #poor man's JSON decoding: map the JSON literals to their Python
            #equivalents, then eval the result
            result = result.replace('null','None')
            result = result.replace('true','True')
            result = result.replace('false','False')
            data = eval(result)
            if data['status'] != 'ok':
                raise Exception("DAS query failed: Output is '%s'" % data)
            return (data['data'],data)

        data = None
        exists = False
        
        if self.options.name is None:
            #guess the dataset name in DBS
            tokens = [t for t in dataset.split(os.sep) if t]
            if len(tokens) >= 3:
                #DBS names always have three entries
                ds = os.sep + os.sep.join(tokens[0:3])
                if ds:
                    exists, data = check(ds)
                    self.options.name = ds
        else:
            exists, data = check(self.options.name)
            if not exists:
                raise Exception("Specified dataset '%s' not found in DAS. Please check." % self.options.name)
        
        if data is None:
            raise Exception("Dataset '%s' not found in DAS. Please check." % self.dataset)
        return data
    
    def run(self, input):
        output = {}
        if getattr(self.options, 'check', True):
            output = self.query(self.dataset)
        return {'Name':self.options.name,'Das':output}

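# Example of the name guessing in query() above: for
#
#     dataset = '/QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2'
#
# the first three path tokens give the DBS candidate name
# '/QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM',
# which is then sent to DAS as the query 'dataset=<name>'.
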
class GZipFiles(Task):
    """GZip a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'GZipFiles', dataset, user, options)
    def gzip(self, fileName):
        output = '%s.gz' % fileName
        
        f_in = open(fileName, 'rb')
        f_out = gzip.open(output, 'wb')
        f_out.writelines(f_in)
        f_out.close()
        f_in.close()
        #remove the original file once we've gzipped it
        os.remove(fileName)
        return output
           
    def run(self, input):
        files = input['FilesToCompress']['Files']
        
        compressed = []
        for f in files:
            if not f: continue
            if os.path.exists(f):
                gz = self.gzip(f)
                compressed.append(gz)
        return {'CompressedFiles':compressed}
    
class CleanFiles(Task):
    """Remove a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CleanFiles', dataset, user, options)
    def run(self, input):
        files = input['FilesToClean']['Files']
        removed = []
        for f in files:
            if not f: continue
            if os.path.exists(f): os.remove(f)
            removed.append(f)
        return {'CleanedFiles':removed}

class FindOnCastor(Task):
    """Checks that the sample specified exists in the CASTOR area of the user specified. The directory must exist."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'FindOnCastor', dataset, user, options)
    def run(self, input):
        if self.user == 'CMS':
            return {'Topdir':None,'Directory':None}
        topdir = castortools.lfnToCastor(castorBaseDir(user=self.user))
        directory = '%s/%s' % (topdir,self.dataset)
        # directory = directory.replace('//','/')
        if not castortools.fileExists(directory):
            if hasattr(self,'create') and self.create:
                castortools.createCastorDir(directory)
                #castortools.chmod(directory,'775')
        if not castortools.isDirectory(directory):
            raise Exception("Dataset directory '%s' does not exist or could not be created" % directory)
        return {'Topdir':topdir,'Directory':directory}

class CheckForMask(Task):
    """Tests if a file mask, created by edmIntegrityCheck.py, is already present and reads it if so."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckForMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-c", "--check", dest="check", default=False, action='store_true', help='Check filemask if available')
    def run(self, input):
        #skip for DBS
        if self.user == 'CMS':
            return {'MaskPresent':True,'Report':'Files taken from DBS'}
        
        dir = input['FindOnCastor']['Directory']
        mask = "IntegrityCheck"
        file_mask = []

        report = None
        if getattr(self.options, 'check', True):
            file_mask = castortools.matchingFiles(dir, r'^%s_.*\.txt$' % mask)

            if file_mask:
                p = PublishToFileSystem(mask)
                report = p.get(dir)
        return {'MaskPresent':report is not None,'Report':report}
    
class CheckForWrite(Task):
    """Checks whether you have write access to the CASTOR directory specified"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckForWrite', dataset, user, options)
    def run(self, input):
        """Check that the directory is writable"""
        if self.user == 'CMS':
            return {'Directory':None,'WriteAccess':True}
        dir = input['FindOnCastor']['Directory']
        if self.options.check:
            
            _, name = tempfile.mkstemp('.txt', text=True)
            testFile = open(name,'w')
            testFile.write('Test file')
            testFile.close()

            store = castortools.castorToLFN(dir)
            #this is bad, but castortools is giving me problems
            if not os.system('cmsStage %s %s' % (name,store)):
                fname = '%s/%s' % (dir,os.path.basename(name))
                write = castortools.fileExists(fname)
                if write:
                    castortools.rm(fname)
                else:
                    raise Exception("Failed to write to directory '%s'" % dir)
            os.remove(name)
        return {'Directory':dir,'WriteAccess':True}
    
class GenerateMask(Task):
    """Uses edmIntegrityCheck.py to generate a file mask for the sample if one is not already present."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'GenerateMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-r", "--recursive", dest="recursive", default=False, action='store_true', help='Walk the mass storage device recursively')
        parser.add_option("-p", "--printout", dest="printout", default=False, action='store_true', help='Print a report to stdout')
    def run(self, input):
        
        report = None
        if self.options.check and not input['CheckForMask']['MaskPresent']:
            
            options = copy.deepcopy(self.options)
            options.user = self.user

            if 'BaseDataset' in input:
                options.name = input['BaseDataset']['Name']
            else:
                options.name = None
            
            check = IntegrityCheck(self.dataset,options)
            check.test()
            report = check.structured()
            pub = PublishToFileSystem(check)
            pub.publish(report)
        elif input['CheckForMask']['MaskPresent']:
            report = input['CheckForMask']['Report']
        
        return {'MaskPresent':report is not None,'Report':report}

class CreateJobDirectory(Task):
    """Generates a job directory on your local drive"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CreateJobDirectory', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-o", "--output", dest="output", default=None, help='The directory to use locally for job files')
    def run(self, input):
        if self.options.output is not None:
            output = self.options.output
        else:
            # output = '%s_%s' % (self.dataset.replace('/','.'),datetime.datetime.now().strftime("%s"))
            # if output.startswith('.'):
            #strftime("%s") gives seconds since the epoch (a GNU libc extension)
            output = '%s_%s' % (self.dataset,datetime.datetime.now().strftime("%s"))
            output = output.lstrip('/')
        if not os.path.exists(output):
            mkdir_p(output)
        return {'JobDir':output,'PWD':os.getcwd()}

class SourceCFG(Task):
    """Generate a source CFG using 'sourceFileList.py' by listing the CASTOR directory specified. Applies the file wildcard, '--wildcard'"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'SourceCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--min-run", dest="min_run", default=-1, type=int, help='When querying DBS, require runs >= this run')
        parser.add_option("--max-run", dest="max_run", default=-1, type=int, help='When querying DBS, require runs <= this run')
        parser.add_option("--input-prescale", dest="prescale", default=1, type=int, help='Randomly prescale the number of good files by this factor.')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        pattern = fnmatch.translate(self.options.wildcard)
        
        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        good_files = data.listOfGoodFilesWithPrescale(self.options.prescale)
        #files removed by the prescale are marked as bad in the comments below
        bad_files = [fname for fname in data.listOfFiles() if fname not in good_files]
        
        source = os.path.join(jobdir,'source_cfg.py')
        output = open(source,'w')
        output.write('###SourceCFG:\t%d GoodFiles; %d BadFiles found in mask; Input prescale factor %d\n' % (len(good_files),len(bad_files),self.options.prescale))
        output.write('files = ' + str(good_files) + '\n')
        for bad_file in bad_files:
            output.write("###SourceCFG:\tBadInMask '%s'\n" % bad_file)
        output.close()
        return {'SourceCFG':source}

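# A generated source_cfg.py looks roughly like this (sketch):
#
#     ###SourceCFG:   2 GoodFiles; 1 BadFiles found in mask; Input prescale factor 1
#     files = ['/store/.../A.root', '/store/.../B.root']
#     ###SourceCFG:   BadInMask '/store/.../C.root'
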

def insertLines( insertedTo, toInsert ):
    '''Insert a sequence into another sequence.

    The sequence is inserted either at the end, or just before the
    HOOK line if one is found.
    The HOOK is considered found if
        str(elem).find('###ProductionTaskHook$$$') > -1
    is true for one of the elements in the insertedTo sequence.
    '''
    HOOK = '###ProductionTaskHook$$$'
    hookIndex = None
    for index, line in enumerate(insertedTo):
        line = str(line)
        if line.find(HOOK)>-1:
            hookIndex = index
            break
    if hookIndex is not None:
        before = insertedTo[:hookIndex]
        after = insertedTo[hookIndex:]
        result = before + toInsert + after
        return result
    else:
        insertedTo.extend( toInsert )
        return insertedTo

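# Example: the insertion lands just before the hook line when one exists,
# otherwise at the end:
#
#     insertLines(['a\n', '###ProductionTaskHook$$$\n', 'b\n'], ['x\n'])
#     # -> ['a\n', 'x\n', '###ProductionTaskHook$$$\n', 'b\n']
#     insertLines(['a\n', 'b\n'], ['x\n'])
#     # -> ['a\n', 'b\n', 'x\n']
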

class FullCFG(Task):
    """Generates the full CFG needed to run the job and writes it to the job directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'FullCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--cfg", dest="cfg", default=None, help='The top level CFG to run')
        parser.add_option("--nEventsPerJob", dest="nEventsPerJob", default=None, help='Number of events per job (for testing)')
    def run(self, input):
        
        jobdir = input['CreateJobDirectory']['JobDir']

        if self.options.cfg is None or not os.path.exists(self.options.cfg):
            raise Exception("The file '%s' does not exist. Please check." % self.options.cfg)
        
        config = open(self.options.cfg).readlines()
        sourceFile = os.path.basename(input['SourceCFG']['SourceCFG'])
        if sourceFile.lower().endswith('.py'):
            sourceFile = sourceFile[:-3]
        
        source = os.path.join(jobdir,'full_cfg.py')
        output = open(source,'w')

        nEventsPerJob = -1
        if self.options.nEventsPerJob:
            nEventsPerJob = int(self.options.nEventsPerJob)
        
        toInsert = ['\nfrom %s import *\n' % sourceFile,
                    'process.source.fileNames = files\n',
                    'if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32({nEvents})\n'.format(nEvents=nEventsPerJob),
                    'if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)\n',
                    'datasetInfo = ("%s","%s","%s")\n' % (self.user, self.dataset, fnmatch.translate(self.options.wildcard))
                    ]
        config = insertLines( config, toInsert )
        output.writelines(config)
        output.close()
        return {'FullCFG':source}

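# The lines inserted into the user CFG look like this (sketch, for a source
# file 'source_cfg' and the default nEventsPerJob of -1):
#
#     from source_cfg import *
#     process.source.fileNames = files
#     if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32(-1)
#     if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)
#     datasetInfo = ("<user>","<dataset>","<translated wildcard>")
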
class CheckConfig(Task):
    """Check the basic syntax of a CFG file by running python on it."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckConfig', dataset, user, options)
    def run(self, input):
        
        full = input['FullCFG']['FullCFG']
        
        #universal_newlines=True so communicate() returns text on Python 3
        child = subprocess.Popen(['python',full], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = child.communicate()
        if child.returncode != 0:
            raise Exception("Syntax check of cfg failed. Error was '%s'. (%i)" % (stderr,child.returncode))
        return {'Status':'VALID'}

class RunTestEvents(Task):
    """Run cmsRun on the job CFG, but with a small number of events."""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'RunTestEvents', dataset, user, options)
    def run(self, input):
        
        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']
        
        config = open(full).readlines()
        source = os.path.join(jobdir,'test_cfg.py')
        output = open(source,'w')
        toInsert = ['\n',
                    'process.maxEvents.input = cms.untracked.int32(5)\n',
                    'if hasattr(process,"source"): process.source.fileNames = process.source.fileNames[:10]\n'
                    ]
        config = insertLines( config, toInsert )
        output.writelines(config)
        output.close()
                
        pwd = os.getcwd()
        
        error = None
        try:
            os.chdir(jobdir)
            
            child = subprocess.Popen(['cmsRun',os.path.basename(source)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            stdout, stderr = child.communicate()
            
            if child.returncode != 0:
                error = "Failed to cmsRun with a few events. Error was '%s' (%i)." % (stderr,child.returncode)
        finally:
            os.chdir(pwd)
            
        if error is not None:
            raise Exception(error)

        return {'Status':'VALID','TestCFG':source}
    
class ExpandConfig(Task):
    """Runs edmConfigDump to produce an expanded cfg file"""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'ExpandConfig', dataset, user, options)
    def run(self, input):
        
        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = open(full).read()
        source = os.path.join(jobdir,'test_cfg.py')
        expanded = 'Expanded%s' % os.path.basename(full)
        output = open(source,'w')
        output.write(config)
        output.write("open('%s','w').write(process.dumpPython())\n" % expanded)
        output.close()

        pwd = os.getcwd()
        
        result = {}
        error = None
        try:
            os.chdir(jobdir)
            
            child = subprocess.Popen(['python',os.path.basename(source)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            stdout, stderr = child.communicate()
            
            if child.returncode != 0:
                error = "Failed to edmConfigDump. Error was '%s' (%i)." % (stderr,child.returncode)
            result['ExpandedFullCFG'] = os.path.join(jobdir,expanded)
            
        finally:
            os.chdir(pwd)
            
        if error is not None:
            raise Exception(error)

        return result
    
class WriteToDatasets(Task):
    """Publish the sample to 'Datasets.txt' if required"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'WriteToDatasets', dataset, user, options)
    def run(self, input):
        name = "%s/%s" % (self.dataset,self.options.tier)
        name = name.replace('//','/')
        user = self.options.batch_user
        added = addToDatasets(name, user = user)
        return {'Added':added, 'Name':name, 'User':user}
    
class RunCMSBatch(Task):
    """Run the 'cmsBatch.py' command on your CFG, submitting to the CERN batch system"""

    def __init__(self, dataset, user, options):
        Task.__init__(self,'RunCMSBatch', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--batch_user", dest="batch_user", help="The user for LSF", default=os.getlogin())
        parser.add_option("--run_batch", dest="run_batch", default=True, action='store_true', help='Run on the batch system')
        parser.add_option("-N", "--numberOfInputFiles", dest="nInput", help="Number of input files per job", default=5, type=int)
        parser.add_option("-q", "--queue", dest="queue", help="The LSF queue to use", default="1nh")
        parser.add_option("-t", "--tier", dest="tier",
                          help="Tier: extension you can give to specify you are doing a new production. If you give a Tier, your new files will appear in sampleName/tierName, which will constitute a new dataset.",
                          default="")
        parser.add_option("-G", "--group", dest="group", help="The LSF user group to use, e.g. 'u_zh'", default=None)
        
    def run(self, input):
        find = FindOnCastor(self.dataset,self.options.batch_user,self.options)
        find.create = True
        out = find.run({})
        
        full = input['ExpandConfig']['ExpandedFullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']
        
        sampleDir = os.path.join(out['Directory'],self.options.tier)
        sampleDir = castortools.castorToLFN(sampleDir)
        
        cmd = ['cmsBatch.py',str(self.options.nInput),os.path.basename(full),'-o','%s_Jobs' % self.options.tier,'--force']
        cmd.extend(['-r',sampleDir])
        if self.options.run_batch:
            jname = "%s/%s" % (self.dataset,self.options.tier)
            jname = jname.replace("//","/")
            user_group = ''
            if self.options.group is not None:
                user_group = '-G %s' % self.options.group
            cmd.extend(['-b',"'bsub -q %s -J %s -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id.txt'" % (self.options.queue,jname,user_group)])
        print(" ".join(cmd))
        
        pwd = os.getcwd()
        
        error = None
        try:
            os.chdir(jobdir)
            returncode = os.system(" ".join(cmd))

            if returncode != 0:
                error = "Running cmsBatch failed. Return code was %i." % returncode
        finally:
            os.chdir(pwd)
            
        if error is not None:
            raise Exception(error)

        return {'SampleDataset':"%s/%s" % (self.dataset,self.options.tier),'BatchUser':self.options.batch_user,
                'SampleOutputDir':sampleDir,'LSFJobsTopDir':os.path.join(jobdir,'%s_Jobs' % self.options.tier)}

class MonitorJobs(Task):
    """Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'MonitorJobs', dataset, user, options)
    
    def getjobid(self, job_dir):
        """Parse the LSF output to find the job id"""
        input = os.path.join(job_dir,'job_id.txt')
        result = None
        if os.path.exists(input):
            contents = open(input).read()
            for c in contents.split('\n'):
                if c and re.match(r'^Job <\d*> is submitted to queue <.*>',c) is not None:
                    try:
                        result = c.split('<')[1].split('>')[0]
                    except Exception as e:
                        print('Job ID parsing error',str(e),c, file=sys.stderr)
        return result
    
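    # Example of the job_id.txt line this parses (standard bsub output):
    #
    #     Job <123456> is submitted to queue <8nh>.
    #
    # for which getjobid returns '123456'.
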
    def monitor(self, jobs, previous):

        #executes bjobs with a list of job IDs
        cmd = ['bjobs','-u',self.options.batch_user]
        cmd.extend([v for v in jobs.values() if v is not None]) #filter out unknown IDs
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = child.communicate()

        def parseHeader(header):
            """Parse the header from bjobs into a column name -> index mapping"""
            tokens = [t for t in header.split(' ') if t]
            result = {}
            for i in range(len(tokens)):
                result[tokens[i]] = i

            return result

        result = {}
        if stdout:
            lines = stdout.split('\n')
            if lines:
                header = parseHeader(lines[0])
                if 'STAT' not in header or 'JOBID' not in header:
                    print('Problem parsing bjobs header\n',lines, file=sys.stderr)
                    return result
                for line in lines[1:]:
                    #TODO: Unreliable for some fields, e.g. dates
                    tokens = [t for t in line.split(' ') if t]
                    if len(tokens) < len(header): continue
                    id = tokens[header['JOBID']]
                    user = tokens[header['USER']]
                    status = tokens[header['STAT']]

                    result[id] = status
                    
        if stderr:
            lines = stderr.split('\n')
            if lines:
                for line in lines:
                    if line and re.match(r'^Job <\d*> is not found',line) is not None:
                        try:
                            id = line.split('<')[1].split('>')[0]
                            if id not in result and id not in previous:
                                result[id] = 'FORGOTTEN'
                        except Exception as e:
                            print('Job ID parsing error in STDERR',str(e),line, file=sys.stderr)
        
        #after one hour the status is no longer available from bjobs,
        #so fall back to the last known status
        if result:
            for id in jobs.values():
                if id not in result and id in previous:
                    result[id] = previous[id]
        return result
    
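    # Illustrative bjobs output that monitor/parseHeader handle; the header
    # gives the column indices, and each data row is keyed on JOBID with
    # STAT as the value:
    #
    #     JOBID   USER    STAT  QUEUE  FROM_HOST  EXEC_HOST  JOB_NAME  SUBMIT_TIME
    #     123456  wreece  RUN   8nh    lxplusNNN  lxbatchNN  job1      Oct  1 10:00
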
    def run(self, input):

        # return #COLIN
        jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
        if not os.path.exists(jobsdir):
            raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)
        
        subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
        jobs = {}
        for s in subjobs:
            jobs[s] = self.getjobid(s)

        def checkStatus(stat):
            
            #gzip files on the fly
            actions = {'FilesToCompress':{'Files':[]}}
            
            result = {}
            for j, id in jobs.items():
                if id is None:
                    result[j] = 'UNKNOWN'
                else:
                    if id in stat:
                        result[j] = stat[id]
                        if result[j] in ['DONE','EXIT','FORGOTTEN']:
                            stdout = os.path.join(j,'LSFJOB_%s' % id,'STDOUT')
                            if os.path.exists(stdout):
                                #compress this file
                                actions['FilesToCompress']['Files'].append(stdout)
                                result[j] = '%s.gz' % stdout
                            elif os.path.exists('%s.gz' % stdout):
                                result[j] = '%s.gz' % stdout
                            else:
                                result[j] = 'NOSTDOUT'
                        
                            #also compress the stderr, although this is mostly empty
                            stderr = os.path.join(j,'LSFJOB_%s' % id,'STDERR')
                            if os.path.exists(stderr):
                                #compress this file
                                actions['FilesToCompress']['Files'].append(stderr)
                                
            compress = GZipFiles(self.dataset,self.user,self.options)
            compress.run(actions)
            return result
        
        def countJobs(stat):
            """Count jobs that are monitorable - i.e. not in a final state"""
            result = []
            for j, id in jobs.items():
                if id is not None and id in stat:
                    st = stat[id]
                    if st in ['PEND','PSUSP','RUN','USUSP','SSUSP','WAIT']:
                        result.append(id)
            return result
        
        def writeKillScript(mon):
            """Write a shell script to kill the jobs we know about"""
            kill = os.path.join(jobsdir,'kill_jobs.sh')
            output = open(kill,'w')
            #the shebang must be the first line of the script, so the
            #triple-quoted string starts immediately after the quotes
            script = """#!/usr/bin/env bash
echo "Killing jobs"
bkill -u %s %s
""" % (self.options.batch_user," ".join(mon))
            output.write(script)
            output.close()
            return mon
                    
        #continue monitoring while there are jobs to monitor
        status = self.monitor(jobs,{})
        monitorable = writeKillScript(countJobs(status))
        count = 0
        
        while monitorable:
            #checkStatus also gzips the logs of any finished jobs
            job_status = checkStatus(status)
            time.sleep(60)
            status = self.monitor(jobs,status)
            monitorable = writeKillScript(countJobs(status))
            if not (count % 3):
                print('%s: Monitoring %i jobs (%s)' % (self.name,len(monitorable),self.dataset))
            count += 1
            
        return {'LSFJobStatus':checkStatus(status),'LSFJobIDs':jobs}
    
class CheckJobStatus(Task):
    """Checks the job STDOUT to catch common problems like exceptions or the CPU time limit being exceeded. Sets the job status in the report accordingly."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CheckJobStatus', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--output_wildcard", dest="output_wildcard", help="The wildcard to use when testing the output of this production (defaults to same as -w)", default=None)
    def run(self, input):
        
        job_status = input['MonitorJobs']['LSFJobStatus']

        result = {}
        for j, status in job_status.items():
            valid = True
            if os.path.exists(status):

                fileHandle = None
                if status.endswith('.gz') or status.endswith('.GZ'):
                    #'rt' so that iteration yields text lines, as in the
                    #plain-file branch below
                    fileHandle = gzip.open(status, 'rt')
                else:
                    fileHandle = open(status)
                
                open_count = 0
                close_count = 0
                for line in fileHandle:
                    #start by counting files opened and closed
                    #('pened'/'losed' also match the capitalised forms)
                    #suggestion from Enrique
                    if 'pened file' in line:
                        open_count += 1
                    if 'losed file' in line:
                        close_count += 1
                    
                    if 'Exception' in line:
                        result[j] = 'Exception'
                        valid = False
                        break
                    elif 'CPU time limit exceeded' in line:
                        result[j] = 'CPUTimeExceeded'
                        valid = False
                        break
                    elif 'Killed' in line:
                        result[j] = 'JobKilled'
                        valid = False
                        break
                    elif 'A fatal system signal has occurred' in line:
                        result[j] = 'SegFault'
                        valid = False
                        break
                    
                if valid and open_count != close_count:
                    result[j] = 'FileOpenCloseMismatch'
                    valid = False
                if valid:
                    result[j] = 'VALID'
            else:
                result[j] = status

        #allows a different wildcard in the final check.
        options = copy.deepcopy(self.options)
        if self.options.output_wildcard is not None:
            options.wildcard = self.options.output_wildcard

        mask = GenerateMask(input['RunCMSBatch']['SampleDataset'],self.options.batch_user,options)
        report = mask.run({'CheckForMask':{'MaskPresent':False}})
        report['LSFJobStatusCheck'] = result
        return report
    
class WriteJobReport(Task):
    """Write a summary report on each job"""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'WriteJobReport', dataset, user, options)
    def run(self, input):
        
        report = input['CheckJobStatus']
        
        #collect a list of jobs by status
        states = {}
        for j, status in report['LSFJobStatusCheck'].items():
            if status not in states:
                states[status] = []
            states[status].append(j)
        jobdir = input['CreateJobDirectory']['PWD']
        if not os.path.exists(jobdir):
            raise Exception("Top level job directory not found: '%s'" % jobdir)
        report_file = os.path.join(input['CreateJobDirectory']['JobDir'],'resubmit.sh')

        output = open(report_file,'w')
        output.write('#!/usr/bin/env bash\n')
        
        if report['MaskPresent']:
            mask = report['Report']
            output.write('#PrimaryDatasetFraction: %f\n' % mask['PrimaryDatasetFraction'])
            output.write('#FilesGood: %i\n' % mask['FilesGood'])
            output.write('#FilesBad: %i\n' % mask['FilesBad'])
        
        user_group = ''
        if self.options.group is not None:
            user_group = '-G %s' % self.options.group

        for status, jobs in states.items():
            output.write('# %d jobs found in state %s\n' % (len(jobs),status))
            if status == 'VALID':
                continue
            for j in jobs:
                jdir = os.path.join(jobdir,j)
                output.write('pushd %s; bsub -q %s -J RESUB -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id_resub.txt; popd\n' % (jdir,self.options.queue,user_group))
        output.close()
            
        return {'SummaryFile':report_file}
    
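# A generated resubmit.sh looks roughly like this (sketch with made-up
# numbers; only non-VALID jobs get a resubmission line):
#
#     #!/usr/bin/env bash
#     #PrimaryDatasetFraction: 0.980000
#     #FilesGood: 49
#     #FilesBad: 1
#     # 49 jobs found in state VALID
#     # 1 jobs found in state Exception
#     pushd /path/to/jobdir/Job_7; bsub -q 8nh -J RESUB -u cmgtoolslsf@gmail.com  < ./batchScript.sh | tee job_id_resub.txt; popd
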
class CleanJobFiles(Task):
    """Removes and compresses auto-generated files from the job directory to save space."""
    def __init__(self, dataset, user, options):
        Task.__init__(self,'CleanJobFiles', dataset, user, options)
    def run(self, input):
        
        jobdir = input['CreateJobDirectory']['JobDir']
        jobs = input['MonitorJobs']['LSFJobIDs']
        job_status = input['MonitorJobs']['LSFJobStatus']
        
        actions = {'FilesToCompress':{'Files':[]},'FilesToClean':{'Files':[]}}
        
        actions['FilesToClean']['Files'].append(input['ExpandConfig']['ExpandedFullCFG'])
        if 'RunTestEvents' in input:
            actions['FilesToClean']['Files'].append(input['RunTestEvents']['TestCFG'])

        for rt in glob.iglob('%s/*.root' % jobdir):
            actions['FilesToClean']['Files'].append(rt)
        for pyc in glob.iglob('%s/*.pyc' % jobdir):
            actions['FilesToClean']['Files'].append(pyc)

        for j in jobs:
            status = job_status[j]
            if os.path.exists(status) and not status.endswith('.gz'):
                actions['FilesToCompress']['Files'].append(status)
                
        compress = GZipFiles(self.dataset,self.user,self.options)
        compressed = compress.run(actions)

        clean = CleanFiles(self.dataset,self.user,self.options)
        removed = clean.run(actions)
        return {'Cleaned':removed,'Compressed':compressed}