File indexing completed on 2024-11-11 23:31:43
0001
0002 from PhysicsTools.NanoAODTools.postprocessing.framework.jobreport import JobReport
0003 from PhysicsTools.NanoAODTools.postprocessing.framework.preskimming import preSkim
0004 from PhysicsTools.NanoAODTools.postprocessing.framework.output import FriendOutput, FullOutput
0005 from PhysicsTools.NanoAODTools.postprocessing.framework.eventloop import eventLoop
0006 from PhysicsTools.NanoAODTools.postprocessing.framework.datamodel import InputTree
0007 from PhysicsTools.NanoAODTools.postprocessing.framework.branchselection import BranchSelection
0008 import os
0009 import time
0010 import hashlib
0011 import subprocess
0012 import ROOT
0013 ROOT.PyConfig.IgnoreCommandLineOptions = True
0014
0015
class PostProcessor:
    """Run a chain of NanoAOD modules over a list of input files.

    For every input file the "Events" tree (or "Friends" as a fallback) is
    pre-skimmed with ``preSkim``. If there are modules, it is processed
    through ``eventLoop``; if there are none, it is cloned. The result is
    written into ``outputDir`` as a full copy (``FullOutput``) or as a friend
    tree (``FriendOutput``). Optionally all outputs are merged with
    ``haddnano.py`` and a framework job report is saved.
    """

    # 2**63 - 1, the largest signed 64-bit integer: "no limit" for maxEntries.
    _NO_ENTRY_LIMIT = 9223372036854775807

    # Algorithms accepted in the "ALGO:LEVEL" compression string, mapped to
    # enumerator names of ROOT.RCompressionSetting.EAlgorithm.
    _COMPRESSION_ALGOS = {
        "LZMA": "kLZMA",
        "ZLIB": "kZLIB",
        "LZ4": "kLZ4",
        "ZSTD": "kZSTD",
    }

    def __init__(
        self, outputDir, inputFiles, cut=None, branchsel=None, modules=None,
        compression="LZMA:9", friend=False, postfix=None, jsonInput=None,
        noOut=False, justcount=False, provenance=False, haddFileName=None,
        fwkJobReport=False, histFileName=None, histDirName=None,
        outputbranchsel=None, maxEntries=None, firstEntry=0, prefetch=False,
        longTermCache=False
    ):
        """Store the job configuration. No ROOT file is opened here.

        Args:
            outputDir: directory that receives the per-file output trees.
            inputFiles: input file names. An entry may be a comma-separated
                "main.root,extra1.root,..."; the extra files' trees are added
                as friends of the main tree. Names starting with "/store/"
                are resolved with ``edmFileUtil``.
            cut: passed to ``preSkim``.
            branchsel: wrapped in ``BranchSelection`` for the input tree.
            modules: objects with ``beginJob``/``endJob`` (and whatever
                ``eventLoop`` needs). ``None`` means no modules, i.e. the
                tree is cloned.
            compression: "ALGO:LEVEL" with ALGO in LZMA, ZLIB, LZ4, ZSTD,
                or "none".
            friend: write a ``FriendOutput`` instead of a ``FullOutput``.
            postfix: output file suffix; defaults to "_Friend"/"_Skim".
            jsonInput: passed to ``preSkim`` (presumably a lumi mask).
            noOut: run the modules without writing output trees.
            justcount: only report how many entries the preselection keeps.
            provenance: forwarded to ``FullOutput``.
            haddFileName: if set, merge all outputs into this file.
            fwkJobReport: create a ``JobReport``. Implies a hadd into
                "tree.root" when ``haddFileName`` is not given.
            histFileName, histDirName: histogram file and directory. Give
                both or neither.
            outputbranchsel: selection for the output tree; defaults to
                ``branchsel``.
            maxEntries: per-file entry limit (None or 0 means no limit).
            firstEntry: first entry processed in each file.
            prefetch: copy root:// inputs locally before reading them.
            longTermCache: give prefetched copies a stable name and reuse
                them across runs.
        """
        self.outputDir = outputDir
        self.inputFiles = inputFiles
        self.cut = cut
        # Fresh list per instance; a shared mutable default would leak
        # state between PostProcessor objects.
        self.modules = [] if modules is None else modules
        self.compression = compression
        self.postfix = postfix
        self.json = jsonInput
        self.noOut = noOut
        self.friend = friend
        self.justcount = justcount
        self.provenance = provenance
        self.jobReport = JobReport() if fwkJobReport else None
        self.haddFileName = haddFileName
        self.histFile = None
        self.histFileName = histFileName
        self.histDirName = histDirName
        if self.jobReport and not self.haddFileName:
            print("Because you requested a FJR we assume you want the final "
                  "hadd. No name specified for the output file, will use tree.root")
            self.haddFileName = "tree.root"
        self.branchsel = BranchSelection(branchsel) if branchsel else None
        # The output selection falls back to the input selection.
        if outputbranchsel is not None:
            self.outputbranchsel = BranchSelection(outputbranchsel)
        elif branchsel is not None:
            self.outputbranchsel = BranchSelection(branchsel)
        else:
            self.outputbranchsel = None

        self.maxEntries = maxEntries if maxEntries else self._NO_ENTRY_LIMIT
        self.firstEntry = firstEntry
        self.prefetch = prefetch
        self.longTermCache = longTermCache

    def prefetchFile(self, fname, verbose=True):
        """Copy a remote ``root://`` file into $TMPDIR (or /tmp) with xrdcp.

        With ``longTermCache`` the local name is built from the user id and a
        SHA-1 of ``fname``, and an existing copy is reused. Otherwise a
        random name is used.

        Returns:
            ``(path_to_open, delete_after_use)``. Non-remote names, cache
            hits and failed copies return ``False`` (nothing to delete). A
            fresh copy returns ``True`` unless it belongs to the long-term
            cache.

        Raises:
            KeyboardInterrupt/SystemExit raised during the copy are
            re-raised after the partial file has been removed.
        """
        if not fname.startswith("root://"):
            return fname, False
        tmpdir = os.environ.get('TMPDIR', "/tmp")
        if self.longTermCache:
            tag = "long_cache-id%d-%s" % (
                os.getuid(), hashlib.sha1(fname.encode('utf-8')).hexdigest())
        else:
            tag = os.urandom(8).hex()
        # Keep this name format stable: long-term cache hits depend on it.
        localfile = "%s/%s-%s.root" % (
            tmpdir, os.path.basename(fname).replace(".root", ""), tag)
        if self.longTermCache and os.path.exists(localfile):
            if verbose:
                print("Filename %s is already available in local path %s "
                      % (fname, localfile))
            return localfile, False
        if verbose:
            print("Filename %s is remote, will do a copy to local path %s"
                  % (fname, localfile))
        start = time.time()
        try:
            subprocess.check_output(["xrdcp", "-f", "-N", fname, localfile])
        except Exception:
            # Best effort: fall back to reading the remote file directly.
            if verbose:
                print("Error: could not save file locally, will run from remote")
            self._removeLocalCopy(localfile, verbose)
            return fname, False
        except BaseException:
            # Interrupted: never leave a partial copy behind (with
            # longTermCache it would look like a valid cached file later),
            # and stop instead of silently continuing from remote.
            self._removeLocalCopy(localfile, verbose)
            raise
        if verbose:
            print("Time used for transferring the file locally: %.2f s"
                  % (time.time() - start))
        return localfile, (not self.longTermCache)

    @staticmethod
    def _removeLocalCopy(localfile, verbose):
        """Delete a partially transferred local copy; OS errors are ignored."""
        if os.path.exists(localfile):
            if verbose:
                print("Deleting partially transferred file %s" % localfile)
            try:
                os.unlink(localfile)
            except OSError:
                pass

    @staticmethod
    def _splitInputName(entry):
        """Split "main.root,extra.root,..." into (main, [extras]).

        Every name is stripped of surrounding whitespace, and empty extra
        names (e.g. from a trailing comma) are dropped.
        """
        names = [name.strip() for name in entry.split(',')]
        return names[0], [name for name in names[1:] if name]

    @staticmethod
    def _getTree(tfile, fname):
        """Return the "Events" tree of ``tfile``, falling back to "Friends".

        ``not obj`` is used rather than ``is None`` so that PyROOT null
        objects are rejected as well as ``None``.

        Raises:
            RuntimeError: if the file could not be opened or holds neither
                tree.
        """
        if not tfile:
            raise RuntimeError("Could not open file %s" % fname)
        tree = tfile.Get("Events")
        if not tree:
            tree = tfile.Get("Friends")
        if not tree:
            raise RuntimeError("No Events or Friends tree found in %s" % fname)
        return tree

    def _compressionSettings(self):
        """Parse ``self.compression`` into ``(level, algorithm)``.

        "none" yields ``(0, None)``.

        Raises:
            RuntimeError: for an algorithm not in ``_COMPRESSION_ALGOS``.
            ValueError: if the string is not of the form "ALGO:LEVEL".
        """
        if self.compression == "none":
            return 0, None
        ROOT.gInterpreter.ProcessLine("#include <Compression.h>")
        (algo, level) = self.compression.split(":")
        compressionLevel = int(level)
        if algo not in self._COMPRESSION_ALGOS:
            raise RuntimeError("Unsupported compression %s" % algo)
        compressionAlgo = getattr(ROOT.RCompressionSetting.EAlgorithm,
                                  self._COMPRESSION_ALGOS[algo])
        return compressionLevel, compressionAlgo

    def _hadd(self, outFileNames):
        """Merge ``outFileNames`` into ``self.haddFileName`` with haddnano.py.

        Failures are reported but are not fatal, as before.
        """
        # A haddnano.py in the working directory wins over one on $PATH.
        haddnano = "./haddnano.py" if os.path.isfile(
            "./haddnano.py") else "haddnano.py"
        # Argument list, not a shell string: file names are not shell-parsed.
        try:
            ret = subprocess.call([haddnano, self.haddFileName] + list(outFileNames))
        except OSError as err:
            print("Error: could not run %s: %s" % (haddnano, err))
            return
        if ret != 0:
            print("Warning: %s exited with status %d" % (haddnano, ret))

    def run(self):
        """Process every input file, then optionally merge the outputs.

        Configures output compression and calls ``beginJob`` on the modules.
        Then, per input file, it pre-skims, runs ``eventLoop`` (or clones
        the tree when there are no modules) and writes the output. Finally
        it calls ``endJob``, closes the histogram file, runs haddnano.py if
        requested and saves the job report.

        Raises:
            RuntimeError: unsupported compression, ``noOut`` without
                modules, only one of histFileName/histDirName given, or an
                input file/tree that cannot be opened.
        """
        outpostfix = self.postfix if self.postfix is not None else (
            "_Friend" if self.friend else "_Skim")
        compressionLevel, compressionAlgo = 0, None
        if not self.noOut:
            compressionLevel, compressionAlgo = self._compressionSettings()
            print("Will write selected trees to " + self.outputDir)
            if not self.justcount and self.outputDir:
                os.makedirs(self.outputDir, exist_ok=True)
        elif not self.modules:
            raise RuntimeError(
                "Running with --noout and no modules does nothing!")

        if (self.histFileName is None) != (self.histDirName is None):
            raise RuntimeError(
                "Must specify both histogram file and histogram directory!")
        if self.histFileName is not None:
            self.histFile = ROOT.TFile.Open(self.histFileName, "RECREATE")
        else:
            self.histFile = None

        for m in self.modules:
            if getattr(m, 'writeHistFile', False):
                m.beginJob(histFile=self.histFile,
                           histDirName=self.histDirName)
            else:
                m.beginJob()

        fullClone = not self.modules
        outFileNames = []
        t0 = time.time()
        totEntriesRead = 0
        for entry in self.inputFiles:
            fname, ffnames = self._splitInputName(entry)
            if fname.startswith('/store/'):
                # Resolve the logical file name to a physical one.
                fname = subprocess.check_output(
                    ['edmFileUtil', '-d', '-f', fname]).decode("utf-8").strip()

            print(time.strftime("%d-%b-%Y %H:%M:%S %Z", time.localtime()),
                  " Initiating request to open file %s" % (fname), flush=True)
            if self.prefetch:
                ftoread, toBeDeleted = self.prefetchFile(fname)
            else:
                ftoread, toBeDeleted = fname, False
            try:
                inFile = ROOT.TFile.Open(ftoread)
                inTree = self._getTree(inFile, ftoread)
                # Clamp at 0: a firstEntry past the end must not give a
                # negative count.
                nEntries = max(0, min(inTree.GetEntries() - self.firstEntry,
                                      self.maxEntries))
                totEntriesRead += nEntries

                elist, jsonFilter = preSkim(
                    inTree, self.json, self.cut, maxEntries=self.maxEntries,
                    firstEntry=self.firstEntry)
                nPreSelected = elist.GetN() if elist else nEntries
                preFraction = nPreSelected / (0.01 * nEntries) if nEntries else 0
                if self.justcount:
                    print('Would select %d / %d entries from %s (%.2f%%)'
                          % (nPreSelected, nEntries, fname, preFraction))
                    continue
                print('Pre-select %d entries out of %s (%.2f%%)'
                      % (nPreSelected, nEntries, preFraction))

                # Keep the extra files/trees referenced while the main tree
                # uses them as friends.
                inAddFiles = []
                inAddTrees = []
                for ffname in ffnames:
                    inAddFiles.append(ROOT.TFile.Open(ffname))
                    inAddTrees.append(self._getTree(inAddFiles[-1], ffname))
                    inTree.AddFriend(inAddTrees[-1])

                if fullClone:
                    # Restrict the tree to the pre-selected entries.
                    if elist:
                        inTree.SetEntryList(elist)
                else:
                    if elist:
                        inTree = InputTree(inTree, elist)
                    else:
                        inTree = InputTree(inTree)

                if not self.noOut:
                    outFileName = os.path.join(self.outputDir, os.path.basename(
                        fname).replace(".root", outpostfix + ".root"))
                    outFile = ROOT.TFile.Open(
                        outFileName, "RECREATE", "", compressionLevel)
                    outFileNames.append(outFileName)
                    if compressionLevel:
                        outFile.SetCompressionAlgorithm(compressionAlgo)

                    if self.friend:
                        outTree = FriendOutput(inFile, inTree, outFile)
                    else:
                        # NOTE(review): for a full clone with an entry list the
                        # offset is presumably already applied by preSkim.
                        firstEntry = 0 if fullClone and elist else self.firstEntry
                        outTree = FullOutput(
                            inFile,
                            inTree,
                            outFile,
                            branchSelection=self.branchsel,
                            outputbranchSelection=self.outputbranchsel,
                            fullClone=fullClone,
                            maxEntries=self.maxEntries,
                            firstEntry=firstEntry,
                            jsonFilter=jsonFilter,
                            provenance=self.provenance)
                else:
                    outFile = None
                    outTree = None
                    if self.branchsel:
                        self.branchsel.selectBranches(inTree)

                # Default used when no branch below sets it (e.g. --noout
                # with an empty preselection); previously undefined or stale.
                nall = nEntries
                if not fullClone and not (elist and elist.GetN() == 0):
                    # Explicit (possibly empty) range when there is no entry
                    # list, so a firstEntry past the end processes nothing.
                    eventRange = None if elist else range(
                        self.firstEntry, self.firstEntry + nEntries)
                    (nall, npass, timeLoop) = eventLoop(
                        self.modules, inFile, outFile, inTree, outTree,
                        eventRange=eventRange, maxEvents=self.maxEntries
                    )
                    print('Processed %d preselected entries from %s (%s entries). Finally selected %d entries' % (nall, fname, nEntries, npass))
                elif outTree is not None:
                    nSelected = outTree.tree().GetEntries()
                    print('Selected %d / %d entries from %s (%.2f%%)' % (nSelected, nall, fname, nSelected / (0.01 * nall) if nall else 0))

                if not self.noOut:
                    outTree.write()
                    outFile.Close()
                    print("Done %s" % outFileName)
                if self.jobReport:
                    self.jobReport.addInputFile(fname, nall)
            finally:
                # Remove a temporary prefetched copy even if processing failed.
                if toBeDeleted:
                    os.unlink(ftoread)

        for m in self.modules:
            m.endJob()

        if self.histFile is not None:
            self.histFile.Close()

        elapsed = time.time() - t0
        rate = totEntriesRead / elapsed if elapsed > 0 else 0.0
        print("Total time %.1f sec. to process %i events. Rate = %.1f Hz." % (elapsed, totEntriesRead, rate))

        if self.haddFileName:
            self._hadd(outFileNames)
        if self.jobReport:
            self.jobReport.addOutputFile(self.haddFileName)
            self.jobReport.save()