Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:50

0001 #!/usr/bin/env python
0002 # Copyright (C) 2014 Colin Bernet
0003 # https://github.com/cbernet/heppy/blob/master/LICENSE
0004 
0005 from builtins import range
0006 import os
0007 import shutil
0008 import glob
0009 import sys
0010 import imp
0011 import copy
0012 from multiprocessing import Pool
0013 from pprint import pprint
0014 
# import root in batch mode if "-i" is not among the options
if "-i" not in sys.argv:
    # Hide the real command line while ROOT is imported and show it a
    # batch-only one instead (presumably ROOT looks at sys.argv on import --
    # TODO confirm against the PyROOT documentation).
    oldv = sys.argv[:]
    sys.argv = [ "-b-"]
    try:
        import ROOT
        ROOT.gROOT.SetBatch(True)
    finally:
        # Always give the caller its original arguments back, even when the
        # ROOT import or the batch switch fails.
        sys.argv = oldv
0022 
0023 
0024 from PhysicsTools.HeppyCore.framework.looper import Looper
0025 
0026 # global, to be used interactively when only one component is processed.
0027 loop = None
0028 
def callBack(result):
    '''Report the completion of one asynchronous job.

    result -- value returned by the worker; it is printed through str().
    '''
    print('production done:', str(result))
0032 
def runLoopAsync(comp, outDir, configName, options):
    '''Process a single component; meant to be called through a process pool.

    comp       -- component to process (its name is used in error reports)
    outDir     -- output directory handed over to runLoop
    configName -- key in sys.modules of the already loaded configuration
                  module; a shallow copy of its `config` attribute is used
    options    -- command line options handed over to runLoop

    Returns the `name` attribute of the loop returned by runLoop.
    Any exception is reported on stdout together with the component and the
    stack trace, and is then re-raised.
    '''
    import traceback
    try:
        # work on a shallow copy of the shared configuration object
        cfgCopy = copy.copy(sys.modules[configName].config)
        finishedLoop = runLoop(comp, outDir, cfgCopy, options)
        return finishedLoop.name
    except Exception:
        print("ERROR processing component %s" % comp.name)
        print(comp)
        print("STACK TRACE: ")
        print(traceback.format_exc())
        raise
0044 
def runLoop(comp, outDir, config, options):
    '''Build a Looper for one component and run it.

    comp    -- component to process; its name gives the loop sub-directory
    outDir  -- parent output directory
    config  -- configuration object; its `components` attribute is
               overwritten with [comp]
    options -- object providing nevents, nprint, timeReport, quiet and iEvent

    When options.iEvent is None the full loop is run and written out,
    otherwise only the event with that index is processed.
    Returns the Looper instance.
    '''
    loopDir = '/'.join([outDir, comp.name])
    config.components = [comp]
    looper = Looper(loopDir,
                    config,
                    options.nevents, 0,
                    nPrint=options.nprint,
                    timeReport=options.timeReport,
                    quiet=options.quiet)
    if options.iEvent is not None:
        # single-event mode: nothing is written out
        looper.process(int(options.iEvent))
        return looper
    looper.loop()
    looper.write()
    return looper
0065 
0066 
def createOutputDir(dir, components, force):
    '''Create the output directory, dealing with the case where it exists.

    dir        -- path of the directory to create
    components -- objects with a `name` attribute; only listed for the user
    force      -- when exactly True, an existing directory is accepted
                  without asking

    Returns True if processing may go on (directory created, force mode, or
    the user answered yes) and False if the user answered no.
    Raises OSError when the directory can neither be created nor found.
    '''
    try:
        os.mkdir(dir)
        return True
    except OSError:
        # mkdir fails for other reasons than "already exists" (missing
        # parent, permissions, a plain file in the way): do not misreport
        # those as an existing directory.
        if not os.path.isdir(dir):
            raise

    print('directory %s already exists' % dir)
    print('contents: ')
    subDirs = [entry for entry in os.listdir(dir)
               if os.path.isdir('/'.join([dir, entry]))]
    pprint(subDirs)
    print('component list: ')
    print([comp.name for comp in components])

    if force is True:
        print('force mode, continue.')
        return True

    # raw_input only exists in python 2; input is its python 3 equivalent.
    try:
        ask = raw_input
    except NameError:
        ask = input

    answer = None
    while answer not in ['Y','y','yes','N','n','no']:
        answer = ask('Continue? [y/n]')
    # every accepted answer starts with either 'y' or 'n'
    return answer.lower().startswith('y')
0093 
def chunks(l, n):
    '''Cut the sequence l into consecutive slices of length n.

    The last slice is shorter when len(l) is not a multiple of n.
    Returns the list of slices.
    '''
    pieces = []
    for start in range(0, len(l), n):
        pieces.append(l[start:start + n])
    return pieces
