from __future__ import print_function
from builtins import range
import copy, datetime, inspect, fnmatch, os, re, subprocess, sys, tempfile, time
import glob
import gzip
import errno
from .edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
from .addToDatasets import addToDatasets

from . import eostools as castortools
from . import das as Das

from .dataset import Dataset
from .datasetToSource import createDataset
from .castorBaseDir import castorBaseDir

def mkdir_p(path):
    """Create a directory, ignoring the error if it already exists.

    On Python >= 3.2 this is equivalent to os.makedirs(path, exist_ok=True);
    the errno check keeps the helper portable to older interpreters.
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            pass
        else:
            raise

class Task(object):
    """Base class for the Task API"""
    def __init__(self, name, dataset, user, options, instance = None):
        self.name = name
        self.instance = instance
        self.dataset = dataset
        self.user = user
        self.options = options
    def getname(self):
        """The name of the object, including the instance if one is set"""
        if self.instance is not None:
            return '%s_%s' % (self.name, self.instance)
        else:
            return self.name
    def addOption(self, parser):
        """A hook for adding options to the parser"""
        pass
    def run(self, input):
        """Basic API for a task. input and output are dictionaries"""
        return {}

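# A minimal sketch (hypothetical driver code, not part of this module) of how
# these tasks are intended to be chained: each task's run() output is stored
# under its name and becomes part of the input handed to later tasks.
#
#     tasks = [ParseOptions(None, None, None),
#              CheckDatasetExists(dataset, user, options)]
#     state = {}
#     for task in tasks:
#         state[task.getname()] = task.run(state)
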
class ParseOptions(Task):
    """Common options for the script __main__: used by all production tasks"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'ParseOptions', dataset, user, options)

        usage = """%prog [options] <dataset>

The %prog script takes a list of samples and processes them on the batch system. Submission
may be done serially (by setting --max_threads to 1) or in parallel (the default).

The basic flow is:

    1) Check that the sample to run on exists
    2) Generate a source CFG
    3) Run locally and check that everything works with a small number of events
    4) Submit to the batch system
    5) Wait until the jobs are finished
    6) Check that the jobs ran OK and that the files are good

Example:

ProductionTasks.py -u cbern -w 'PFAOD*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py /QCD_Pt-1800_TuneZ2_7TeV_pythia6/Summer11-PU_S3_START42_V11-v2/AODSIM/V2

It is often useful to store the sample names in a file, in which case you could instead do:

ProductionTasks.py -w '*.root' -c -N 1 -q 8nh -t PAT_CMG_V5_10_0 --output_wildcard '*.root' --cfg PATCMG_cfg.py `cat samples_mc.txt`

An example file might contain:

palencia%/Tbar_TuneZ2_tW-channel-DR_7TeV-powheg-tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
benitezj%/ZZ_TuneZ2_7TeV_pythia6_tauola/Summer11-PU_S4_START42_V11-v1/AODSIM/V2
wreece%/ZJetsToNuNu_100_HT_200_7TeV-madgraph/Summer11-PU_S4_START42_V11-v1/AODSIM/V2

The CASTOR username for each sample is given before the '%'.

Each step in the flow has a task associated with it, which may set options. The options for
each task are documented below.

"""
        self.das = Das.DASOptionParser(usage=usage)
    def addOption(self, parser):
        parser.add_option("-u", "--user", dest="user", default=os.getlogin(), help='The username to use when looking at mass storage devices. Your login username is used by default.')
        parser.add_option("-w", "--wildcard", dest="wildcard", default='*.root', help='A UNIX style wildcard to specify which input files to check before submitting the jobs')
        parser.add_option("--max_threads", dest="max_threads", default=None, help='The maximum number of threads to use in the production')
    def run(self, input):
        self.options, self.dataset = self.das.get_opt()
        self.dataset = [d for d in self.dataset if not d.startswith('#')]
        self.user = self.options.user
        if not self.dataset:
            raise Exception('TaskError: No dataset specified')
        return {'Options':self.options, 'Dataset':self.dataset}

class CheckDatasetExists(Task):
    """Use 'datasets.py' to check that the dataset exists in the production system."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckDatasetExists', dataset, user, options)
    def run(self, input):
        pattern = fnmatch.translate(self.options.wildcard)
        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        if len(data.listOfGoodFiles()) == 0:
            raise Exception('no good root file in dataset %s | %s | %s | %s' % (self.user,
                                                                                self.dataset,
                                                                                self.options.wildcard,
                                                                                run_range))
        return {'Dataset':self.dataset}

class BaseDataset(Task):
    """Query DAS to find the dataset name in DBS - see https://cmsweb.cern.ch/das/"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'BaseDataset', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-n", "--name", dest="name", default=None, help='The name of the dataset in DAS. Will be guessed if not specified')
    def query(self, dataset):
        """Query DAS to find out how many events are in the dataset"""

        host    = self.options.host
        debug   = self.options.verbose
        idx     = self.options.idx
        limit   = self.options.limit

        def check(ds):
            query = 'dataset=%s' % ds
            result = Das.get_data(host, query, idx, limit, debug)
            #the DAS response is JSON-like; map the JSON literals to their
            #Python equivalents before evaluating it
            result = result.replace('null','None')
            result = result.replace('true','True')
            result = result.replace('false','False')
            data = eval(result)
            if data['status'] != 'ok':
                raise Exception("Das query failed: Output is '%s'" % data)
            return (data['data'], data)

        data = None
        exists = False

        if self.options.name is None:
            #guess the dataset name in DBS
            tokens = [t for t in dataset.split(os.sep) if t]
            if len(tokens) >= 3:
                #DBS names always have three entries
                ds = os.sep + os.sep.join(tokens[0:3])
                if ds:
                    exists, data = check(ds)
                    self.options.name = ds
        else:
            exists, data = check(self.options.name)
            if not exists:
                raise Exception("Specified dataset '%s' not found in Das. Please check." % self.options.name)

        if data is None:
            raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
        return data

    def run(self, input):
        output = {}
        if getattr(self.options, 'check', True):
            output = self.query(self.dataset)
        return {'Name':self.options.name, 'Das':output}

class GZipFiles(Task):
    """GZip a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'GZipFiles', dataset, user, options)
    def gzip(self, fileName):
        output = '%s.gz' % fileName

        f_in = open(fileName, 'rb')
        f_out = gzip.open(output, 'wb')
        f_out.writelines(f_in)
        f_out.close()
        f_in.close()
        #remove the original file once we've gzipped it
        os.remove(fileName)
        return output

    def run(self, input):
        files = input['FilesToCompress']['Files']

        compressed = []
        for f in files:
            if not f: continue
            if os.path.exists(f):
                gz = self.gzip(f)
                compressed.append(gz)
        return {'CompressedFiles':compressed}

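# The gzip() helper above copies the file line by line; an equivalent and
# arguably more idiomatic sketch (with fileName and output as in gzip() above)
# streams the data in chunks with shutil.copyfileobj:
#
#     import shutil
#     with open(fileName, 'rb') as f_in, gzip.open(output, 'wb') as f_out:
#         shutil.copyfileobj(f_in, f_out)
#     os.remove(fileName)
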
class CleanFiles(Task):
    """Remove a list of files"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CleanFiles', dataset, user, options)
    def run(self, input):
        files = input['FilesToClean']['Files']
        removed = []
        for f in files:
            if not f: continue
            if os.path.exists(f): os.remove(f)
            removed.append(f)
        return {'CleanedFiles':removed}

class FindOnCastor(Task):
    """Check that the specified sample exists in the CASTOR area of the specified user. The directory must exist."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'FindOnCastor', dataset, user, options)
    def run(self, input):
        if self.user == 'CMS':
            return {'Topdir':None, 'Directory':None}
        topdir = castortools.lfnToCastor(castorBaseDir(user=self.user))
        directory = '%s/%s' % (topdir, self.dataset)
        if not castortools.fileExists(directory):
            if hasattr(self, 'create') and self.create:
                castortools.createCastorDir(directory)
        if not castortools.isDirectory(directory):
            raise Exception("Dataset directory '%s' does not exist or could not be created" % directory)
        return {'Topdir':topdir, 'Directory':directory}

class CheckForMask(Task):
    """Test whether a file mask, created by edmIntegrityCheck.py, is already present, and read it if so."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckForMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-c", "--check", dest="check", default=False, action='store_true', help='Check the file mask if available')
    def run(self, input):
        #skip for DBS
        if self.user == 'CMS':
            return {'MaskPresent':True, 'Report':'Files taken from DBS'}

        dir = input['FindOnCastor']['Directory']
        mask = "IntegrityCheck"
        file_mask = []

        report = None
        if getattr(self.options, 'check', True):
            file_mask = castortools.matchingFiles(dir, '^%s_.*\\.txt$' % mask)

            if file_mask:
                p = PublishToFileSystem(mask)
                report = p.get(dir)
        return {'MaskPresent':report is not None, 'Report':report}

class CheckForWrite(Task):
    """Check whether you have write access to the specified CASTOR directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckForWrite', dataset, user, options)
    def run(self, input):
        """Check that the directory is writable"""
        if self.user == 'CMS':
            return {'Directory':None, 'WriteAccess':True}
        dir = input['FindOnCastor']['Directory']
        if self.options.check:

            _, name = tempfile.mkstemp('.txt', text=True)
            testFile = open(name, 'w')
            testFile.write('Test file')
            testFile.close()

            store = castortools.castorToLFN(dir)
            #this is bad, but castortools is giving me problems
            if not os.system('cmsStage %s %s' % (name, store)):
                fname = '%s/%s' % (dir, os.path.basename(name))
                write = castortools.fileExists(fname)
                if write:
                    castortools.rm(fname)
                else:
                    raise Exception("Failed to write to directory '%s'" % dir)
            os.remove(name)
        return {'Directory':dir, 'WriteAccess':True}

class GenerateMask(Task):
    """Use edmIntegrityCheck.py to generate a file mask for the sample if one is not already present."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'GenerateMask', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-r", "--recursive", dest="recursive", default=False, action='store_true', help='Walk the mass storage device recursively')
        parser.add_option("-p", "--printout", dest="printout", default=False, action='store_true', help='Print a report to stdout')
    def run(self, input):

        report = None
        if self.options.check and not input['CheckForMask']['MaskPresent']:

            options = copy.deepcopy(self.options)
            options.user = self.user

            if 'BaseDataset' in input:
                options.name = input['BaseDataset']['Name']
            else:
                options.name = None

            check = IntegrityCheck(self.dataset, options)
            check.test()
            report = check.structured()
            pub = PublishToFileSystem(check)
            pub.publish(report)
        elif input['CheckForMask']['MaskPresent']:
            report = input['CheckForMask']['Report']

        return {'MaskPresent':report is not None, 'Report':report}

class CreateJobDirectory(Task):
    """Generate a job directory on your local drive"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CreateJobDirectory', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("-o", "--output", dest="output", default=None, help='The directory to use locally for job files')
    def run(self, input):
        if self.options.output is not None:
            output = self.options.output
        else:
            output = '%s_%s' % (self.dataset, datetime.datetime.now().strftime("%s"))
            output = output.lstrip('/')
        if not os.path.exists(output):
            mkdir_p(output)
        return {'JobDir':output, 'PWD':os.getcwd()}

class SourceCFG(Task):
    """Generate a source CFG using 'sourceFileList.py' by listing the specified CASTOR directory. Applies the file wildcard, '--wildcard'"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'SourceCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--min-run", dest="min_run", default=-1, type=int, help='When querying DBS, require runs >= this run')
        parser.add_option("--max-run", dest="max_run", default=-1, type=int, help='When querying DBS, require runs <= this run')
        parser.add_option("--input-prescale", dest="prescale", default=1, type=int, help='Randomly prescale the number of good files by this factor.')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        pattern = fnmatch.translate(self.options.wildcard)

        run_range = (self.options.min_run, self.options.max_run)
        data = createDataset(self.user, self.dataset, pattern, run_range = run_range)
        good_files = data.listOfGoodFilesWithPrescale(self.options.prescale)
        #files removed by the prescale are marked as bad in the comments below
        bad_files = [fname for fname in data.listOfFiles() if fname not in good_files]

        source = os.path.join(jobdir, 'source_cfg.py')
        output = open(source, 'w')
        output.write('###SourceCFG:\t%d GoodFiles; %d BadFiles found in mask; Input prescale factor %d\n' % (len(good_files), len(bad_files), self.options.prescale))
        output.write('files = ' + str(good_files) + '\n')
        for bad_file in bad_files:
            output.write("###SourceCFG:\tBadInMask '%s'\n" % bad_file)
        output.close()
        return {'SourceCFG':source}


def insertLines( insertedTo, toInsert ):
    '''Insert a sequence into another sequence.

    The sequence is inserted either at the end, or at the position
    of the HOOK, if one is found. The HOOK is considered found if
      str(elem).find('###ProductionTaskHook$$$')
    is non-negative for one of the elements in the insertedTo sequence.
    '''
    HOOK = '###ProductionTaskHook$$$'
    hookIndex = None
    for index, line in enumerate(insertedTo):
        line = str(line)
        if line.find(HOOK) > -1:
            hookIndex = index
            break
    if hookIndex is not None:
        before = insertedTo[:hookIndex]
        after = insertedTo[hookIndex:]
        result = before + toInsert + after
        return result
    else:
        insertedTo.extend( toInsert )
        return insertedTo

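# An example of the hook behaviour described in the docstring: the new lines
# are spliced in just before the hook line when one is present, otherwise they
# are appended in place.
#
#     >>> insertLines(['a', '###ProductionTaskHook$$$', 'b'], ['x'])
#     ['a', 'x', '###ProductionTaskHook$$$', 'b']
#     >>> insertLines(['a', 'b'], ['x'])
#     ['a', 'b', 'x']
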

class FullCFG(Task):
    """Generate the full CFG needed to run the job and write it to the job directory"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'FullCFG', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--cfg", dest="cfg", default=None, help='The top level CFG to run')
        parser.add_option("--nEventsPerJob", dest="nEventsPerJob", default=None, help='Number of events per job (for testing)')
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']

        if self.options.cfg is None or not os.path.exists(self.options.cfg):
            raise Exception("The file '%s' does not exist. Please check." % self.options.cfg)

        config = open(self.options.cfg).readlines()
        sourceFile = os.path.basename(input['SourceCFG']['SourceCFG'])
        if sourceFile.lower().endswith('.py'):
            sourceFile = sourceFile[:-3]

        source = os.path.join(jobdir, 'full_cfg.py')
        output = open(source, 'w')

        nEventsPerJob = -1
        if self.options.nEventsPerJob:
            nEventsPerJob = int(self.options.nEventsPerJob)

        toInsert = ['\nfrom %s import *\n' % sourceFile,
                    'process.source.fileNames = files\n',
                    'if hasattr(process,"maxEvents"): process.maxEvents.input = cms.untracked.int32({nEvents})\n'.format(nEvents=nEventsPerJob),
                    'if hasattr(process,"maxLuminosityBlocks"): process.maxLuminosityBlocks.input = cms.untracked.int32(-1)\n',
                    'datasetInfo = ("%s","%s","%s")\n' % (self.user, self.dataset, fnmatch.translate(self.options.wildcard))
                    ]
        config = insertLines( config, toInsert )
        output.writelines(config)
        output.close()
        return {'FullCFG':source}

class CheckConfig(Task):
    """Check the basic syntax of a CFG file by running python on it."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckConfig', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']

        #universal_newlines=True so that communicate() returns str rather than bytes on Python 3
        child = subprocess.Popen(['python', full], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = child.communicate()
        if child.returncode != 0:
            raise Exception("Syntax check of cfg failed. Error was '%s'. (%i)" % (stderr, child.returncode))
        return {'Status':'VALID'}

class RunTestEvents(Task):
    """Run cmsRun on the job CFG, but with a small number of events."""

    def __init__(self, dataset, user, options):
        Task.__init__(self, 'RunTestEvents', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = open(full).readlines()
        source = os.path.join(jobdir, 'test_cfg.py')
        output = open(source, 'w')
        toInsert = ['\n',
                    'process.maxEvents.input = cms.untracked.int32(5)\n',
                    'if hasattr(process,"source"): process.source.fileNames = process.source.fileNames[:10]\n'
                    ]
        config = insertLines( config, toInsert )
        output.writelines(config)
        output.close()

        pwd = os.getcwd()

        error = None
        try:
            os.chdir(jobdir)

            child = subprocess.Popen(['cmsRun', os.path.basename(source)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to cmsRun with a few events. Error was '%s' (%i)." % (stderr, child.returncode)
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'Status':'VALID', 'TestCFG':source}

class ExpandConfig(Task):
    """Produce an expanded cfg file by dumping the fully-loaded process with process.dumpPython()"""

    def __init__(self, dataset, user, options):
        Task.__init__(self, 'ExpandConfig', dataset, user, options)
    def run(self, input):

        full = input['FullCFG']['FullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        config = open(full).read()
        source = os.path.join(jobdir, 'test_cfg.py')
        expanded = 'Expanded%s' % os.path.basename(full)
        output = open(source, 'w')
        output.write(config)
        output.write("open('%s','w').write(process.dumpPython())\n" % expanded)
        output.close()

        pwd = os.getcwd()

        result = {}
        error = None
        try:
            os.chdir(jobdir)

            child = subprocess.Popen(['python', os.path.basename(source)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            stdout, stderr = child.communicate()

            if child.returncode != 0:
                error = "Failed to expand the config. Error was '%s' (%i)." % (stderr, child.returncode)
            result['ExpandedFullCFG'] = os.path.join(jobdir, expanded)

        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return result

class WriteToDatasets(Task):
    """Publish the sample to 'Datasets.txt' if required"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'WriteToDatasets', dataset, user, options)
    def run(self, input):
        name = "%s/%s" % (self.dataset, self.options.tier)
        name = name.replace('//', '/')
        user = self.options.batch_user
        added = addToDatasets(name, user = user)
        return {'Added':added, 'Name':name, 'User':user}

class RunCMSBatch(Task):
    """Run the 'cmsBatch.py' command on your CFG, submitting to the CERN batch system"""

    def __init__(self, dataset, user, options):
        Task.__init__(self, 'RunCMSBatch', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--batch_user", dest="batch_user", help="The user for LSF", default=os.getlogin())
        parser.add_option("--run_batch", dest="run_batch", default=True, action='store_true', help='Run on the batch system')
        parser.add_option("-N", "--numberOfInputFiles", dest="nInput", help="Number of input files per job", default=5, type=int)
        parser.add_option("-q", "--queue", dest="queue", help="The LSF queue to use", default="1nh")
        parser.add_option("-t", "--tier", dest="tier",
                          help="Tier: extension you can give to specify you are doing a new production. If you give a Tier, your new files will appear in sampleName/tierName, which will constitute a new dataset.",
                          default="")
        parser.add_option("-G", "--group", dest="group", help="The LSF user group to use, e.g. 'u_zh'", default=None)

    def run(self, input):
        find = FindOnCastor(self.dataset, self.options.batch_user, self.options)
        find.create = True
        out = find.run({})

        full = input['ExpandConfig']['ExpandedFullCFG']
        jobdir = input['CreateJobDirectory']['JobDir']

        sampleDir = os.path.join(out['Directory'], self.options.tier)
        sampleDir = castortools.castorToLFN(sampleDir)

        cmd = ['cmsBatch.py', str(self.options.nInput), os.path.basename(full), '-o', '%s_Jobs' % self.options.tier, '--force']
        cmd.extend(['-r', sampleDir])
        if self.options.run_batch:
            jname = "%s/%s" % (self.dataset, self.options.tier)
            jname = jname.replace("//", "/")
            user_group = ''
            if self.options.group is not None:
                user_group = '-G %s' % self.options.group
            cmd.extend(['-b', "'bsub -q %s -J %s -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id.txt'" % (self.options.queue, jname, user_group)])
        print(" ".join(cmd))

        pwd = os.getcwd()

        error = None
        try:
            os.chdir(jobdir)
            returncode = os.system(" ".join(cmd))

            if returncode != 0:
                error = "Running cmsBatch failed. Return code was %i." % returncode
        finally:
            os.chdir(pwd)

        if error is not None:
            raise Exception(error)

        return {'SampleDataset':"%s/%s" % (self.dataset, self.options.tier), 'BatchUser':self.options.batch_user,
                'SampleOutputDir':sampleDir, 'LSFJobsTopDir':os.path.join(jobdir, '%s_Jobs' % self.options.tier)}

class MonitorJobs(Task):
    """Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'MonitorJobs', dataset, user, options)

    def getjobid(self, job_dir):
        """Parse the LSF output to find the job id"""
        input = os.path.join(job_dir, 'job_id.txt')
        result = None
        if os.path.exists(input):
            contents = open(input).read()
            for c in contents.split('\n'):
                if c and re.match('^Job <\\d*> is submitted to queue <.*>', c) is not None:
                    try:
                        result = c.split('<')[1].split('>')[0]
                    except Exception as e:
                        print('Job ID parsing error', str(e), c, file=sys.stderr)
        return result

    def monitor(self, jobs, previous):

        #executes bjobs with a list of job IDs
        cmd = ['bjobs', '-u', self.options.batch_user]
        cmd.extend([v for v in jobs.values() if v is not None]) #filter out unknown IDs
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = child.communicate()

        def parseHeader(header):
            """Parse the header from bjobs into a column name -> index map"""
            tokens = [t for t in header.split(' ') if t]
            result = {}
            for i, token in enumerate(tokens):
                result[token] = i

            return result

        result = {}
        if stdout:
            lines = stdout.split('\n')
            if lines:
                header = parseHeader(lines[0])
                if 'STAT' not in header or 'JOBID' not in header:
                    print('Problem parsing bjobs header\n', lines, file=sys.stderr)
                    return result
                for line in lines[1:]:
                    #TODO: Unreliable for some fields, e.g. dates
                    tokens = [t for t in line.split(' ') if t]
                    if len(tokens) < len(header): continue
                    id = tokens[header['JOBID']]
                    user = tokens[header['USER']]
                    status = tokens[header['STAT']]

                    result[id] = status

        if stderr:
            lines = stderr.split('\n')
            if lines:
                for line in lines:
                    if line and re.match('^Job <\\d*> is not found', line) is not None:
                        try:
                            id = line.split('<')[1].split('>')[0]
                            if id not in result and id not in previous:
                                result[id] = 'FORGOTTEN'
                        except Exception as e:
                            print('Job ID parsing error in STDERR', str(e), line, file=sys.stderr)

        #after one hour the status is no longer available
        if result:
            for id in jobs.values():
                if id not in result and id in previous:
                    result[id] = previous[id]
        return result

    def run(self, input):

        jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
        if not os.path.exists(jobsdir):
            raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)

        subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
        jobs = {}
        for s in subjobs:
            jobs[s] = self.getjobid(s)

        def checkStatus(stat):

            #gzip files on the fly
            actions = {'FilesToCompress':{'Files':[]}}

            result = {}
            for j, id in jobs.items():
                if id is None:
                    result[j] = 'UNKNOWN'
                else:
                    if id in stat:
                        result[j] = stat[id]
                        if result[j] in ['DONE', 'EXIT', 'FORGOTTEN']:
                            stdout = os.path.join(j, 'LSFJOB_%s' % id, 'STDOUT')
                            if os.path.exists(stdout):
                                #compress this file
                                actions['FilesToCompress']['Files'].append(stdout)
                                result[j] = '%s.gz' % stdout
                            elif os.path.exists('%s.gz' % stdout):
                                result[j] = '%s.gz' % stdout
                            else:
                                result[j] = 'NOSTDOUT'

                            #also compress the stderr, although this is mostly empty
                            stderr = os.path.join(j, 'LSFJOB_%s' % id, 'STDERR')
                            if os.path.exists(stderr):
                                #compress this file
                                actions['FilesToCompress']['Files'].append(stderr)

            compress = GZipFiles(self.dataset, self.user, self.options)
            compress.run(actions)
            return result

        def countJobs(stat):
            """Count jobs that are monitorable - i.e. not in a final state"""
            result = []
            for j, id in jobs.items():
                if id is not None and id in stat:
                    st = stat[id]
                    if st in ['PEND', 'PSUSP', 'RUN', 'USUSP', 'SSUSP', 'WAIT']:
                        result.append(id)
            return result

        def writeKillScript(mon):
            """Write a shell script to kill the jobs we know about"""
            kill = os.path.join(jobsdir, 'kill_jobs.sh')
            output = open(kill, 'w')
            script = """#!/usr/bin/env bash
echo "Killing jobs"
bkill -u %s %s
""" % (self.options.batch_user, " ".join(mon))
            output.write(script)
            output.close()
            return mon

        #continue monitoring while there are jobs to monitor
        status = self.monitor(jobs, {})
        monitorable = writeKillScript(countJobs(status))
        count = 0

        while monitorable:
            checkStatus(status) #compresses the logs of finished jobs as they appear
            time.sleep(60)
            status = self.monitor(jobs, status)
            monitorable = writeKillScript(countJobs(status))
            if not (count % 3):
                print('%s: Monitoring %i jobs (%s)' % (self.name, len(monitorable), self.dataset))
            count += 1

        return {'LSFJobStatus':checkStatus(status), 'LSFJobIDs':jobs}

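# An illustration (sample values, not real LSF output) of the bjobs parsing
# done in MonitorJobs.monitor: parseHeader maps each column name to its index,
# and a job's status is then looked up by column position.
#
#     header line: 'JOBID   USER    STAT  QUEUE'
#     parseHeader -> {'JOBID': 0, 'USER': 1, 'STAT': 2, 'QUEUE': 3}
#     data line:   '123456  cbern   RUN   8nh'
#     tokens[header['STAT']] -> 'RUN'
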
class CheckJobStatus(Task):
    """Check the job STDOUT to catch common problems like exceptions or exceeded CPU time. Sets the job status in the report accordingly."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CheckJobStatus', dataset, user, options)
    def addOption(self, parser):
        parser.add_option("--output_wildcard", dest="output_wildcard", help="The wildcard to use when testing the output of this production (defaults to same as -w)", default=None)
    def run(self, input):

        job_status = input['MonitorJobs']['LSFJobStatus']

        result = {}
        for j, status in job_status.items():
            valid = True
            if os.path.exists(status):

                if status.lower().endswith('.gz'):
                    #text mode so that line matching also works on Python 3
                    fileHandle = gzip.open(status, 'rt')
                else:
                    fileHandle = open(status)

                open_count = 0
                close_count = 0
                for line in fileHandle:
                    #start by counting files opened and closed
                    #suggestion from Enrique
                    if 'pened file' in line:
                        open_count += 1
                    if 'losed file' in line:
                        close_count += 1

                    if 'Exception' in line:
                        result[j] = 'Exception'
                        valid = False
                        break
                    elif 'CPU time limit exceeded' in line:
                        result[j] = 'CPUTimeExceeded'
                        valid = False
                        break
                    elif 'Killed' in line:
                        result[j] = 'JobKilled'
                        valid = False
                        break
                    elif 'A fatal system signal has occurred' in line:
                        result[j] = 'SegFault'
                        valid = False
                        break

                if valid and open_count != close_count:
                    result[j] = 'FileOpenCloseMismatch'
                    valid = False
                if valid:
                    result[j] = 'VALID'
            else:
                result[j] = status

        #allows a different wildcard in the final check.
        options = copy.deepcopy(self.options)
        if self.options.output_wildcard is not None:
            options.wildcard = self.options.output_wildcard

        mask = GenerateMask(input['RunCMSBatch']['SampleDataset'], self.options.batch_user, options)
        report = mask.run({'CheckForMask':{'MaskPresent':False}})
        report['LSFJobStatusCheck'] = result
        return report

class WriteJobReport(Task):
    """Write a summary report on each job"""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'WriteJobReport', dataset, user, options)
    def run(self, input):

        report = input['CheckJobStatus']

        #collect a list of jobs by status
        states = {}
        for j, status in report['LSFJobStatusCheck'].items():
            if status not in states:
                states[status] = []
            states[status].append(j)
        jobdir = input['CreateJobDirectory']['PWD']
        if not os.path.exists(jobdir):
            raise Exception("Top level job directory not found: '%s'" % jobdir)
        report_file = os.path.join(input['CreateJobDirectory']['JobDir'], 'resubmit.sh')

        output = open(report_file, 'w')
        output.write('#!/usr/bin/env bash\n')

        if report['MaskPresent']:
            mask = report['Report']
            output.write('#PrimaryDatasetFraction: %f\n' % mask['PrimaryDatasetFraction'])
            output.write('#FilesGood: %i\n' % mask['FilesGood'])
            output.write('#FilesBad: %i\n' % mask['FilesBad'])

        user_group = ''
        if self.options.group is not None:
            user_group = '-G %s' % self.options.group

        for status, jobs in states.items():
            output.write('# %d jobs found in state %s\n' % (len(jobs), status))
            if status == 'VALID':
                continue
            for j in jobs:
                jdir = os.path.join(jobdir, j)
                output.write('pushd %s; bsub -q %s -J RESUB -u cmgtoolslsf@gmail.com %s < ./batchScript.sh | tee job_id_resub.txt; popd\n' % (jdir, self.options.queue, user_group))
        output.close()

        return {'SummaryFile':report_file}

class CleanJobFiles(Task):
    """Removes and compresses auto-generated files from the job directory to save space."""
    def __init__(self, dataset, user, options):
        Task.__init__(self, 'CleanJobFiles', dataset, user, options)
    def run(self, input):

        jobdir = input['CreateJobDirectory']['JobDir']
        jobs = input['MonitorJobs']['LSFJobIDs']
        job_status = input['MonitorJobs']['LSFJobStatus']

        actions = {'FilesToCompress':{'Files':[]}, 'FilesToClean':{'Files':[]}}

        actions['FilesToClean']['Files'].append(input['ExpandConfig']['ExpandedFullCFG'])
        if 'RunTestEvents' in input:
            actions['FilesToClean']['Files'].append(input['RunTestEvents']['TestCFG'])

        for rt in glob.iglob('%s/*.root' % jobdir):
            actions['FilesToClean']['Files'].append(rt)
        for pyc in glob.iglob('%s/*.pyc' % jobdir):
            actions['FilesToClean']['Files'].append(pyc)

        for j in jobs:
            status = job_status[j]
            if os.path.exists(status) and not status.endswith('.gz'):
                actions['FilesToCompress']['Files'].append(status)

        compress = GZipFiles(self.dataset, self.user, self.options)
        compressed = compress.run(actions)

        clean = CleanFiles(self.dataset, self.user, self.options)
        removed = clean.run(actions)
        return {'Cleaned':removed, 'Compressed':compressed}