Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:23:30

0001 #!/usr/bin/env python
0002 # Copyright (C) 2014 Colin Bernet
0003 # https://github.com/cbernet/heppy/blob/master/LICENSE
0004 
0005 from __future__ import print_function
0006 from builtins import range
0007 import os
0008 import shutil
0009 import glob
0010 import sys
0011 import imp
0012 import copy
0013 from multiprocessing import Pool
0014 from pprint import pprint
0015 
# Import ROOT in batch mode unless "-i" is among the options.
# sys.argv is swapped out while ROOT is imported and configured
# (presumably so that PyROOT does not interpret this script's own
# command-line options -- TODO confirm), and is always restored afterwards,
# even if the import or the SetBatch call fails.
if "-i" not in sys.argv:
    oldv = sys.argv[:]
    sys.argv = [ "-b-"]
    try:
        import ROOT
        ROOT.gROOT.SetBatch(True)
    finally:
        sys.argv = oldv
0023 
0024 
0025 from PhysicsTools.HeppyCore.framework.looper import Looper
0026 
# Global, to be used interactively when only one component is processed:
# main() stores here the Looper it runs in the current process. It stays
# None when the components are dispatched to the multiprocessing pool.
loop = None
0029 
def callBack( result ):
    '''Report on stdout that one asynchronous job has finished.

    result is whatever the worker function returned; it is converted to a
    string for display. Nothing is returned.
    '''
    message = str(result)
    print('production done:', message)
0033 
def runLoopAsync(comp, outDir, configName, options):
    '''Run the loop for one component; meant to be called in a pool worker.

    The configuration is taken from the module registered in sys.modules
    under configName and shallow-copied before being handed to runLoop.

    Returns the name of the looper that was run. If anything goes wrong,
    the component and the stack trace are printed and the exception is
    re-raised.
    '''
    import traceback
    try:
        workerConfig = copy.copy(sys.modules[configName].config)
        return runLoop( comp, outDir, workerConfig, options).name
    except Exception:
        print("ERROR processing component %s" % comp.name)
        print(comp)
        print("STACK TRACE: ")
        print(traceback.format_exc())
        raise
0045 
def runLoop( comp, outDir, config, options):
    '''Build a Looper for a single component, run it, and return it.

    The looper is named "<outDir>/<comp.name>" and config.components is
    overwritten so that it contains only comp. The options attributes read
    here are nevents, nprint, timeReport, quiet and iEvent.

    When options.iEvent is None the full loop is run and its output is
    written; otherwise only the event with index int(options.iEvent) is
    processed and nothing is written.
    '''
    fullName = '/'.join( [outDir, comp.name ] )
    config.components = [comp]
    looper = Looper( fullName,
                     config,
                     options.nevents, 0,
                     nPrint = options.nprint,
                     timeReport = options.timeReport,
                     quiet = options.quiet)
    if options.iEvent is not None:
        # single-event mode
        looper.process( int(options.iEvent) )
        return looper
    looper.loop()
    looper.write()
    return looper
0066 
0067 
def createOutputDir(dir, components, force):
    '''Create the output dir, dealing with the case where dir exists.

    dir        -- path of the directory to create (the parameter name is
                  kept for backward compatibility).
    components -- objects with a `name` attribute; their names are only
                  printed to help the user decide.
    force      -- when True, an existing directory is accepted without
                  asking.

    Returns True if the directory was created or may be reused, False if
    the user refused to reuse an existing directory. An OSError raised by
    os.mkdir for any reason other than the directory already being there
    (missing parent, permissions, ...) is propagated.
    '''
    try:
        os.mkdir(dir)
        return True
    except OSError:
        # Only "already exists as a directory" is handled below; anything
        # else is a real error and must not be reported as "already exists".
        if not os.path.isdir(dir):
            raise
    print('directory %s already exists' % dir)
    print('contents: ')
    dirlist = [path for path in os.listdir(dir)
               if os.path.isdir( os.path.join(dir, path) )]
    pprint( dirlist )
    print('component list: ')
    print([comp.name for comp in components])
    if force is True:
        print('force mode, continue.')
        return True
    # raw_input does not exist in python 3. The `builtins` module, which
    # this file already imports from (on python 2 it comes from the `future`
    # package), offers an input() returning the typed text on both versions.
    from builtins import input as ask
    answer = None
    while answer not in ['Y','y','yes','N','n','no']:
        answer = ask('Continue? [y/n]')
    # The loop only ends on a yes-like or no-like answer.
    return answer.lower().startswith('y')
0094 
def chunks(l, n):
    '''Cut the sequence l into consecutive slices of length n.

    The slices are returned in a list; the last one is shorter when len(l)
    is not a multiple of n.
    '''
    pieces = []
    for start in range(0, len(l), n):
        stop = start + n
        pieces.append(l[start:stop])
    return pieces
