File indexing completed on 2023-03-17 11:15:50
0001 from __future__ import print_function
0002 from __future__ import absolute_import
0003
0004
0005
0006 from builtins import range
0007 import os
0008 import sys
0009 import imp
0010 import logging
0011 import pprint
0012 from math import ceil
0013 from .event import Event
0014 import timeit
0015 import resource
0016 import json
0017
class Setup(object):
    '''Process-wide state shared by all analyzers.

    A Setup instance is created by the Looper and carries whatever has to
    stay available for the whole duration of the processing: the process
    configuration read from the configuration file, and the services that
    several analyzers may share.

    Users are free to attach additional attributes to this object, provided
    the information stays meaningful for the whole process. Anything that is
    specific to a single event belongs on the event object instead.
    '''

    def __init__(self, config, services):
        '''
        Build the Setup object.

        parameters:

        config: configuration object, as obtained from the configuration file

        services: dictionary mapping a service name to the service object.
        Service names follow the pattern classObject_instanceLabel,
        for instance:
        <base_heppy_path>.framework.services.tfile.TFileService_myhists
        To discover the name of a given service, load your configuration
        file in python and print the service.
        '''
        self.services = services
        self.config = config

    def close(self):
        '''Call stop() on every registered service.'''
        for svc in self.services.values():
            svc.stop()
