File indexing completed on 2024-11-25 02:29:50
0001
0002
0003
0004
0005 from builtins import range
0006 import os
0007 import shutil
0008 import glob
0009 import sys
0010 import imp
0011 import copy
0012 from multiprocessing import Pool
0013 from pprint import pprint
0014
0015
0016 if "-i" not in sys.argv:
0017 oldv = sys.argv[:]
0018 sys.argv = [ "-b-"]
0019 import ROOT
0020 ROOT.gROOT.SetBatch(True)
0021 sys.argv = oldv
0022
0023
0024 from PhysicsTools.HeppyCore.framework.looper import Looper
0025
0026
0027 loop = None
0028
def callBack( result ):
    '''Completion callback for the pool jobs submitted by main().

    ``result`` is the value returned by runLoopAsync for the finished job
    (the name of its looper); it is only reported on stdout.
    '''
    summary = str(result)
    print('production done:', summary)
0032
def runLoopAsync(comp, outDir, configName, options):
    '''Run one job; entry point submitted to the multiprocessing pool by main().

    comp       -- the component to process
    outDir     -- output directory, forwarded to runLoop
    configName -- name under which the cfg module is registered in
                  ``sys.modules``; its ``config`` is shallow-copied so that
                  runLoop can set ``components`` on the copy
    options    -- command-line options, forwarded to runLoop

    Returns the name of the looper. Any exception is reported on stdout
    (component, then stack trace) and re-raised.
    '''
    try:
        cfgModule = sys.modules[configName]
        looper = runLoop( comp, outDir, copy.copy(cfgModule.config), options)
        return looper.name
    except Exception:
        import traceback
        report = ( "ERROR processing component %s" % comp.name,
                   comp,
                   "STACK TRACE: ",
                   traceback.format_exc() )
        for item in report:
            print(item)
        raise
0044
def runLoop( comp, outDir, config, options):
    '''Build a Looper for the single component ``comp`` and run it.

    The looper is named ``<outDir>/<comp.name>``. Note that ``config`` is
    modified in place: its ``components`` become ``[comp]``.

    If ``options.iEvent`` is None, all events are looped over and
    ``write()`` is called on the looper; otherwise only event number
    ``int(options.iEvent)`` is processed and ``write()`` is not called.
    Returns the Looper.
    '''
    fullName = '/'.join( (outDir, comp.name) )
    config.components = [comp]
    looper = Looper( fullName, config, options.nevents, 0,
                     nPrint = options.nprint,
                     timeReport = options.timeReport,
                     quiet = options.quiet )
    if options.iEvent is not None:
        looper.process( int(options.iEvent) )
    else:
        looper.loop()
        looper.write()
    return looper
0065
0066
def createOutputDir(dir, components, force):
    '''Create the output directory, dealing with the case where it exists.

    dir        -- path of the output directory
    components -- jobs to be processed; only their ``name`` is used, to
                  show the user what is going to be written into ``dir``
    force      -- if True, reuse an already existing ``dir`` without asking

    Returns True if processing should go on (directory created, or existing
    directory accepted), False if the user declined to reuse it.
    OSErrors other than "already exists" (e.g. permission denied, missing
    parent directory) are propagated instead of being reported as an
    existing directory.
    '''
    # Python 3 semantics for input() (the file relies on the ``builtins``
    # compatibility module already); the Python 2 ``raw_input`` does not
    # exist under Python 3 and raised NameError here.
    from builtins import input
    import errno
    try:
        os.mkdir(dir)
        return True
    except OSError as err:
        if err.errno != errno.EEXIST:
            raise
    print('directory %s already exists' % dir)
    print('contents: ')
    # only the sub-directories of dir are shown
    dirlist = [path for path in os.listdir(dir) if os.path.isdir( '/'.join([dir, path]) )]
    pprint( dirlist )
    print('component list: ')
    print([comp.name for comp in components])
    if force is True:
        print('force mode, continue.')
        return True
    answer = None
    while answer not in ['Y','y','yes','N','n','no']:
        answer = input('Continue? [y/n]')
    # the loop only exits on one of the accepted yes/no spellings
    return answer.lower().startswith('y')
0093
def chunks(l, n):
    '''Cut the sequence ``l`` into consecutive slices of ``n`` items.

    The last slice holds the remainder and may be shorter; an empty ``l``
    gives an empty list.
    '''
    starts = range(0, len(l), n)
    return [l[begin:begin + n] for begin in starts]
