Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-11 23:31:43

0001 #!/usr/bin/env python3
0002 from PhysicsTools.NanoAODTools.postprocessing.framework.jobreport import JobReport
0003 from PhysicsTools.NanoAODTools.postprocessing.framework.preskimming import preSkim
0004 from PhysicsTools.NanoAODTools.postprocessing.framework.output import FriendOutput, FullOutput
0005 from PhysicsTools.NanoAODTools.postprocessing.framework.eventloop import eventLoop
0006 from PhysicsTools.NanoAODTools.postprocessing.framework.datamodel import InputTree
0007 from PhysicsTools.NanoAODTools.postprocessing.framework.branchselection import BranchSelection
0008 import os
0009 import time
0010 import hashlib
0011 import subprocess
0012 import ROOT
0013 ROOT.PyConfig.IgnoreCommandLineOptions = True
0014 
0015 
class PostProcessor:
    """Run a NanoAOD post-processing job over a list of input files.

    For every input file the "Events" tree (or, failing that, the "Friends"
    tree) is pre-skimmed with ``preSkim``, optionally passed through the
    event loop of the configured modules, and written to an output file in
    ``outputDir``.

    Parameters
    ----------
    outputDir : str
        Directory in which the output files are created.
    inputFiles : iterable of str
        Input file names. An entry may be a comma-separated list: the first
        name is the main file, the others are opened and attached to the main
        tree as friends. Names starting with ``/store/`` are translated with
        ``edmFileUtil``.
    cut, jsonInput
        Forwarded to ``preSkim``.
    branchsel
        If given, wrapped in a ``BranchSelection`` and used for the input
        branches.
    modules : list, optional
        Module objects providing ``beginJob``/``endJob``; they are handed to
        ``eventLoop``. With no modules the tree is cloned without an event
        loop.
    compression : str
        ``"ALGO:LEVEL"`` with ALGO one of LZMA, ZLIB, LZ4, or ``"none"``.
    friend : bool
        Write a ``FriendOutput`` instead of a ``FullOutput``.
    postfix : str, optional
        Inserted before ``.root`` in the output file name; defaults to
        ``_Friend`` or ``_Skim``.
    noOut : bool
        Do not write any output file (requires at least one module).
    justcount : bool
        Only print how many entries the pre-skim would select.
    provenance : bool
        Forwarded to ``FullOutput``.
    haddFileName : str, optional
        If set, ``haddnano.py`` is run on the output files to produce it.
    fwkJobReport : bool
        Create a ``JobReport``; implies a hadd step (default ``tree.root``).
    histFileName, histDirName
        Must be given together; the file is opened in RECREATE mode and passed
        to modules whose ``writeHistFile`` attribute is true.
    outputbranchsel
        Branch selection for the output; defaults to ``branchsel``.
    maxEntries : int, optional
        Maximum number of entries per file; a false value means no limit.
    firstEntry : int
        Index of the first entry to process in each file.
    prefetch : bool
        Copy remote (``root://``) files locally with ``xrdcp`` before reading.
    longTermCache : bool
        Keep prefetched files across runs (the caller must clean them up).
    """

    def __init__(
            self, outputDir, inputFiles, cut=None, branchsel=None, modules=None,
            compression="LZMA:9", friend=False, postfix=None, jsonInput=None,
            noOut=False, justcount=False, provenance=False, haddFileName=None,
            fwkJobReport=False, histFileName=None, histDirName=None,
            outputbranchsel=None, maxEntries=None, firstEntry=0, prefetch=False,
            longTermCache=False
    ):
        self.outputDir = outputDir
        self.inputFiles = inputFiles
        self.cut = cut
        # A fresh list per instance: a mutable default argument would be
        # shared by every PostProcessor created without explicit modules.
        self.modules = modules if modules is not None else []
        self.compression = compression
        self.postfix = postfix
        self.json = jsonInput
        self.noOut = noOut
        self.friend = friend
        self.justcount = justcount
        self.provenance = provenance
        self.jobReport = JobReport() if fwkJobReport else None
        self.haddFileName = haddFileName
        self.histFile = None
        if self.jobReport and not self.haddFileName:
            print("Because you requested a FJR we assume you want the final " \
                "hadd. No name specified for the output file, will use tree.root")
            self.haddFileName = "tree.root"
        self.branchsel = BranchSelection(branchsel) if branchsel else None
        if outputbranchsel is not None:
            self.outputbranchsel = BranchSelection(outputbranchsel)
        elif branchsel is not None:
            # Use the same branches in the output as in input
            self.outputbranchsel = BranchSelection(branchsel)
        else:
            self.outputbranchsel = None

        self.histFileName = histFileName
        self.histDirName = histDirName
        # 2^63 - 1, largest int64
        self.maxEntries = maxEntries if maxEntries else 9223372036854775807
        self.firstEntry = firstEntry
        self.prefetch = prefetch  # prefetch files to TMPDIR using xrdcp
        # keep cached files across runs (it's then up to you to clean up the temp)
        self.longTermCache = longTermCache

    @staticmethod
    def _getTree(tfile):
        """Return the "Events" tree of *tfile*, or "Friends" if it is missing.

        A truthiness test is used on purpose: a failed ``Get`` may hand back a
        null-pointer proxy rather than ``None``, which ``is None`` would miss.
        """
        tree = tfile.Get("Events")
        if not tree:
            tree = tfile.Get("Friends")
        return tree

    def prefetchFile(self, fname, verbose=True):
        """Copy a remote file to the local temporary directory with xrdcp.

        Parameters
        ----------
        fname : str
            File name; only names starting with ``root://`` are copied.
        verbose : bool
            Print progress messages.

        Returns
        -------
        tuple
            ``(path, toBeDeleted)``: the path to read from (the local copy,
            or ``fname`` itself if no copy was made or the copy failed) and
            whether the caller should delete that path after use.
        """
        if not fname.startswith("root://"):
            return fname, False
        tmpdir = os.environ.get('TMPDIR', "/tmp")
        if self.longTermCache:
            # Deterministic name so that later runs find the cached copy.
            tag = "long_cache-id%d-%s" \
                % (os.getuid(), hashlib.sha1(fname.encode('utf-8')).hexdigest())
        else:
            # Fixed-width random tag (two hex digits per byte).
            tag = os.urandom(8).hex()
        localfile = "%s/%s-%s.root" \
            % (tmpdir, os.path.basename(fname).replace(".root", ""), tag)
        if self.longTermCache and os.path.exists(localfile):
            if verbose:
                print("Filename %s is already available in local path %s " \
                    % (fname, localfile))
            return localfile, False
        if verbose:
            print("Filename %s is remote, will do a copy to local path %s"\
                % (fname, localfile))
        start = time.time()
        try:
            subprocess.check_output(["xrdcp", "-f", "-N", fname, localfile])
        except (subprocess.CalledProcessError, OSError):
            # Copy failed or xrdcp is not available: fall back to remote
            # reading. KeyboardInterrupt/SystemExit are deliberately not
            # caught here.
            if verbose:
                print("Error: could not save file locally, will run from remote")
            if os.path.exists(localfile):
                if verbose:
                    print("Deleting partially transferred file %s" % localfile)
                try:
                    os.unlink(localfile)
                except OSError:
                    pass
            return fname, False
        if verbose:
            print("Time used for transferring the file locally: %.2f s"\
                % (time.time() - start))
        return localfile, (not self.longTermCache)

    def run(self):
        """Process all input files and write the requested outputs.

        Raises
        ------
        RuntimeError
            If the compression algorithm is unsupported, if ``noOut`` is set
            without any module, or if only one of ``histFileName`` and
            ``histDirName`` was given.
        """
        outpostfix = self.postfix if self.postfix is not None else (
            "_Friend" if self.friend else "_Skim")

        compressionAlgo = None
        compressionLevel = 0
        if not self.noOut:
            if self.compression != "none":
                ROOT.gInterpreter.ProcessLine("#include <Compression.h>")
                (algo, level) = self.compression.split(":")
                compressionLevel = int(level)
                algoNames = {"LZMA": "kLZMA", "ZLIB": "kZLIB", "LZ4": "kLZ4"}
                if algo not in algoNames:
                    raise RuntimeError("Unsupported compression %s" % algo)
                # Only the requested enumerator is looked up.
                compressionAlgo = getattr(
                    ROOT.RCompressionSetting.EAlgorithm, algoNames[algo])
            print("Will write selected trees to " + self.outputDir)
            if not self.justcount and self.outputDir:
                # No shell involved, so directory names with spaces or shell
                # metacharacters are handled correctly.
                os.makedirs(self.outputDir, exist_ok=True)
        elif len(self.modules) == 0:
            raise RuntimeError(
                "Running with --noout and no modules does nothing!")

        # Open histogram file, if desired
        if (self.histFileName is None) != (self.histDirName is None):
            raise RuntimeError(
                "Must specify both histogram file and histogram directory!")
        if self.histFileName is not None:
            self.histFile = ROOT.TFile.Open(self.histFileName, "RECREATE")
        else:
            self.histFile = None

        for m in self.modules:
            if getattr(m, 'writeHistFile', False):
                m.beginJob(histFile=self.histFile,
                           histDirName=self.histDirName)
            else:
                m.beginJob()

        # Without modules there is no event loop: the tree is simply cloned.
        fullClone = (len(self.modules) == 0)
        outFileNames = []
        t0 = time.time()
        totEntriesRead = 0
        for fname in self.inputFiles:
            # "main.root,friend1.root,..." -> main file plus friend files
            ffnames = []
            if "," in fname:
                fnames = fname.split(',')
                fname, ffnames = fnames[0], fnames[1:]

            fname = fname.strip()

            # Convert LFN to PFN if needed; this requires edmFileUtil to be present in $PATH
            if fname.startswith('/store/'):
                fname = subprocess.check_output(['edmFileUtil', '-d', '-f '+fname]).decode("utf-8").strip()

            # open input file
            print(time.strftime("%d-%b-%Y %H:%M:%S %Z", time.localtime()), " Initiating request to open file %s" %(fname), flush=True) # CMSSW-syle message, required by eos caching scripts
            ftoread, toBeDeleted = fname, False
            if self.prefetch:
                ftoread, toBeDeleted = self.prefetchFile(fname)
            inFile = ROOT.TFile.Open(ftoread)

            # get input tree
            inTree = self._getTree(inFile)
            # Clamp at zero: firstEntry may lie beyond the end of this tree.
            nEntries = max(0, min(inTree.GetEntries() - self.firstEntry,
                                  self.maxEntries))
            totEntriesRead += nEntries
            # pre-skimming
            elist, jsonFilter = preSkim(
                inTree, self.json, self.cut, maxEntries=self.maxEntries, firstEntry=self.firstEntry)
            nPreselected = elist.GetN() if elist else nEntries
            percent = nPreselected / (0.01 * nEntries) if nEntries else 0
            if self.justcount:
                print('Would select %d / %d entries from %s (%.2f%%)' % (nPreselected, nEntries, fname, percent))
                if toBeDeleted:
                    os.unlink(ftoread)
                continue
            print('Pre-select %d entries out of %s (%.2f%%)' % (nPreselected, nEntries, percent))

            # The lists hold references to the friend files and trees for the
            # duration of this iteration (presumably to keep them alive while
            # they are attached to inTree).
            inAddFiles = []
            inAddTrees = []
            for ffname in ffnames:
                inAddFiles.append(ROOT.TFile.Open(ffname))
                inAddTree = self._getTree(inAddFiles[-1])
                inAddTrees.append(inAddTree)
                inTree.AddFriend(inAddTree)

            if fullClone:
                # no need of a reader (no event loop), but set up the elist if available
                if elist:
                    inTree.SetEntryList(elist)
            else:
                # initialize reader
                if elist:
                    inTree = InputTree(inTree, elist)
                else:
                    inTree = InputTree(inTree)

            # prepare output file
            if not self.noOut:
                outFileName = os.path.join(self.outputDir, os.path.basename(
                    fname).replace(".root", outpostfix + ".root"))
                outFile = ROOT.TFile.Open(
                    outFileName, "RECREATE", "", compressionLevel)
                outFileNames.append(outFileName)
                if compressionLevel:
                    outFile.SetCompressionAlgorithm(compressionAlgo)
                # prepare output tree
                if self.friend:
                    outTree = FriendOutput(inFile, inTree, outFile)
                else:
                    firstEntry = 0 if fullClone and elist else self.firstEntry
                    outTree = FullOutput(
                        inFile,
                        inTree,
                        outFile,
                        branchSelection=self.branchsel,
                        outputbranchSelection=self.outputbranchsel,
                        fullClone=fullClone,
                        maxEntries=self.maxEntries,
                        firstEntry=firstEntry,
                        jsonFilter=jsonFilter,
                        provenance=self.provenance)
            else:
                outFile = None
                outTree = None
                if self.branchsel:
                    self.branchsel.selectBranches(inTree)

            # Default for the job report; overwritten by the event loop.
            # Without this, a run with noOut and an empty entry list would
            # report an unbound name or the count of the previous file.
            nall = nEntries
            # process events, if needed
            if not fullClone and not (elist and elist.GetN() == 0):
                eventRange = range(self.firstEntry, self.firstEntry +
                                    nEntries) if nEntries > 0 and not elist else None
                (nall, npass, timeLoop) = eventLoop(
                    self.modules, inFile, outFile, inTree, outTree,
                    eventRange=eventRange, maxEvents=self.maxEntries
                )
                print('Processed %d preselected entries from %s (%s entries). Finally selected %d entries' % (nall, fname, nEntries, npass))
            elif outTree is not None:
                print('Selected %d / %d entries from %s (%.2f%%)' % (outTree.tree().GetEntries(), nall, fname, outTree.tree().GetEntries() / (0.01 * nall) if nall else 0))

            # now write the output
            if not self.noOut:
                outTree.write()
                outFile.Close()
                print("Done %s" % outFileName)
            if self.jobReport:
                self.jobReport.addInputFile(fname, nall)
            if toBeDeleted:
                os.unlink(ftoread)

        for m in self.modules:
            m.endJob()

        # close histogram file (truthiness also covers a failed Open)
        if self.histFile:
            self.histFile.Close()

        elapsed = time.time() - t0
        rate = totEntriesRead / elapsed if elapsed > 0 else 0.0
        print("Total time %.1f sec. to process %i events. Rate = %.1f Hz." % (elapsed, totEntriesRead, rate))

        if self.haddFileName:
            haddnano = "./haddnano.py" if os.path.isfile(
                "./haddnano.py") else "haddnano.py"
            # Argument list instead of a shell string: file names are passed
            # verbatim, whatever characters they contain.
            try:
                subprocess.call(
                    [haddnano, self.haddFileName] + outFileNames)
            except OSError as err:
                print("Error: could not run %s: %s" % (haddnano, err))
        if self.jobReport:
            self.jobReport.addOutputFile(self.haddFileName)
            self.jobReport.save()