Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:23:30

0001 from __future__ import print_function
0002 from __future__ import absolute_import
0003 # Copyright (C) 2014 Colin Bernet
0004 # https://github.com/cbernet/heppy/blob/master/LICENSE
0005 
0006 from builtins import range
0007 import os
0008 import sys
0009 import imp
0010 import logging
0011 import pprint
0012 from math import ceil
0013 from .event import Event
0014 import timeit
0015 import resource
0016 import json
0017 
class Setup(object):
    '''Process-wide context handed by the Looper to every analyzer.

    It carries information that stays valid for the whole job: the process
    configuration read from the configuration file, and the services that
    several analyzers may share.

    Users may attach extra attributes to the setup object, provided they are
    relevant for the whole process. Per-event information belongs on the
    event object instead.
    '''

    def __init__(self, config, services):
        '''
        Build the Setup.

        parameters:

        config: configuration object from the configuration file

        services: dictionary of services indexed by service name.
        The service name has the form classObject_instanceLabel,
        for example:
        <base_heppy_path>.framework.services.tfile.TFileService_myhists
        To discover the name of a given service, load your configuration
        file in python and print the service.
        '''
        self.config, self.services = config, services

    def close(self):
        '''Call stop() on every registered service, in dictionary order.'''
        for _serviceName, registered in self.services.items():
            registered.stop()