0096
def split(comps):
    '''Split components into smaller jobs ("chunks") that can run in parallel.

    For each component of ``comps``:
    - if it has ``fineSplitFactor`` > 1, one job is made per
      (input file, sub-chunk index) pair: ``files`` is that single file and
      ``fineSplit`` is ``(sub-chunk index, fineSplitFactor)``;
    - else, if it has ``splitFactor`` > 1, its ``files`` are cut into
      consecutive groups of ceil(len(files) / splitFactor) files, which
      gives at most ``splitFactor`` jobs;
    - otherwise the component itself is kept as a single job.
    Each split job is a deep copy of the component, with ``_Chunk<i>``
    appended to its name. Returns the list of jobs.
    '''
    splitComps = []
    for comp in comps:
        if hasattr( comp, 'fineSplitFactor') and comp.fineSplitFactor>1:
            subchunks = list(range(comp.fineSplitFactor))
            pairs = [(f,i) for f in comp.files for i in subchunks]
            for ichunk, (fileName, isub) in enumerate(pairs):
                newComp = copy.deepcopy(comp)
                newComp.files = [fileName]
                newComp.fineSplit = ( isub, comp.fineSplitFactor )
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                            index=ichunk)
                splitComps.append( newComp )
        elif hasattr( comp, 'splitFactor') and comp.splitFactor>1:
            # Ceiling of len(files)/splitFactor.  Floor division is required:
            # true division ("/") yields a float under Python 3, which range()
            # in chunks() rejects with a TypeError.
            chunkSize = len(comp.files) // comp.splitFactor
            if len(comp.files) % comp.splitFactor:
                chunkSize += 1

            for ichunk, chunk in enumerate( chunks( comp.files, chunkSize)):
                newComp = copy.deepcopy(comp)
                newComp.files = chunk
                newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                            index=ichunk)
                splitComps.append( newComp )
        else:
            splitComps.append( comp )
    return splitComps
0124
0125
# Process-wide registry of extra heppy options (name -> value).
# main() fills it from ``options.extraOptions`` and imports it by name, so it
# must be mutated in place and never rebound.
_heppyGlobalOptions = dict()

def getHeppyOption(name,default=None):
    '''Return the value of the global heppy option ``name``, or ``default`` if it is not set.'''
    return _heppyGlobalOptions.get(name, default)
def setHeppyOption(name,value=True):
    '''Set the global heppy option ``name`` to ``value`` (True for a plain flag).'''
    _heppyGlobalOptions[name] = value
0134
def main( options, args, parser ):
    '''Command-line entry point: run a heppy cfg file on its components.

    options -- parsed command-line options; ``verbose``, ``extraOptions``,
               ``ntasks`` and ``force`` are used here, and the object is
               passed on to runLoop / runLoopAsync
    args    -- positional arguments ``[outDir, cfgFileName]``
    parser  -- option parser, only used to print the help on bad input

    The cfg file is imported as module ``PhysicsTools.HeppyCore.__cfg_to_run__``.
    The components of its ``config`` that have input files are split into
    jobs (see split()). Several jobs run in a multiprocessing pool of at most
    ``options.ntasks`` processes, and the cfg file is copied into ``outDir``.
    A single job runs in this process; its Looper is stored in the
    module-level ``loop`` and returned.

    Calls sys.exit with code 1, 2 or 3 on invalid arguments, 4 if no
    component has input files, and 0 if the user declines to reuse an
    existing output directory.
    '''
    global loop

    if len(args) != 2:
        parser.print_help()
        print('ERROR: please provide the processing name and the component list')
        sys.exit(1)

    outDir = args[0]
    if os.path.exists(outDir) and not os.path.isdir( outDir ):
        parser.print_help()
        print('ERROR: when it exists, first argument must be a directory.')
        sys.exit(2)
    cfgFileName = args[1]
    if not os.path.isfile( cfgFileName ):
        parser.print_help()
        print('ERROR: second argument must be an existing file (your input cfg).')
        sys.exit(3)

    if options.verbose:
        import logging
        logging.basicConfig(level=logging.INFO)

    # Fill the option dict of the *importable* heppy_loop module rather than
    # this module's globals: when this file is executed as a script they are
    # different module objects, and getHeppyOption() users (presumably the cfg
    # and the analyzers) go through the imported module.
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key,val) = opt.split("=",1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True

    # The cfg is imported under a fixed module name because runLoopAsync()
    # fetches it from sys.modules by that name. The file handle is now closed
    # once the cfg has been loaded (it used to be leaked).
    # NOTE(review): ``imp`` is deprecated and removed in Python 3.12;
    # importlib.util.spec_from_file_location is the replacement.
    cfgModuleName = 'PhysicsTools.HeppyCore.__cfg_to_run__'
    with open( cfgFileName, 'r' ) as cfgFile:
        cfg = imp.load_source( cfgModuleName, cfgFileName, cfgFile)

    selComps = [comp for comp in cfg.config.components if len(comp.files)>0]
    selComps = split(selComps)
    if not selComps:
        print('ERROR: no component with input files in the configuration, nothing to do.')
        sys.exit(4)

    if len(selComps)>options.ntasks:
        print("WARNING: too many threads {tnum}, will just use a maximum of {jnum}.".format(tnum=len(selComps),jnum=options.ntasks))
    if not createOutputDir(outDir, selComps, options.force):
        print('exiting')
        sys.exit(0)
    if len(selComps)>1:
        shutil.copy( cfgFileName, outDir )
        pool = Pool(processes=min(len(selComps),options.ntasks))
        # Functions are submitted through the importable module (presumably so
        # that the pool can pickle them by their importable name even when
        # this file runs as __main__).
        import PhysicsTools.HeppyCore.framework.heppy_loop as ML
        for comp in selComps:
            print('submitting', comp.name)
            pool.apply_async( ML.runLoopAsync, [comp, outDir, cfgModuleName, options],
                              callback=ML.callBack)
        pool.close()
        pool.join()
    else:
        # A single job is run directly in this process, so that any exception
        # reaches the caller. The original code used the name ``comp`` here,
        # which is undefined under Python 3 (comprehension variables do not
        # leak) and, under Python 2, was the last *unfiltered* component.
        loop = runLoop( selComps[0], outDir, cfg.config, options )
        return loop
0196 return loop