File indexing completed on 2023-03-17 11:15:50
0001
0002
0003
0004
0005 from __future__ import print_function
0006 from builtins import range
0007 import os
0008 import shutil
0009 import glob
0010 import sys
0011 import imp
0012 import copy
0013 from multiprocessing import Pool
0014 from pprint import pprint
0015
0016
# Import ROOT in batch mode unless "-i" was given on the command line.
if "-i" not in sys.argv:
    # The real command line is swapped out while ROOT is imported.
    # NOTE(review): presumably ROOT inspects sys.argv at import time and
    # would otherwise pick up this script's own options - confirm.
    oldv = sys.argv[:]
    sys.argv = [ "-b-"]
    try:
        import ROOT
        ROOT.gROOT.SetBatch(True)
    finally:
        # Always hand the original arguments back, even if the import or
        # SetBatch raises, so later option parsing never sees the placeholder.
        sys.argv = oldv
0023
0024
0025 from PhysicsTools.HeppyCore.framework.looper import Looper
0026
0027
# Module-level handle on the Looper created by main() when a single component
# is processed without multiprocessing; it stays None until main() sets it.
loop = None
0029
def callBack(result):
    '''Print a completion message for one asynchronous job.

    main() passes this function as the callback of pool.apply_async, so
    result is whatever runLoopAsync returned for the job.
    '''
    print('production done:', str(result))
0033
def runLoopAsync(comp, outDir, configName, options):
    '''Run the loop for one component and return the name of the loop.

    configName is the key under which the configuration module is registered
    in sys.modules; its config attribute is shallow-copied before being handed
    to runLoop, together with comp, outDir and options.

    Any exception is reported on stdout (component name, component, stack
    trace) and then re-raised unchanged.
    '''
    try:
        cfgModule = sys.modules[configName]
        jobConfig = copy.copy(cfgModule.config)
        finished = runLoop(comp, outDir, jobConfig, options)
        return finished.name
    except Exception:
        # Print the diagnosis here, next to the failing component, before
        # letting the exception propagate to the caller.
        import traceback
        print("ERROR processing component %s" % comp.name)
        print(comp)
        print("STACK TRACE: ")
        print(traceback.format_exc())
        raise
0045
def runLoop(comp, outDir, config, options):
    '''Build a Looper for a single component, run it and return it.

    The Looper is named "<outDir>/<comp.name>" and config.components is
    replaced by [comp] before the Looper is created (config is modified in
    place).

    When options.iEvent is None the whole loop is run and written out;
    otherwise only the event with index int(options.iEvent) is processed and
    nothing is written.
    '''
    jobName = '/'.join((outDir, comp.name))
    config.components = [comp]
    looper = Looper(jobName,
                    config,
                    options.nevents, 0,
                    nPrint=options.nprint,
                    timeReport=options.timeReport,
                    quiet=options.quiet)

    if options.iEvent is not None:
        # single-event mode
        looper.process(int(options.iEvent))
        return looper

    looper.loop()
    looper.write()
    return looper
0066
0067
def createOutputDir(dir, components, force):
    '''Create the output directory, dealing with the case where it exists.

    dir        -- path of the directory to create.
    components -- objects with a name attribute; their names are printed
                  next to the existing sub-directories to help the user decide.
    force      -- when exactly True, an existing directory is accepted
                  without asking.

    Returns True when processing may go on (directory created, force mode, or
    the user answered yes) and False when the user answered no.

    Raises OSError when the directory cannot be created for a reason other
    than already existing (missing parent, permissions, path is a file).
    '''
    try:
        os.mkdir(dir)
        return True
    except OSError:
        # Only "already there" is handled below; anything else is a real error
        # and must not be reported as an existing directory.
        if not os.path.isdir(dir):
            raise

    print('directory %s already exists' % dir)
    print('contents: ')
    dirlist = [path for path in os.listdir(dir) if os.path.isdir( '/'.join([dir, path]) )]
    pprint( dirlist )
    print('component list: ')
    print([comp.name for comp in components])
    if force is True:
        print('force mode, continue.')
        return True

    # raw_input only exists on Python 2; on Python 3 the same function is
    # called input.
    try:
        ask = raw_input
    except NameError:
        ask = input

    # Answers are normalised before being checked, so 'Yes', 'NO' or ' y '
    # are accepted as well as the lower-case forms.
    answer = None
    while answer not in ('y', 'yes', 'n', 'no'):
        answer = ask('Continue? [y/n]').strip().lower()
    return answer.startswith('y')
0094
def chunks(l, n):
    '''Cut the sequence l into consecutive slices of at most n items.

    Returns a list of slices of l; the last one holds the remainder and may
    be shorter. An empty l gives an empty list.
    '''
    pieces = []
    for start in range(0, len(l), n):
        pieces.append(l[start:start + n])
    return pieces
0097
def _chunkOf(comp, files, index, fineSplit=None):
    '''Return a deep copy of comp limited to files and renamed <name>_Chunk<index>.'''
    newComp = copy.deepcopy(comp)
    newComp.files = files
    if fineSplit is not None:
        newComp.fineSplit = fineSplit
    newComp.name = '{name}_Chunk{index}'.format(name=newComp.name,
                                                index=index)
    return newComp


