File indexing completed on 2024-11-25 02:29:50
0001
0002
0003
0004 from builtins import range
0005 import os
0006 import sys
0007 import imp
0008 import logging
0009 import pprint
0010 from math import ceil
0011 from .event import Event
0012 import timeit
0013 import resource
0014 import json
0015
class Setup(object):
    '''Process-wide container that the Looper hands to every analyzer.

    It carries what stays valid for the entire job: the process configuration
    read from the configuration file, plus the services that several analyzers
    may share. Users are free to hang additional attributes on it, provided
    they are job-wide; anything that changes from event to event belongs on the
    event object instead.
    '''

    def __init__(self, config, services):
        '''Keep references to the job configuration and to the services.

        parameters:

        config: configuration object from the configuration file

        services: dictionary of services keyed by service name.
            A service name looks like classObject_instanceLabel, e.g.
            <base_heppy_path>.framework.services.tfile.TFileService_myhists
            To discover the name of a given service, load your configuration
            file in python and print the service.
        '''
        self.services = services
        self.config = config

    def close(self):
        '''Call stop() on each registered service, in dictionary order.'''
        for svc in self.services.values():
            svc.stop()
0048
0049
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__(self, name,
                 config,
                 nEvents=None,
                 firstEvent=0,
                 nPrint=0,
                 timeReport=False,
                 quiet=False,
                 memCheckFromEvent=-1):
        """Handles the processing of an event sample.
        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
           name : name of the Looper, will be used as the output directory name
                  (a '_N' suffix is appended if the directory cannot be created)
           config : process configuration information, see Config
           nEvents : number of events to process. Defaults to all.
           firstEvent : first event to process. Defaults to the first one.
           nPrint : number of events to print at the beginning
           timeReport : if True, time each analyzer and log a report at the
                        end of loop()
           quiet : if True, the log is not echoed to stdout
           memCheckFromEvent : if >= 0, print peak-RSS increases observed
                               around each analyzer from this event on

        Raises:
           ValueError : if the first component has no input files, or no
                        output directory could be created
           RuntimeError : if a component with fineSplit > 1 has more than
                          one input file
        """
        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(
            logging.FileHandler(os.path.join(self.name, 'log.txt')))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Materialize as a list: under Python 3 ``map`` returns a one-shot
        # iterator, which the first pass (timeReport construction or
        # beginLoop) would exhaust, leaving later passes with no analyzers.
        self.analyzers = [self._build(cfg) for cfg in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        self.timeReport = ([{'time': 0.0, 'events': 0} for _ in self.analyzers]
                           if timeReport else False)
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0
        tree_name = getattr(self.cfg_comp, 'tree_name', None)
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        # The preprocessor may replace the component (e.g. with new input files).
        if getattr(config, 'preprocessor', None) is not None:
            self.cfg_comp = config.preprocessor.run(self.cfg_comp, self.outDir, firstEvent, nEvents)
        if hasattr(self.cfg_comp, 'options'):
            print(self.cfg_comp.files, self.cfg_comp.options)
            self.events = config.events_class(self.cfg_comp.files, tree_name, options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            self._applyFineSplit(nEvents, firstEvent)

        self.event = None
        services = dict()
        for cfg_serv in config.services:
            services[cfg_serv.name] = self._build(cfg_serv)
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate cfg.class_object with (cfg, current component, output dir)."""
        theClass = cfg.class_object
        return theClass(cfg, self.cfg_comp, self.outDir)

    def _prepareOutput(self, name):
        """Create the output directory and return its name.

        Tries `name`, then `name_1`, `name_2`, ... (2000 attempts in total)
        and returns the first one that os.mkdir accepts.

        Raises:
           ValueError : if all 2000 attempts fail
        """
        tmpname = name
        for index in range(1, 2001):
            try:
                os.mkdir(tmpname)
                return tmpname
            except OSError:
                tmpname = '%s_%d' % (name, index)
        raise ValueError("More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def _applyFineSplit(self, nEvents, firstEvent):
        """Narrow self.firstEvent/self.nEvents to this job's chunk of a finely split component."""
        fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
        if fineSplitFactor <= 1:
            return
        if len(self.cfg_comp.files) != 1:
            raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
        # nEvents of None, 0 or -1 means "all events in the file".
        if nEvents and int(nEvents) not in [-1, 0]:
            totevents = min(len(self.events), int(nEvents))
        else:
            totevents = len(self.events)
        self.nEvents = int(ceil(totevents / float(fineSplitFactor)))
        # NOTE(review): totevents does not account for a non-zero firstEvent
        # offset; this is only exact for firstEvent == 0 — confirm intent.
        self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
        if self.firstEvent + self.nEvents >= totevents:
            # The last chunk(s) may be shorter, or empty; never go negative.
            self.nEvents = max(0, totevents - self.firstEvent)

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.

        The number of events is capped to what is available after
        self.firstEvent. A UserWarning raised while processing stops the
        loop early; the logged count only includes fully processed events.
        """
        firstEvent = self.firstEvent
        nAvailable = max(0, len(self.events) - firstEvent)
        nEvents = self.nEvents
        if nEvents is None or int(nEvents) > nAvailable:
            eventSize = nAvailable
        else:
            eventSize = int(nEvents)
        self.logger.info(
            'starting loop at event {firstEvent} '
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                if iEv % 100 == 0:
                    # The rate reference is set once per Looper, at the first
                    # multiple-of-100 event ever seen.
                    if not hasattr(self, 'start_time'):
                        print('event', iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        print('event %d (%.1f ev/s)' % (iEv, (iEv - self.start_time_event) / float(timeit.default_timer() - self.start_time)))
                self.process(iEv)
                nProcessed += 1
                if iEv < self.nPrint:
                    print(self.event)
        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        warning('number of events processed: {nEv}'.format(nEv=nProcessed))
        warning('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            self._printTimeReport()
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        preprocessor = getattr(self.config, 'preprocessor', None)
        if preprocessor is not None and hasattr(preprocessor, 'endLoop'):
            preprocessor.endLoop(self.cfg_comp)

    def _printTimeReport(self):
        """Log the per-analyzer timing table; requires a non-empty self.timeReport."""
        warning = self.logger.warning
        allev = max(rep['events'] for rep in self.timeReport)
        warning("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ")
        warning("%9s %9s %9s %9s %6s %s" % ("processed", "all evts", "time/proc", " time/all", " [%] ", "analyzer"))
        warning("%9s %9s %9s %9s %6s %s" % ("---------", "--------", "---------", "---------", " -----", "-------------"))
        sumtime = sum(rep['time'] for rep in self.timeReport)
        passev = self.timeReport[-1]['events']
        for ana, rep in zip(self.analyzers, self.timeReport):
            # The first event is not timed, hence the (events - 1) divisors.
            timePerProcEv = rep['time'] / (rep['events'] - 1) if rep['events'] > 1 else 0
            timePerAllEv = rep['time'] / (allev - 1) if allev > 1 else 0
            fracAllEv = rep['time'] / sumtime if sumtime > 0 else 0
            warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (rep['events'], allev, 1000 * timePerProcEv, 1000 * timePerAllEv, 100.0 * fracAllEv, ana.name))
        totPerProcEv = sumtime / (passev - 1) if passev > 1 else 0
        totPerAllEv = sumtime / (allev - 1) if allev > 1 else 0
        warning("%9s %9s %9s %9s %s" % ("---------", "--------", "---------", "---------", "-------------"))
        warning("%9d %9d %10.2f %10.2f %5.1f%% %s" % (passev, allev, 1000 * totPerProcEv, 1000 * totPerAllEv, 100.0, "TOTAL"))
        warning("")

    def _checkMemory(self, iEv, analyzer, where):
        """Print a message when peak RSS (ru_maxrss) grew since the last check, if memory checking is on."""
        if self.memReportFirstEvent < 0 or iEv < self.memReportFirstEvent:
            return
        memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if memNow > self.memLast:
            print("Mem Jump detected %s %s at event %s. RSS(before,after,difference) %s %s %s " % (where, analyzer.name, iEv, self.memLast, memNow, memNow - self.memLast))
            self.memLast = memNow

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns (passed, name): passed is False if an analyzer returned a
        value equal to False, which stops the sequence at that analyzer. name
        is the name of the last analyzer run, or None if there are no
        analyzers.
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        lastName = None
        for i, analyzer in enumerate(self.analyzers):
            lastName = analyzer.name
            if not analyzer.beginLoopCalled:
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            self._checkMemory(iEv, analyzer, 'before analyzer')
            ret = analyzer.process(self.event)
            self._checkMemory(iEv, analyzer, 'in analyzer')
            if self.timeReport:
                report = self.timeReport[i]
                report['events'] += 1
                # Do not time the first event (the report says so and divides
                # by events - 1); it typically carries one-off setup costs.
                if report['events'] > 1:
                    report['time'] += timeit.default_timer() - start
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(self.event.__str__())
        return (True, lastName)

    def write(self):
        """Writes all analyzers.

        See Analyzer.Write for more information.
        Also stops all services of the setup.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
0264
0265
if __name__ == '__main__':
    # Script entry point: run a Looper named 'Loop' on either
    #   1 argument : a pickled Config object, or
    #   2 arguments: a python cfg file plus a pickled component that replaces
    #                the cfg's component list.
    # NOTE: pickle.load can execute arbitrary code; only pass trusted files.

    import pickle
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser
    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options', dest='options', default='', help='options json file')
    (options, args) = parser.parse_args()

    if options.options != '':
        # The options file holds one JSON object on its first line.
        with open(options.options, 'r') as jfile:
            opts = json.loads(jfile.readline())
        for k, v in opts.items():
            _heppyGlobalOptions[k] = v

    if len(args) == 1:
        # Pickles must be read in binary mode under Python 3.
        with open(args[0], 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        cfg = imp.load_source('cfg', args[0])
        with open(args[1], 'rb') as pckfile:
            comp = pickle.load(pckfile)
        config = cfg.config
        config.components = [comp]
    else:
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
0304