0097 
def split(comps):
    '''Split components into chunks according to their split factors.

    For each component of comps, in order:
    - if it has fineSplitFactor > 1, one deep copy is made per
      (file, sub-chunk) pair; each copy holds a single file and a
      fineSplit attribute set to (sub-chunk index, fineSplitFactor);
    - else if it has splitFactor > 1, its files are shared out, in order,
      between at most splitFactor deep copies;
    - otherwise the component itself (not a copy) is kept.
    Copies are named '<name>_Chunk<index>'.

    Returns the new list of components; comps is not modified.
    '''
    splitComps = []
    for comp in comps:
        if hasattr( comp, 'fineSplitFactor') and comp.fineSplitFactor>1:
            pairs = ((f, i) for f in comp.files
                     for i in range(comp.fineSplitFactor))
            for ichunk, (fileName, subChunk) in enumerate(pairs):
                newComp = copy.deepcopy(comp)
                newComp.files = [fileName]
                newComp.fineSplit = ( subChunk, comp.fineSplitFactor )
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                       index=ichunk)
                splitComps.append( newComp )
        elif hasattr( comp, 'splitFactor') and comp.splitFactor>1:
            nFiles = len(comp.files)
            # Integer ceiling division: a plain `/` gives a float on
            # python 3, which range() rejects. At least 1 so that an empty
            # file list yields no chunk instead of a zero step.
            chunkSize = max(1, -(-nFiles // comp.splitFactor))
            for ichunk, start in enumerate(range(0, nFiles, chunkSize)):
                newComp = copy.deepcopy(comp)
                newComp.files = comp.files[start:start+chunkSize]
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                       index=ichunk)
                splitComps.append( newComp )
        else:
            splitComps.append( comp )
    return splitComps
0125 
0126 
# Module-level registry of named options, read and written through the two
# accessors below.
_heppyGlobalOptions = {}

def getHeppyOption(name,default=None):
    '''Return the value registered under name, or default if there is none.'''
    return _heppyGlobalOptions.get(name, default)

def setHeppyOption(name,value=True):
    '''Register value under name; without a value the option is set to True.'''
    _heppyGlobalOptions[name] = value
0135 
def main( options, args, parser ):
    '''Process all components of a configuration file.

    options -- parsed command-line options. The attributes read here are
               verbose, extraOptions, ntasks and force; the whole object is
               also passed on to runLoop / runLoopAsync.
    args    -- [output directory, configuration file].
    parser  -- option parser, only used to print the help on bad arguments.

    Several components are processed in a multiprocessing pool of at most
    options.ntasks workers; a single component is processed in the current
    process, so that exceptions stay visible (use one sample for testing).

    Returns the module-level `loop`: the Looper that was run in the
    single-component case, otherwise whatever it held before the call.

    Exits with status 1, 2 or 3 when the arguments are invalid, 4 when no
    component with input files is selected, and 0 when the user declines to
    reuse an existing output directory.
    '''
    global loop

    if len(args) != 2:
        parser.print_help()
        print('ERROR: please provide the processing name and the component list')
        sys.exit(1)

    outDir = args[0]
    if os.path.exists(outDir) and not os.path.isdir( outDir ):
        parser.print_help()
        print('ERROR: when it exists, first argument must be a directory.')
        sys.exit(2)
    cfgFileName = args[1]
    if not os.path.isfile( cfgFileName ):
        parser.print_help()
        print('ERROR: second argument must be an existing file (your input cfg).')
        sys.exit(3)

    if options.verbose:
        import logging
        logging.basicConfig(level=logging.INFO)

    # Propagate global options to _heppyGlobalOptions within this module
    # I have to import it explicitly, 'global' does not work since the
    # module is not set when executing the main
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key,val) = opt.split("=",1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True

    # The cfg is loaded under a fixed module name; runLoopAsync looks it up
    # again in sys.modules under that same name.
    cfgModuleName = 'PhysicsTools.HeppyCore.__cfg_to_run__'
    with open( cfgFileName, 'r' ) as cfgFile:
        cfg = imp.load_source( cfgModuleName, cfgFileName, cfgFile)

    selComps = [comp for comp in cfg.config.components if len(comp.files)>0]
    selComps = split(selComps)
    if not selComps:
        # Nothing to run: stop before touching the output directory.
        print('ERROR: no component with input files found in the configuration.')
        sys.exit(4)
    if len(selComps)>options.ntasks:
        print("WARNING: too many threads {tnum}, will just use a maximum of {jnum}.".format(tnum=len(selComps),jnum=options.ntasks))
    if not createOutputDir(outDir, selComps, options.force):
        print('exiting')
        sys.exit(0)
    if len(selComps)>1:
        shutil.copy( cfgFileName, outDir )
        pool = Pool(processes=min(len(selComps),options.ntasks))
        ## workaround for a scoping problem in ipython+multiprocessing
        import PhysicsTools.HeppyCore.framework.heppy_loop as ML
        for comp in selComps:
            print('submitting', comp.name)
            pool.apply_async( ML.runLoopAsync, [comp, outDir, cfgModuleName, options],
                              callback=ML.callBack)
        pool.close()
        pool.join()
    else:
        # when running only one loop, do not use multiprocessor module.
        # then, the exceptions are visible -> use only one sample for testing
        # The component is taken explicitly from the selection: a list
        # comprehension variable does not exist outside the comprehension on
        # python 3, and on python 2 it would be the last unfiltered component.
        loop = runLoop( selComps[0], outDir, cfg.config, options )
    return loop