Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:55

0001 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0002 
0003 #include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h"
0004 #include "DataFormats/Provenance/interface/StoredProcessBlockHelper.h"
0005 
0006 #include <algorithm>
0007 #include <cassert>
0008 #include <utility>
0009 
0010 namespace edm {
0011 
0012   ProcessBlockHelperBase const* ProcessBlockHelper::topProcessBlockHelper() const { return this; }
0013 
0014   std::vector<std::string> const& ProcessBlockHelper::topProcessesWithProcessBlockProducts() const {
0015     return processesWithProcessBlockProducts();
0016   }
0017 
0018   unsigned int ProcessBlockHelper::nProcessesInFirstFile() const { return nProcessesInFirstFile_; }
0019 
0020   std::vector<std::vector<unsigned int>> const& ProcessBlockHelper::processBlockCacheIndices() const {
0021     return processBlockCacheIndices_;
0022   }
0023 
0024   std::vector<std::vector<unsigned int>> const& ProcessBlockHelper::nEntries() const { return nEntries_; }
0025 
0026   std::vector<unsigned int> const& ProcessBlockHelper::cacheIndexVectorsPerFile() const {
0027     return cacheIndexVectorsPerFile_;
0028   }
0029 
0030   std::vector<unsigned int> const& ProcessBlockHelper::cacheEntriesPerFile() const { return cacheEntriesPerFile_; }
0031 
0032   unsigned int ProcessBlockHelper::processBlockIndex(
0033       std::string const& processName, EventToProcessBlockIndexes const& eventToProcessBlockIndexes) const {
0034     for (unsigned int iProcess = 0; iProcess < nProcessesInFirstFile_; ++iProcess) {
0035       if (processName == processesWithProcessBlockProducts()[iProcess]) {
0036         return processBlockCacheIndices_[eventToProcessBlockIndexes.index()][iProcess];
0037       }
0038     }
0039     return invalidCacheIndex();
0040   }
0041 
0042   unsigned int ProcessBlockHelper::outerOffset() const { return outerOffset_; }
0043 
0044   // Modifies the process names and cache indices in the StoredProcessBlockHelper.
0045   // Part of the dropOnInput. Also part of reordering that forces ProcessBlocks
0046   // to be read in the same order for all input files.
0047   bool ProcessBlockHelper::firstFileDropProcessesAndReorderStored(
0048       StoredProcessBlockHelper& storedProcessBlockHelper,
0049       std::set<std::string> const& processesToKeep,
0050       std::vector<unsigned int> const& nEntries,
0051       std::vector<unsigned int>& finalIndexToStoredIndex) const {
0052     std::vector<std::string> const& storedProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts();
0053 
0054     bool isModified = false;
0055     unsigned int finalSize = 0;
0056     for (auto const& process : storedProcesses) {
0057       if (processesToKeep.find(process) == processesToKeep.end()) {
0058         isModified = true;
0059       } else {
0060         ++finalSize;
0061       }
0062     }
0063     if (!isModified) {
0064       return isModified;
0065     }
0066 
0067     std::vector<std::string> finalProcesses;
0068     finalProcesses.reserve(finalSize);
0069     finalIndexToStoredIndex.reserve(finalSize);
0070     for (unsigned int iStored = 0; iStored < storedProcesses.size(); ++iStored) {
0071       if (processesToKeep.find(storedProcesses[iStored]) != processesToKeep.end()) {
0072         finalProcesses.emplace_back(storedProcesses[iStored]);
0073         finalIndexToStoredIndex.emplace_back(iStored);
0074       }
0075     }
0076     dropProcessesAndReorderStoredImpl(storedProcessBlockHelper, finalProcesses, nEntries, finalIndexToStoredIndex);
0077     return isModified;
0078   }
0079 
0080   // Modifies the process names and cache indices in the StoredProcessBlockHelper.
0081   // Part of the dropOnInput. Also part of reordering that forces ProcessBlocks
0082   // to be read in the same order for all input files.
0083   bool ProcessBlockHelper::dropProcessesAndReorderStored(StoredProcessBlockHelper& storedProcessBlockHelper,
0084                                                          std::set<std::string> const& processesToKeep,
0085                                                          std::vector<unsigned int> const& nEntries,
0086                                                          std::vector<unsigned int>& finalIndexToStoredIndex,
0087                                                          std::vector<std::string> const& firstFileFinalProcesses) const {
0088     std::vector<std::string> const& storedProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts();
0089 
0090     std::vector<std::string> finalProcesses;
0091     // Always will allocate enough space (sometimes too much)
0092     finalProcesses.reserve(storedProcesses.size());
0093     finalIndexToStoredIndex.reserve(storedProcesses.size());
0094 
0095     // The outer loop here establishes the order
0096     // Only allows processes that got into finalProcesses for the first file
0097     for (unsigned int iFirst = 0; iFirst < firstFileFinalProcesses.size(); ++iFirst) {
0098       // Only includes processes also in storedProcesses
0099       for (unsigned int iStored = 0; iStored < storedProcesses.size(); ++iStored) {
0100         std::string const& storedProcess = storedProcesses[iStored];
0101         if (firstFileFinalProcesses[iFirst] == storedProcess) {
0102           // Also requires processes have kept ProcessBlock products
0103           if (processesToKeep.find(storedProcess) != processesToKeep.end()) {
0104             finalProcesses.emplace_back(storedProcess);
0105             finalIndexToStoredIndex.emplace_back(iStored);
0106             break;
0107           }
0108         }
0109       }
0110     }
0111 
0112     bool isModified = true;
0113     if (storedProcesses == finalProcesses) {
0114       isModified = false;
0115       return isModified;
0116     }
0117 
0118     dropProcessesAndReorderStoredImpl(storedProcessBlockHelper, finalProcesses, nEntries, finalIndexToStoredIndex);
0119     return isModified;
0120   }
0121 
0122   void ProcessBlockHelper::initializeFromPrimaryInput(StoredProcessBlockHelper const& storedProcessBlockHelper) {
0123     if (!initializedFromInput_) {
0124       initializedFromInput_ = true;
0125 
0126       assert(processesWithProcessBlockProducts().empty());
0127       setProcessesWithProcessBlockProducts(storedProcessBlockHelper.processesWithProcessBlockProducts());
0128       nProcessesInFirstFile_ = processesWithProcessBlockProducts().size();
0129     }
0130   }
0131 
0132   void ProcessBlockHelper::fillFromPrimaryInput(StoredProcessBlockHelper const& storedProcessBlockHelper,
0133                                                 std::vector<unsigned int> const& nEntries) {
0134     // I've written this so it will continue to work even if we someday relax
0135     // the strict merging requirement in the ProductRegistry (there
0136     // is a little extra complexity that may be unnecessary...).
0137 
0138     std::vector<std::string> const& storedProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts();
0139     std::vector<unsigned int> const& storedCacheIndices = storedProcessBlockHelper.processBlockCacheIndices();
0140 
0141     outerOffset_ = processBlockCacheIndices_.size();
0142 
0143     if (nProcessesInFirstFile_ == 0) {
0144       // There were no ProcessBlock products in the first file. Nothing to do.
0145       return;
0146     } else if (!storedProcesses.empty()) {
0147       // Subsequent input file with ProcessBlock products
0148       fillFromPrimaryInputWhenNotEmpty(storedProcesses, storedCacheIndices, nEntries);
0149     } else if (storedProcesses.empty()) {
0150       // Subsequent input file without ProcessBlock products
0151       processBlockCacheIndices_.emplace_back(nProcessesInFirstFile_, invalidCacheIndex());
0152       cacheIndexVectorsPerFile_.push_back(1);
0153       std::vector<unsigned int> newNEntries(nProcessesInFirstFile_, 0);
0154       fillEntriesFromPrimaryInput(std::move(newNEntries));
0155     }
0156   }
0157 
0158   void ProcessBlockHelper::clearAfterOutputFilesClose() {
0159     processBlockCacheIndices_.clear();
0160     nEntries_.clear();
0161     cacheIndexVectorsPerFile_.clear();
0162     cacheEntriesPerFile_.clear();
0163     outerOffset_ = 0;
0164     cacheIndexOffset_ = 0;
0165   }
0166 
0167   // Modifies the process names and cache indices in the StoredProcessBlockHelper.
0168   // Part of the dropOnInput. Also part of reordering that forces ProcessBlocks
0169   // to be read in the same order for all input files.
0170   void ProcessBlockHelper::dropProcessesAndReorderStoredImpl(
0171       StoredProcessBlockHelper& storedProcessBlockHelper,
0172       std::vector<std::string>& finalProcesses,
0173       std::vector<unsigned int> const& nEntries,
0174       std::vector<unsigned int> const& finalIndexToStoredIndex) const {
0175     std::vector<std::string> const& storedProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts();
0176     std::vector<unsigned int> const& storedCacheIndices = storedProcessBlockHelper.processBlockCacheIndices();
0177 
0178     assert(!storedProcesses.empty());
0179     std::vector<unsigned int> newCacheIndices;
0180     if (!finalProcesses.empty()) {
0181       // ProcessBlocks are read in the order of storedProcesses and within
0182       // each process in entry order in the TTree.  This establishes the cache
0183       // order that the values in storedCacheIndices refer to. The "offset" refers
0184       // to cache index value of the first ProcessBlock in a TTree.
0185       std::vector<unsigned int> oldOffsets;
0186       oldOffsets.reserve(storedProcesses.size());
0187       unsigned int offset = 0;
0188       for (auto const& entries : nEntries) {
0189         oldOffsets.emplace_back(offset);
0190         offset += entries;
0191       }
0192 
0193       unsigned int nFinalProcesses = finalProcesses.size();
0194       std::vector<unsigned int> newOffsets;
0195       newOffsets.reserve(nFinalProcesses);
0196       offset = 0;
0197       for (unsigned int iNew = 0; iNew < nFinalProcesses; ++iNew) {
0198         newOffsets.emplace_back(offset);
0199         offset += nEntries[finalIndexToStoredIndex[iNew]];
0200       }
0201 
0202       unsigned int nOuterLoop = storedCacheIndices.size() / storedProcesses.size();
0203       assert(nOuterLoop * storedProcesses.size() == storedCacheIndices.size());
0204       newCacheIndices.reserve(nOuterLoop * nFinalProcesses);
0205       unsigned int storedOuterOffset = 0;
0206 
0207       for (unsigned int k = 0; k < nOuterLoop; ++k) {
0208         for (unsigned int j = 0; j < nFinalProcesses; ++j) {
0209           unsigned int storedIndex = finalIndexToStoredIndex[j];
0210           unsigned int oldCacheIndex = storedCacheIndices[storedOuterOffset + storedIndex];
0211           unsigned int newCacheIndex = invalidCacheIndex();
0212           if (oldCacheIndex != invalidCacheIndex()) {
0213             newCacheIndex = oldCacheIndex - oldOffsets[storedIndex] + newOffsets[j];
0214           }
0215           newCacheIndices.emplace_back(newCacheIndex);
0216         }
0217         storedOuterOffset += storedProcesses.size();
0218       }
0219     }
0220     storedProcessBlockHelper.setProcessBlockCacheIndices(std::move(newCacheIndices));
0221     storedProcessBlockHelper.setProcessesWithProcessBlockProducts(std::move(finalProcesses));
0222   }
0223 
0224   void ProcessBlockHelper::fillFromPrimaryInputWhenNotEmpty(std::vector<std::string> const& storedProcesses,
0225                                                             std::vector<unsigned int> const& storedCacheIndices,
0226                                                             std::vector<unsigned int> const& nEntries) {
0227     assert(nProcessesInFirstFile_ <= processesWithProcessBlockProducts().size());
0228 
0229     // Calculate a translation from an index into the process names from the first file
0230     // to an index into the process names from the current file. Note the value will
0231     // be left invalid if the process name is not found in the current file.
0232     std::vector<unsigned int> firstFileToStored(nProcessesInFirstFile_, invalidProcessIndex());
0233     std::vector<unsigned int> newNEntries(nProcessesInFirstFile_, 0);
0234     for (unsigned int j = 0; j < nProcessesInFirstFile_; ++j) {
0235       for (unsigned int k = 0; k < storedProcesses.size(); ++k) {
0236         if (processesWithProcessBlockProducts()[j] == storedProcesses[k]) {
0237           firstFileToStored[j] = k;
0238           newNEntries[j] = nEntries[k];
0239           break;
0240         }
0241       }
0242     }
0243 
0244     // Append the cache indices from the current input file to the
0245     // cache indices container from the previous files.
0246     unsigned int nCacheIndexVectors = storedCacheIndices.size() / storedProcesses.size();
0247     assert(storedProcesses.size() * nCacheIndexVectors == storedCacheIndices.size());
0248     processBlockCacheIndices_.resize(processBlockCacheIndices_.size() + nCacheIndexVectors);
0249     unsigned int storedIndex = 0;
0250     for (unsigned int k = 0; k < nCacheIndexVectors; ++k) {
0251       processBlockCacheIndices_[outerOffset_ + k].reserve(nProcessesInFirstFile_);
0252       for (unsigned int j = 0; j < nProcessesInFirstFile_; ++j) {
0253         unsigned int iStored = firstFileToStored[j];
0254         if (iStored == invalidProcessIndex()) {
0255           processBlockCacheIndices_[outerOffset_ + k].push_back(invalidCacheIndex());
0256         } else {
0257           unsigned int oldCacheIndex = storedCacheIndices[storedIndex];
0258           ++storedIndex;
0259           unsigned int newCacheIndex = invalidCacheIndex();
0260           if (oldCacheIndex != invalidCacheIndex()) {
0261             newCacheIndex = oldCacheIndex + cacheIndexOffset_;
0262           }
0263           processBlockCacheIndices_[outerOffset_ + k].push_back(newCacheIndex);
0264         }
0265       }
0266     }
0267     cacheIndexVectorsPerFile_.push_back(nCacheIndexVectors);
0268     fillEntriesFromPrimaryInput(std::move(newNEntries));
0269   }
0270 
0271   void ProcessBlockHelper::fillEntriesFromPrimaryInput(std::vector<unsigned int> nEntries) {
0272     unsigned int entriesThisFile = 0;
0273     for (auto const& entries : nEntries) {
0274       entriesThisFile += entries;
0275     }
0276     nEntries_.push_back(std::move(nEntries));
0277     cacheEntriesPerFile_.push_back(entriesThisFile);
0278     cacheIndexOffset_ += entriesThisFile;
0279   }
0280 
0281 }  // namespace edm