0096 
def split(comps):
    '''Split components into smaller ones according to their split factors.

    For each component in comps:
    - fineSplitFactor > 1: one deep copy per (file, sub-chunk) pair, holding
      that single file and fineSplit = (sub-chunk index, fineSplitFactor);
    - otherwise splitFactor > 1: the file list is cut into at most
      splitFactor groups of equal size (the last one may be smaller), with
      one deep copy per group;
    - otherwise the component is kept as it is (not copied).

    Copies are named '<name>_Chunk<index>'.
    Returns the new list of components; the input components are not modified.
    '''
    splitComps = []
    for comp in comps:
        if hasattr( comp, 'fineSplitFactor') and comp.fineSplitFactor>1:
            pieces = [(fileName, part)
                      for fileName in comp.files
                      for part in range(comp.fineSplitFactor)]
            for ichunk, (fileName, part) in enumerate(pieces):
                newComp = copy.deepcopy(comp)
                newComp.files = [fileName]
                newComp.fineSplit = ( part, comp.fineSplitFactor )
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                       index=ichunk)
                splitComps.append( newComp )
        elif hasattr( comp, 'splitFactor') and comp.splitFactor>1:
            # Integer ceiling division: with '/' python 3 yields a float,
            # which cannot be used as a slice step in chunks().
            chunkSize, remainder = divmod(len(comp.files), comp.splitFactor)
            if remainder:
                chunkSize += 1
            for ichunk, chunk in enumerate( chunks( comp.files, chunkSize)):
                newComp = copy.deepcopy(comp)
                newComp.files = chunk
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                       index=ichunk)
                splitComps.append( newComp )
        else:
            splitComps.append( comp )
    return splitComps
0124 
0125 
0126 _heppyGlobalOptions = {}
0127 
def getHeppyOption(name,default=None):
    '''Return the global heppy option `name`, or `default` if it is not set.'''
    return _heppyGlobalOptions.get(name, default)
def setHeppyOption(name,value=True):
    '''Store `value` (True by default) as the global heppy option `name`.'''
    # item assignment mutates the module-level dict in place
    _heppyGlobalOptions[name] = value
0134 
def main( options, args, parser ):
    '''Run the heppy loop described by the command line.

    options -- parsed options; verbose, extraOptions, ntasks and force are
               read here, and the object is passed on to runLoop
    args    -- positional arguments: [output directory, cfg file]
    parser  -- option parser, used to print the help on wrong usage

    Exits with code 1, 2 or 3 on wrong arguments, with code 4 when no
    component has input files, and with code 0 when the user declines to
    reuse an existing output directory.

    With several components, each one is submitted to a process pool and the
    module-level `loop` is left untouched. With a single component the loop is
    run in this process and stored in the module-level `loop`.
    Returns the module-level `loop`.
    '''
    global loop

    if len(args) != 2:
        parser.print_help()
        print('ERROR: please provide the processing name and the component list')
        sys.exit(1)

    outDir = args[0]
    if os.path.exists(outDir) and not os.path.isdir( outDir ):
        parser.print_help()
        print('ERROR: when it exists, first argument must be a directory.')
        sys.exit(2)
    cfgFileName = args[1]
    if not os.path.isfile( cfgFileName ):
        parser.print_help()
        print('ERROR: second argument must be an existing file (your input cfg).')
        sys.exit(3)

    if options.verbose:
        import logging
        logging.basicConfig(level=logging.INFO)

    # Propagate global options to _heppyGlobalOptions within this module
    # I have to import it explicitly, 'global' does not work since the
    # module is not set when executing the main
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key,val) = opt.split("=",1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True

    # Load the cfg under a fixed module name, which the pool workers use to
    # find it again in sys.modules; the file handle is closed right after.
    # NOTE(review): the imp module is deprecated in python 3 -- to be
    # migrated to importlib together with the file-level import.
    with open( cfgFileName, 'r' ) as cfgFile:
        cfg = imp.load_source( 'PhysicsTools.HeppyCore.__cfg_to_run__', cfgFileName, cfgFile)

    selComps = [comp for comp in cfg.config.components if len(comp.files)>0]
    selComps = split(selComps)
    if not selComps:
        # nothing to run: stop here instead of failing later on
        print('ERROR: no component with input files found in the configuration.')
        sys.exit(4)
    if len(selComps)>options.ntasks:
        print("WARNING: too many threads {tnum}, will just use a maximum of {jnum}.".format(tnum=len(selComps),jnum=options.ntasks))
    if not createOutputDir(outDir, selComps, options.force):
        print('exiting')
        sys.exit(0)
    if len(selComps)>1:
        shutil.copy( cfgFileName, outDir )
        pool = Pool(processes=min(len(selComps),options.ntasks))
        ## workaround for a scoping problem in ipython+multiprocessing
        import PhysicsTools.HeppyCore.framework.heppy_loop as ML
        for comp in selComps:
            print('submitting', comp.name)
            pool.apply_async( ML.runLoopAsync, [comp, outDir, 'PhysicsTools.HeppyCore.__cfg_to_run__', options],
                              callback=ML.callBack)
        pool.close()
        pool.join()
    else:
        # when running only one loop, do not use multiprocessor module.
        # then, the exceptions are visible -> use only one sample for testing
        # The component is taken explicitly from the selection: in python 3
        # the comprehension variable above does not leak into this scope.
        loop = runLoop( selComps[0], outDir, cfg.config, options )
    return loop