def split(comps):
    '''Expand components into the list of jobs to run.

    For each component of comps:
    - fineSplitFactor > 1: one chunk per (file, part) pair, with part running
      from 0 to fineSplitFactor - 1; each chunk holds a single file and gets
      fineSplit = (part, fineSplitFactor);
    - otherwise splitFactor > 1: the files are shared out in chunks of
      ceil(len(files) / splitFactor) files;
    - otherwise the component is kept as is (same object, not copied).

    Chunks are deep copies named '<name>_Chunk<index>'; the input components
    are never modified. A component that has to be split but has no files
    produces no chunk at all.

    Returns the new list of components.
    '''
    splitComps = []
    for comp in comps:
        if hasattr( comp, 'fineSplitFactor') and comp.fineSplitFactor>1:
            factor = comp.fineSplitFactor
            pairs = [(f, i) for f in comp.files for i in range(factor)]
            for ichunk, (fileName, part) in enumerate(pairs):
                splitComps.append( _chunkOf(comp, [fileName], ichunk,
                                            fineSplit=(part, factor)) )
        elif hasattr( comp, 'splitFactor') and comp.splitFactor>1:
            # Ceiling division done with integers only: '/' is a true division
            # on Python 3 and would hand a float step to range() in chunks().
            # The lower bound of 1 keeps an empty file list from producing a
            # zero step, which range() rejects.
            chunkSize = max(1, int(-(-len(comp.files) // comp.splitFactor)))
            for ichunk, chunk in enumerate( chunks( comp.files, chunkSize)):
                splitComps.append( _chunkOf(comp, chunk, ichunk) )
        else:
            splitComps.append( comp )
    return splitComps
0125
0126
# Module-level registry of heppy options: name -> value. It is filled by
# setHeppyOption() and by main() from options.extraOptions.
_heppyGlobalOptions = {}


def getHeppyOption(name, default=None):
    '''Return the value registered for option name, or default if it was never set.'''
    return _heppyGlobalOptions.get(name, default)


def setHeppyOption(name, value=True):
    '''Register value under option name, replacing any previous value.'''
    _heppyGlobalOptions[name] = value
0135
def main( options, args, parser ):
    '''Run the configured analysis over every component that has input files.

    options -- parsed command-line options; the attributes read here are
               verbose, extraOptions, ntasks and force (the others are read
               by runLoop).
    args    -- exactly two items: the output directory and the cfg file.
    parser  -- option parser, only used to print the help on bad usage.

    Exit codes: 1 wrong number of arguments, 2 first argument exists but is
    not a directory, 3 cfg file not found, 4 no component with input files,
    0 the user declined to reuse an existing output directory.

    With several components (after splitting) the jobs are spread over a
    multiprocessing Pool of at most options.ntasks processes. With a single
    one the loop runs in this process and is stored in the module-level
    variable loop.

    Returns the module-level loop: the Looper just run in single-component
    mode, otherwise whatever that variable already held.
    '''
    global loop

    if len(args) != 2:
        parser.print_help()
        print('ERROR: please provide the processing name and the component list')
        sys.exit(1)

    outDir = args[0]
    if os.path.exists(outDir) and not os.path.isdir( outDir ):
        parser.print_help()
        print('ERROR: when it exists, first argument must be a directory.')
        sys.exit(2)
    cfgFileName = args[1]
    if not os.path.isfile( cfgFileName ):
        parser.print_help()
        print('ERROR: second argument must be an existing file (your input cfg).')
        sys.exit(3)

    if options.verbose:
        import logging
        logging.basicConfig(level=logging.INFO)

    # The option registry is fetched through the package path on purpose.
    # NOTE(review): presumably this code can run under another module name
    # (e.g. __main__), in which case the plain global would be a different
    # dict from the one seen by getHeppyOption() users - confirm.
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key,val) = opt.split("=",1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True

    # The cfg is loaded under a fixed module name so that runLoopAsync can
    # find it again in sys.modules. The handle is closed as soon as the
    # module is loaded.
    # NOTE(review): imp is deprecated and removed in Python 3.12.
    cfgModuleName = 'PhysicsTools.HeppyCore.__cfg_to_run__'
    with open( cfgFileName, 'r' ) as cfgFile:
        cfg = imp.load_source( cfgModuleName, cfgFileName, cfgFile)

    selComps = [comp for comp in cfg.config.components if len(comp.files)>0]
    selComps = split(selComps)
    if not selComps:
        # Nothing to run: say so before creating or touching any output.
        print('ERROR: no component with input files found in the configuration.')
        sys.exit(4)

    if len(selComps)>options.ntasks:
        print("WARNING: too many threads {tnum}, will just use a maximum of {jnum}.".format(tnum=len(selComps),jnum=options.ntasks))
    if not createOutputDir(outDir, selComps, options.force):
        print('exiting')
        sys.exit(0)
    if len(selComps)>1:
        shutil.copy( cfgFileName, outDir )
        pool = Pool(processes=min(len(selComps),options.ntasks))
        # The worker and the callback are taken from the module imported
        # under its package path, as for the option registry above.
        import PhysicsTools.HeppyCore.framework.heppy_loop as ML
        for comp in selComps:
            print('submitting', comp.name)
            pool.apply_async( ML.runLoopAsync, [comp, outDir, cfgModuleName, options],
                              callback=ML.callBack)
        pool.close()
        pool.join()
    else:
        # Single job: run it in this process, without the pool. The
        # component is taken from the selection itself: the comprehension
        # variable is not visible here on Python 3, and on Python 2 it is
        # the last configured component, which may not be the selected one.
        loop = runLoop( selComps[0], outDir, cfg.config, options )
    return loop
0197 return loop