0051 
class Looper(object):
    """Creates a set of analyzers, and schedules the event processing."""

    def __init__( self, name,
                  config,
                  nEvents=None,
                  firstEvent=0,
                  nPrint=0,
                  timeReport=False,
                  quiet=False,
                  memCheckFromEvent=-1):
        """Handles the processing of an event sample.

        An Analyzer is built for each Config.Analyzer present
        in sequence. The Looper can then be used to process an event,
        or a collection of events.

        Parameters:
        name    : name of the Looper, will be used as the output directory name
                  (a numeric suffix _1, _2, ... is appended when the directory
                  cannot be created, e.g. because it already exists)
        config  : process configuration information, see Config
        nEvents : number of events to process. Defaults to all.
        firstEvent : first event to process. Defaults to the first one.
        nPrint  : number of events to print at the beginning
        timeReport : if True, time each analyzer and log a report at the
                  end of loop()
        quiet   : if True, log only to <outDir>/log.txt and not to stdout
        memCheckFromEvent : if >= 0, print a message whenever the peak RSS
                  grows around an analyzer call, starting at this event index

        Raises ValueError if the first component has no input files.
        """
        self.config = config
        self.name = self._prepareOutput(name)
        self.outDir = self.name
        self.logger = logging.getLogger( self.name )
        self.logger.addHandler(logging.FileHandler(os.path.join(self.name, 'log.txt')))
        self.logger.propagate = False
        if not quiet:
            self.logger.addHandler( logging.StreamHandler(sys.stdout) )

        self.cfg_comp = config.components[0]
        self.classes = {}
        # Build a real list: on Python 3 map() returns a one-shot iterator that
        # the timeReport comprehension below would exhaust, leaving loop(),
        # process() and write() with no analyzers at all.
        self.analyzers = [self._build(cfg_ana) for cfg_ana in config.sequence]
        self.nEvents = nEvents
        self.firstEvent = firstEvent
        self.nPrint = int(nPrint)
        self.timeReport = [ {'time':0.0,'events':0} for _ in self.analyzers ] if timeReport else False
        self.memReportFirstEvent = memCheckFromEvent
        self.memLast = 0
        tree_name = getattr(self.cfg_comp, 'tree_name', None)
        if len(self.cfg_comp.files) == 0:
            errmsg = 'please provide at least an input file in the files attribute of this component\n' + str(self.cfg_comp)
            raise ValueError( errmsg )
        preprocessor = getattr(config, 'preprocessor', None)
        if preprocessor is not None:
            # The preprocessor returns the component used from here on.
            self.cfg_comp = preprocessor.run(self.cfg_comp, self.outDir, firstEvent, nEvents)
        if hasattr(self.cfg_comp, "options"):
            print(self.cfg_comp.files, self.cfg_comp.options)
            self.events = config.events_class(self.cfg_comp.files, tree_name, options=self.cfg_comp.options)
        else:
            self.events = config.events_class(self.cfg_comp.files, tree_name)
        if hasattr(self.cfg_comp, 'fineSplit'):
            # Process only chunk number fineSplitIndex out of fineSplitFactor
            # equal chunks of the (single) input file.
            fineSplitIndex, fineSplitFactor = self.cfg_comp.fineSplit
            if fineSplitFactor > 1:
                if len(self.cfg_comp.files) != 1:
                    raise RuntimeError("Any component with fineSplit > 1 is supposed to have just a single file, while %s has %s" % (self.cfg_comp.name, self.cfg_comp.files))
                # Here nEvents of None, 0 or -1 means "all events".
                totevents = min(len(self.events),int(nEvents)) if (nEvents and int(nEvents) not in [-1,0]) else len(self.events)
                self.nEvents = int(ceil(totevents/float(fineSplitFactor)))
                self.firstEvent = firstEvent + fineSplitIndex * self.nEvents
                if self.firstEvent + self.nEvents >= totevents:
                    # The last chunk takes what is left; never a negative count.
                    self.nEvents = max(0, totevents - self.firstEvent)
        # self.event is set in self.process
        self.event = None
        # Services are built in configuration order, keyed by their name.
        services = {cfg_serv.name: self._build(cfg_serv) for cfg_serv in config.services}
        # would like to provide a copy of the config to the setup,
        # so that analyzers cannot modify the config of other analyzers.
        # but cannot copy the autofill config.
        self.setup = Setup(config, services)

    def _build(self, cfg):
        """Instantiate cfg.class_object with (cfg, current component, outDir)."""
        theClass = cfg.class_object
        obj = theClass( cfg, self.cfg_comp, self.outDir )
        return obj

    def _prepareOutput(self, name):
        """Create a new output directory and return its path.

        Tries name, then name_1, name_2, ... name_1999; any OSError from
        os.mkdir moves on to the next candidate.
        Raises ValueError once all 2000 attempts have failed.
        """
        for index in range(2000):
            tmpname = name if index == 0 else '%s_%d' % (name, index)
            try:
                os.mkdir( tmpname )
            except OSError:
                continue
            return tmpname
        raise ValueError( "More than 2000 output folder with same name or 2000 attempts failed, please clean-up, change name or check permissions")

    def loop(self):
        """Loop on a given number of events.

        At the beginning of the loop,
        Analyzer.beginLoop is called for each Analyzer.
        At each event, self.process is called.
        At the end of the loop, Analyzer.endLoop is called.
        A UserWarning raised while processing stops the loop early; the
        end-of-loop actions are still performed.
        """
        firstEvent = self.firstEvent
        # Only len(events) - firstEvent entries exist from firstEvent onwards,
        # so never schedule more than that.
        available = max(0, len(self.events) - firstEvent)
        if self.nEvents is None or int(self.nEvents) > available:
            eventSize = available
        else:
            eventSize = int(self.nEvents)
        self.logger.info(
            'starting loop at event {firstEvent} to process {eventSize} events.'.format(
                firstEvent=firstEvent, eventSize=eventSize))
        self.logger.info( str( self.cfg_comp ) )
        for analyzer in self.analyzers:
            analyzer.beginLoop(self.setup)
        nProcessed = 0
        try:
            for iEv in range(firstEvent, firstEvent + eventSize):
                if iEv % 100 == 0:
                    self._printProgress(iEv)
                self.process( iEv )
                nProcessed += 1
                if iEv < self.nPrint:
                    print(self.event)
        except UserWarning:
            print('Stopped loop following a UserWarning exception')

        info = self.logger.info
        warning = self.logger.warning
        # Count of events that went through process() without interruption
        # (independent of firstEvent).
        warning('number of events processed: {nEv}'.format(nEv=nProcessed))
        warning('')
        info( self.cfg_comp )
        info('')
        for analyzer in self.analyzers:
            analyzer.endLoop(self.setup)
        if self.timeReport:
            self._printTimeReport()
        if hasattr(self.events, 'endLoop'):
            self.events.endLoop()
        preprocessor = getattr(self.config, 'preprocessor', None)
        if preprocessor is not None and hasattr(preprocessor, 'endLoop'):
            preprocessor.endLoop(self.cfg_comp)

    def _printProgress(self, iEv):
        """Print the event index; after the first call also the average rate."""
        if not hasattr(self, 'start_time'):
            print('event', iEv)
            self.start_time = timeit.default_timer()
            self.start_time_event = iEv
        else:
            elapsed = timeit.default_timer() - self.start_time
            print('event %d (%.1f ev/s)' % (iEv, (iEv - self.start_time_event) / float(elapsed)))

    def _printTimeReport(self):
        """Log the per-analyzer timing table accumulated by process().

        Times are shown in ms. The first event seen by each analyzer is not
        timed, so the per-event averages divide by (events - 1).
        """
        warning = self.logger.warning
        allev = max(rep['events'] for rep in self.timeReport)
        warning("\n      ---- TimeReport (all times in ms; first evt is skipped) ---- ")
        warning("%9s   %9s    %9s   %9s %6s   %s" % ("processed","all evts","time/proc", " time/all", "  [%] ", "analyer"))
        warning("%9s   %9s    %9s   %9s %6s   %s" % ("---------","--------","---------", "---------", " -----", "-------------"))
        sumtime = sum(rep['time'] for rep in self.timeReport)
        # events that reached the last analyzer of the sequence
        passev = self.timeReport[-1]['events']
        for ana, rep in zip(self.analyzers, self.timeReport):
            timePerProcEv = rep['time']/(rep['events']-1) if rep['events'] > 1 else 0
            timePerAllEv  = rep['time']/(allev-1)         if allev > 1         else 0
            # sumtime is 0 when no event after the first one was timed.
            fracAllEv     = rep['time']/sumtime           if sumtime > 0       else 0
            warning( "%9d   %9d   %10.2f  %10.2f %5.1f%%   %s" % ( rep['events'], allev, 1000*timePerProcEv, 1000*timePerAllEv, 100.0*fracAllEv, ana.name))
        totPerProcEv = sumtime/(passev-1) if passev > 1 else 0
        totPerAllEv  = sumtime/(allev-1)  if allev > 1  else 0
        warning("%9s   %9s    %9s   %9s   %s" % ("---------","--------","---------", "---------", "-------------"))
        warning("%9d   %9d   %10.2f  %10.2f %5.1f%%   %s" % ( passev, allev, 1000*totPerProcEv, 1000*totPerAllEv, 100.0, "TOTAL"))
        warning("")

    def _checkMemory(self, where, analyzerName, iEv):
        """Print a message if the peak RSS grew since the previous check.

        ru_maxrss is the maximum resident set size so far (units are
        platform dependent); self.memLast stores the previous reading.
        """
        memNow = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if memNow > self.memLast:
            print("Mem Jump detected %s %s at event %s. RSS(before,after,difference) %s %s %s " % (where, analyzerName, iEv, self.memLast, memNow, memNow - self.memLast))
        self.memLast = memNow

    def process(self, iEv ):
        """Run event processing for all analyzers in the sequence.

        This function is called by self.loop,
        but can also be called directly from
        the python interpreter, to jump to a given event.

        Returns (False, name) as soon as an analyzer's process() returns a
        value equal to False (later analyzers are skipped); otherwise
        (True, name of the last analyzer), or (True, None) for an empty
        sequence.
        """
        self.event = Event(iEv, self.events[iEv], self.setup)
        self.iEvent = iEv
        checkMem = 0 <= self.memReportFirstEvent <= iEv
        analyzer = None
        for i, analyzer in enumerate(self.analyzers):
            if not analyzer.beginLoopCalled:
                analyzer.beginLoop(self.setup)
            start = timeit.default_timer()
            if checkMem:
                self._checkMemory('before analyzer', analyzer.name, iEv)
            ret = analyzer.process( self.event )
            if checkMem:
                self._checkMemory('in analyzer', analyzer.name, iEv)
            if self.timeReport:
                rep = self.timeReport[i]
                rep['events'] += 1
                # Skip timing of the first event: the report divides the
                # accumulated time by (events - 1).
                if rep['events'] > 1:
                    rep['time'] += timeit.default_timer() - start
            # Deliberately `==` rather than `is`: None lets the event
            # through, while False (or 0) stops the sequence.
            if ret == False:
                return (False, analyzer.name)
        if iEv < self.nPrint:
            self.logger.info( str(self.event) )
        return (True, analyzer.name if analyzer is not None else None)

    def write(self):
        """Writes all analyzers.

        See Analyzer.Write for more information.
        Stops all services afterwards.
        """
        for analyzer in self.analyzers:
            analyzer.write(self.setup)
        self.setup.close()
