Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:50

0001 # Copyright (C) 2014 Colin Bernet
0002 # https://github.com/cbernet/heppy/blob/master/LICENSE
0003 
0004 from builtins import range
0005 import os
0006 import sys
0007 import imp
0008 import logging
0009 import pprint
0010 from math import ceil
0011 from .event import Event
0012 import timeit
0013 import resource
0014 import json
0015 
class Setup(object):
    '''Process-wide container handed to every analyzer by the Looper.

    It carries state that stays valid for the whole job: the process
    configuration read from the configuration file, and the services that
    several analyzers may share.

    Users may attach further attributes here as long as they are relevant
    for the whole process. Event-specific information belongs on the event
    object instead.
    '''

    def __init__(self, config, services):
        '''
        Build the setup.

        parameters:

        config: configuration object from the configuration file

        services: dictionary of services indexed by service name.
        The service name has the form classObject_instanceLabel
        as in this example:
        <base_heppy_path>.framework.services.tfile.TFileService_myhists
        To find out about the service name of a given service,
        load your configuration file in python, and print the service.
        '''
        self.services = services
        self.config = config

    def close(self):
        '''Call stop() on every registered service, in dictionary order.'''
        for serviceName in self.services:
            self.services[serviceName].stop()
0048         
0049 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__( self, name,
                  config,
                  nEvents=None,
                  firstEvent=0,
                  nPrint=0,
                  timeReport=False,
                  quiet=False,
                  memCheckFromEvent=-1):
        """Handles the processing of an event sample.
        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name    : name of the Looper, will be used as the output directory name
                  (a suffix _1, _2, ... is appended if the directory exists)
        config  : process configuration information, see Config
        nEvents : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint  : number of events to print at the beginning
        timeReport : if True, accumulate per-analyzer timing and log a
                  report at the end of loop()
        quiet   : if True, log only to <outDir>/log.txt, not also to stdout
        memCheckFromEvent : if >= 0, print increases of the peak RSS
                  (ru_maxrss) around each analyzer for events with
                  index >= this value

        Raises:
        ValueError   : the first component has no input files, or no
                       output directory could be created
        RuntimeError : fineSplit > 1 on a component with more than one file
        """
        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger(self.name)
        self.logger.addHandler(
            logging.FileHandler(os.path.join(self.name, 'log.txt')))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler(logging.StreamHandler(sys.stdout))

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Build the analyzers eagerly into a list. A Python 3 ``map`` object
        # is a one-shot iterator: the first traversal (the timeReport setup
        # below, or beginLoop in loop()) would exhaust it, and process()
        # would then silently run no analyzer at all.
        self.analyzers = [self._build(cfg) for cfg in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        # One timing accumulator per analyzer, in sequence order;
        # False disables timing altogether.
        self.timeReport = [{'time': 0.0, 'events': 0}
                           for _ in self.analyzers] if timeReport else False
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0
        tree_name = getattr(self.cfg_comp, 'tree_name', None)
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError(errmsg)
        if getattr(config, 'preprocessor', None) is not None:
            # the preprocessor may replace the component (e.g. its files)
            self.cfg_comp = config.preprocessor.run(self.cfg_comp, self.outDir, firstEvent, nEvents)
        if hasattr(self.cfg_comp, 'options'):
            print(self.cfg_comp.files, self.cfg_comp.options)
            self.events = config.events_class(self.cfg_comp.files, tree_name, options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            # Process only chunk number fineSplitIndex out of fineSplitFactor
            # equal chunks of the (single) input file.
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                totevents = min(len(self.events), int(nEvents)) if (nEvents and int(nEvents) not in [-1, 0]) else len(self.events)
                self.nEvents = int(ceil(totevents / float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                if self.firstEvent + self.nEvents >= totevents:
                    self.nEvents = totevents - self.firstEvent
        # self.event is set in self.process
        self.event = None
        services = {cfg_serv.name: self._build(cfg_serv)
                    for cfg_serv in config.services}
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate cfg.class_object with (cfg, current component, outDir)."""
        theClass = cfg.class_object
        obj = theClass(cfg, self.cfg_comp, self.outDir)
        return obj

    def _prepareOutput(self, name):
        """Create a new output directory and return its path.

        Tries `name`, then `name_1` ... `name_1999` (2000 attempts in total),
        and returns the first one that os.mkdir creates successfully.

        Raises ValueError if all attempts fail.
        """
        for index in range(2000):
            tmpname = name if index == 0 else '%s_%d' % (name, index)
            try:
                os.mkdir(tmpname)
            except OSError:
                # already exists, or not creatable: try the next suffix
                continue
            return tmpname
        raise ValueError( "More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def _loopRange(self):
        """Return (firstEvent, eventSize) for loop().

        eventSize is self.nEvents (all events if None), clamped so that the
        range never runs past the end of self.events and is never negative.
        """
        firstEvent = self.firstEvent
        available = max(0, len(self.events) - firstEvent)
        nEvents = self.nEvents
        if nEvents is None or int(nEvents) > available:
            eventSize = available
        else:
            eventSize = max(0, int(nEvents))
        return firstEvent, eventSize

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.
        A UserWarning raised while processing stops the loop early.
        """
        firstEvent, eventSize = self._loopRange()
        self.logger.info(
            'starting loop at event {firstEvent} '
            'to process {eventSize} events.'.format(firstEvent=firstEvent,
                                                    eventSize=eventSize))
        self.logger.info(str(self.cfg_comp))
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        # number of events whose processing was started (including one
        # interrupted by a UserWarning)
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                if iEv % 100 == 0:
                    if not hasattr(self, 'start_time'):
                        print('event', iEv)
                        self.start_time = timeit.default_timer()
                        self.start_time_event = iEv
                    else:
                        print('event %d (%.1f ev/s)' % (iEv, (iEv - self.start_time_event) / float(timeit.default_timer() - self.start_time)))
                nProcessed += 1
                self.process(iEv)
                if iEv < self.nPrint:
                    print(self.event)

        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        warning('number of events processed: {nEv}'.format(nEv=nProcessed))
        warning('')
        info(self.cfg_comp)
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            self._printTimeReport()
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        preprocessor = getattr(self.config, 'preprocessor', None)
        if preprocessor is not None and hasattr(preprocessor, 'endLoop'):
            preprocessor.endLoop(self.cfg_comp)

    def _printTimeReport(self):
        """Log the per-analyzer timing table accumulated by process()."""
        warning = self.logger.warning
        allev = max(rep['events'] for rep in self.timeReport)
        warning("\n      ---- TimeReport (all times in ms; first evt is skipped) ---- ")
        warning("%9s   %9s    %9s   %9s %6s   %s" % ("processed","all evts","time/proc", " time/all", "  [%] ", "analyer"))
        warning("%9s   %9s    %9s   %9s %6s   %s" % ("---------","--------","---------", "---------", " -----", "-------------"))
        sumtime = sum(rep['time'] for rep in self.timeReport)
        passev = self.timeReport[-1]['events']
        for ana, rep in zip(self.analyzers, self.timeReport):
            # times exclude each analyzer's first event, hence the "- 1"
            timePerProcEv = rep['time'] / (rep['events'] - 1) if rep['events'] > 1 else 0
            timePerAllEv = rep['time'] / (allev - 1) if allev > 1 else 0
            fracAllEv = rep['time'] / sumtime if sumtime > 0 else 0
            warning( "%9d   %9d   %10.2f  %10.2f %5.1f%%   %s" % ( rep['events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
        totPerProcEv = sumtime / (passev - 1) if passev > 1 else 0
        totPerAllEv = sumtime / (allev - 1) if allev > 1 else 0
        warning("%9s   %9s    %9s   %9s   %s" % ("---------","--------","---------", "---------", "-------------"))
        warning("%9d   %9d   %10.2f  %10.2f %5.1f%%   %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0, "TOTAL"))
        warning("")

    def _checkMemory(self, where, analyzerName, iEv):
        """Print a message if the peak RSS (ru_maxrss) grew since the last check.

        where is 'before analyzer' or 'in analyzer'; self.memLast is updated.
        """
        memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if memNow > self.memLast:
            print("Mem Jump detected %s %s at event %s. RSS(before,after,difference) %s %s %s " % (where, analyzerName, iEv, self.memLast, memNow, memNow - self.memLast))
        self.memLast = memNow

    def process(self, iEv):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns (False, name) for the first analyzer whose process() returned
        False (later analyzers are skipped), otherwise (True, name of the
        last analyzer), or (True, None) if the sequence is empty.
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        memCheck = self.memReportFirstEvent >= 0 and iEv >= self.memReportFirstEvent
        lastName = None
        for i, analyzer in enumerate(self.analyzers):
            lastName = analyzer.name
            if not analyzer.beginLoopCalled:
                analyzer.beginLoop(self.setup)
            if memCheck:
                self._checkMemory('before analyzer', analyzer.name, iEv)
            start = timeit.default_timer()
            ret = analyzer.process(self.event)
            elapsed = timeit.default_timer() - start
            if memCheck:
                self._checkMemory('in analyzer', analyzer.name, iEv)
            if self.timeReport:
                rep = self.timeReport[i]
                rep['events'] += 1
                # The report states that the first event is skipped and
                # averages over (events - 1), so do not accumulate it here.
                if rep['events'] > 1:
                    rep['time'] += elapsed
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info(str(self.event))
        return (True, lastName)

    def write(self):
        """Writes all analyzers.

        See Analyzer.Write for more information.
        Stops all services afterwards.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
0264 
0265 
if __name__ == '__main__':

    import pickle
    import sys
    import os
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser
    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options', dest='options', default='', help='options json file')
    (options, args) = parser.parse_args()

    if options.options != '':
        # The options file is read as a single JSON object on its first line.
        with open(options.options, 'r') as jfile:
            opts = json.loads(jfile.readline())
        for k, v in opts.items():
            _heppyGlobalOptions[k] = v

    # NOTE(review): pickle.load executes arbitrary code from the file; the
    # pickles given on the command line must come from a trusted source.
    if len(args) == 1:
        # a single argument: a pickled, complete configuration
        with open(args[0], 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        # a python configuration file plus a pickled component to run on
        cfgFileName, compFileName = args
        # NOTE(review): the imp module was removed in Python 3.12;
        # importlib.util would be needed there.
        cfg = imp.load_source('cfg', cfgFileName)
        with open(compFileName, 'rb') as pckfile:
            comp = pickle.load(pckfile)
        cfg.config.components = [comp]
        config = cfg.config
    else:
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
0304