0050
0051
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__(self, name,
                 config,
                 nEvents=None,
                 firstEvent=0,
                 nPrint=0,
                 timeReport=False,
                 quiet=False,
                 memCheckFromEvent=-1):
        """Handles the processing of an event sample.
        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name       : name of the Looper, will be used as the output directory
                     name (a numeric suffix is appended if it already exists)
        config     : process configuration information, see Config
        nEvents    : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint     : number of events to print at the beginning
        timeReport : if true, accumulate the time spent in each analyzer and
                     log a summary table at the end of loop()
        quiet      : if false, log messages are also sent to stdout
        memCheckFromEvent : index of the first event from which ru_maxrss is
                     checked before and after each analyzer; a negative value
                     disables the check

        Raises:
        ValueError   : if the component has no input file, or if no output
                       directory could be created
        RuntimeError : if fineSplit is requested with a factor > 1 on a
                       component that does not have exactly one file
        """

        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(logging.FileHandler('/'.join([self.name,
                                                             'log.txt'])))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Must be a real list: the analyzers are iterated several times
        # (timeReport below, loop, process, write). On Python 3, map() returns
        # a one-shot iterator that would be empty after the first pass.
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        # One accumulator per analyzer, in the same order as self.analyzers.
        self.timeReport = [{'time': 0.0, 'events': 0} for _ in self.analyzers] if timeReport else False
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0
        tree_name = None
        if hasattr(self.cfg_comp, 'tree_name'):
            tree_name = self.cfg_comp.tree_name
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        # An optional preprocessor may replace the component before the
        # events are opened (the analyzers above were built with the
        # original component).
        if hasattr(config, "preprocessor") and config.preprocessor is not None:
            self.cfg_comp = config.preprocessor.run(self.cfg_comp, self.outDir, firstEvent, nEvents)
        if hasattr(self.cfg_comp, "options"):
            print(self.cfg_comp.files, self.cfg_comp.options)
            self.events = config.events_class(self.cfg_comp.files, tree_name, options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            # fineSplit = (index, factor): the event range is cut into
            # `factor` chunks of equal size and this Looper handles chunk
            # number `index`.
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                totevents = min(len(self.events), int(nEvents)) if (nEvents and int(nEvents) not in [-1, 0]) else len(self.events)
                self.nEvents = int(ceil(totevents / float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                # The last chunk may be shorter than the others.
                if self.firstEvent + self.nEvents >= totevents:
                    self.nEvents = totevents - self.firstEvent

        self.event = None
        services = dict()
        for cfg_serv in config.services:
            service = self._build(cfg_serv)
            services[cfg_serv.name] = service

        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate cfg.class_object for the current component and output directory."""
        theClass = cfg.class_object
        obj = theClass(cfg, self.cfg_comp, self.outDir)
        return obj

    def _prepareOutput(self, name):
        """Create the output directory and return its actual name.

        If `name` cannot be created (typically because it already exists),
        `name_1`, `name_2`, ... are tried in turn.

        Raises ValueError after 2000 failed attempts.
        """
        index = 0
        tmpname = name
        while index < 2000:
            try:
                os.mkdir(tmpname)
                break
            except OSError:
                index += 1
                tmpname = '%s_%d' % (name, index)
        if index == 2000:
            raise ValueError("More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")
        return tmpname

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.

        A UserWarning raised during the processing stops the event loop
        cleanly; the end-of-loop actions are still performed.
        """
        nEvents = self.nEvents
        firstEvent = self.firstEvent
        if nEvents is None or int(nEvents) > len(self.events):
            nEvents = len(self.events)
        else:
            nEvents = int(nEvents)
        eventSize = nEvents
        self.logger.info(
            'starting loop at event {firstEvent} '
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        # Number of events handed to self.process, including the one on which
        # a UserWarning may have interrupted the loop. Stays 0 if the loop
        # body never runs.
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                # Progress report every 100 events; the rate is measured
                # from the first report onwards.
                if iEv % 100 == 0:
                    if not hasattr(self, 'start_time'):
                        print('event', iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        print('event %d (%.1f ev/s)' % (iEv, (iEv - self.start_time_event) / float(timeit.default_timer() - self.start_time)))

                nProcessed = iEv - firstEvent + 1
                self.process(iEv)
                if iEv < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        warning('number of events processed: {nEv}'.format(nEv=nProcessed))
        warning('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            allev = max([x['events'] for x in self.timeReport])
            warning("\n      ---- TimeReport (all times in ms; first evt is skipped) ---- ")
            warning("%9s   %9s    %9s   %9s %6s   %s" % ("processed", "all evts", "time/proc", " time/all", "  [%] ", "analyer"))
            warning("%9s   %9s    %9s   %9s %6s   %s" % ("---------", "--------", "---------", "---------", " -----", "-------------"))
            sumtime = sum(rep['time'] for rep in self.timeReport)
            passev = self.timeReport[-1]['events']
            for ana, rep in zip(self.analyzers, self.timeReport):
                timePerProcEv = rep['time'] / (rep['events'] - 1) if rep['events'] > 1 else 0
                timePerAllEv = rep['time'] / (allev - 1) if allev > 1 else 0
                # sumtime is 0 when nothing was timed (e.g. no event processed).
                fracAllEv = rep['time'] / sumtime if sumtime > 0 else 0
                warning("%9d   %9d   %10.2f  %10.2f %5.1f%%   %s" % (rep['events'], allev, 1000 * timePerProcEv, 1000 * timePerAllEv, 100.0 * fracAllEv, ana.name))
            totPerProcEv = sumtime / (passev - 1) if passev > 1 else 0
            totPerAllEv = sumtime / (allev - 1) if allev > 1 else 0
            warning("%9s   %9s    %9s   %9s   %s" % ("---------", "--------", "---------", "---------", "-------------"))
            warning("%9d   %9d   %10.2f  %10.2f %5.1f%%   %s" % (passev, allev, 1000 * totPerProcEv, 1000 * totPerAllEv, 100.0, "TOTAL"))
            warning("")
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        if hasattr(self.config, "preprocessor") and self.config.preprocessor is not None:
            if hasattr(self.config.preprocessor, "endLoop"):
                self.config.preprocessor.endLoop(self.cfg_comp)

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns a tuple (passed, analyzerName): (False, name) as soon as an
        analyzer returns False, the remaining analyzers being skipped;
        otherwise (True, name of the last analyzer).
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        for i, analyzer in enumerate(self.analyzers):
            # Allows process() to be called directly, without loop().
            if not analyzer.beginLoopCalled:
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            if self.memReportFirstEvent >= 0 and iEv >= self.memReportFirstEvent:
                memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                if memNow > self.memLast:
                    print("Mem Jump detected before analyzer %s at event %s. RSS(before,after,difference) %s %s %s " % (analyzer.name, iEv, self.memLast, memNow, memNow - self.memLast))
                self.memLast = memNow
            ret = analyzer.process(self.event)
            if self.memReportFirstEvent >= 0 and iEv >= self.memReportFirstEvent:
                memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                if memNow > self.memLast:
                    print("Mem Jump detected in analyzer %s at event %s. RSS(before,after,difference) %s %s %s " % (analyzer.name, iEv, self.memLast, memNow, memNow - self.memLast))
                self.memLast = memNow
            if self.timeReport:
                self.timeReport[i]['events'] += 1
                if self.timeReport[i]['events'] > 0:
                    self.timeReport[i]['time'] += timeit.default_timer() - start
            # Only an explicit False stops the sequence; None means "go on".
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(str(self.event))
        return (True, analyzer.name)

    def write(self):
        """Writes all analyzers, then stops all services.

        See Analyzer.Write for more information.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
0266
0267
if __name__ == '__main__':
    # Command-line entry point. Two calling conventions are supported:
    #   <script> config.pck            : pickled configuration object
    #   <script> cfg.py component.pck  : python configuration file plus a
    #                                    pickled component, which replaces
    #                                    the components of the configuration
    # An optional --options=file.json fills the heppy global options before
    # the configuration is loaded.

    import pickle
    import sys
    import os
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser
    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options', dest='options', default='', help='options json file')
    (options, args) = parser.parse_args()

    if options.options != '':
        jsonfilename = options.options
        # The options are read from the first line of the file only.
        with open(jsonfilename, 'r') as jfile:
            opts = json.loads(jfile.readline())
        for k, v in opts.items():
            _heppyGlobalOptions[k] = v

    # NOTE(security): pickle.load executes arbitrary code from the file;
    # only use this entry point on pickle files you produced yourself.
    if len(args) == 1:
        cfgFileName = args[0]
        # Pickle streams must be read in binary mode.
        with open(cfgFileName, 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        cfgFileName = args[0]
        # NOTE: imp is deprecated (removed in Python 3.12); it is kept here
        # because the module is written to run on both Python 2 and 3.
        with open(cfgFileName, 'r') as cfgFile:
            cfg = imp.load_source('cfg', cfgFileName, cfgFile)
        compFileName = args[1]
        with open(compFileName, 'rb') as pckfile:
            comp = pickle.load(pckfile)
        config = cfg.config
        config.components = [comp]
    else:
        # parser.error prints the usage and exits with status 2.
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    # Both branches above bind `config`, so the Looper can be built the same
    # way whichever calling convention was used.
    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
0306