0256         return (True, analyzer.name)
0257 
0258     def write(self):
0259         """Writes all analyzers.
0260 
0261         See Analyzer.Write for more information.
0262         """
0263         for analyzer in self.analyzers:
0264             analyzer.write(self.setup)
0265         self.setup.close() 
0266 
0267 
if __name__ == '__main__':

    # Usage:
    #   looper.py pickledConfig.pck                 -> run the pickled config
    #   looper.py cfgFile.py pickledComponent.pck   -> run cfgFile's config on
    #                                                  that single component
    import pickle
    import sys
    import os
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    from optparse import OptionParser
    parser = OptionParser(usage='%prog cfgFileName compFileName [--options=optFile.json]')
    parser.add_option('--options',dest='options',default='',help='options json file')
    (options,args) = parser.parse_args()

    if options.options != '':
        # The options file is read as one JSON object on its first line.
        with open(options.options, 'r') as jfile:
            opts = json.loads(jfile.readline())
        for k, v in opts.items():
            _heppyGlobalOptions[k] = v

    # NOTE(review): pickle.load can execute arbitrary code; only run this on
    # pickle files you produced yourself.
    if len(args) == 1:
        cfgFileName = args[0]
        # pickle data is binary: text mode breaks on Python 3
        with open(cfgFileName, 'rb') as pckfile:
            config = pickle.load(pckfile)
    elif len(args) == 2:
        cfgFileName = args[0]
        with open(cfgFileName, 'r') as cfgFile:
            cfg = imp.load_source('cfg', cfgFileName, cfgFile)
        config = cfg.config
        compFileName = args[1]
        with open(compFileName, 'rb') as pckfile:
            comp = pickle.load(pckfile)
        config.components = [comp]
    else:
        # parser.error prints the usage and exits with status 2
        parser.error('expected 1 or 2 positional arguments, got %d' % len(args))

    looper = Looper('Loop', config, nPrint=5)
    looper.loop()
    looper.write()
0306