Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-22 06:27:17

0001 #include "FWCore/Common/interface/OutputProcessBlockHelper.h"
0002 
0003 #include "DataFormats/Provenance/interface/StoredProcessBlockHelper.h"
0004 #include "FWCore/Common/interface/ProcessBlockHelperBase.h"
0005 
0006 #include <algorithm>
0007 #include <cassert>
0008 #include <utility>
0009 
0010 namespace edm {
0011 
0012   void OutputProcessBlockHelper::updateAfterProductSelection(
0013       std::set<std::string> const& processesWithKeptProcessBlockProducts,
0014       ProcessBlockHelperBase const& processBlockHelper) {
0015     processBlockHelper_ = &processBlockHelper;
0016 
0017     // Copy the list of processes with ProcessBlock products from the EventProcessor,
0018     // except remove any processes where the output module calling this has dropped all ProcessBlock
0019     // products. We want to maintain the same order and only remove elements. Fill a vector that can
0020     // translate from one process list to the other.
0021     assert(processesWithProcessBlockProducts_.empty());
0022     unsigned int iProcess = 0;
0023     for (auto const& process : processBlockHelper.processesWithProcessBlockProducts()) {
0024       if (processesWithKeptProcessBlockProducts.find(process) != processesWithKeptProcessBlockProducts.end()) {
0025         processesWithProcessBlockProducts_.emplace_back(process);
0026         translateFromStoredIndex_.emplace_back(iProcess);
0027       }
0028       ++iProcess;
0029     }
0030 
0031     for (auto const& addedProcess : processBlockHelper.addedProcesses()) {
0032       // count new processes producing ProcessBlock products that are
0033       // kept by the OutputModule. There can be at most 1.
0034       if (std::find(processesWithProcessBlockProducts_.begin(),
0035                     processesWithProcessBlockProducts_.end(),
0036                     addedProcess) != processesWithProcessBlockProducts_.end()) {
0037         ++nAddedProcesses_;
0038       }
0039     }
0040 
0041     // Determine if any ProcessBlock product from the input file is kept by the output module.
0042     // Do this by looking for a process name on both the list of processes with ProcessBlock
0043     // products kept by the output module and process names from the input with ProcessBlock
0044     // products.
0045     productsFromInputKept_ =
0046         std::find_first_of(processesWithProcessBlockProducts_.begin(),
0047                            processesWithProcessBlockProducts_.end(),
0048                            processBlockHelper.topProcessesWithProcessBlockProducts().begin(),
0049                            processBlockHelper.topProcessesWithProcessBlockProducts().begin() +
0050                                processBlockHelper.nProcessesInFirstFile()) != processesWithProcessBlockProducts_.end();
0051   }
0052 
0053   void OutputProcessBlockHelper::fillCacheIndices(StoredProcessBlockHelper& storedProcessBlockHelper) const {
0054     // The stored cache indices are the ones we want to fill.
0055     // This will get written to the output file.
0056     // Note for output the vector of vectors is flattened into a single vector
0057     std::vector<unsigned int> storedCacheIndices;
0058 
0059     // Number of processes in StoredProcessBlockHelper.
0060     unsigned int nStoredProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts().size();
0061 
0062     if (!productsFromInputKept_) {
0063       // This is really simple if we are not keeping any ProcessBlock products
0064       // from the input file. Deal with that special case first.
0065       // nStoredProcesses will be 1.
0066       assert(nAddedProcesses_ == nStoredProcesses);
0067       storedCacheIndices.reserve(nStoredProcesses);
0068       for (unsigned int i = 0; i < nStoredProcesses; ++i) {
0069         storedCacheIndices.push_back(i);
0070       }
0071       storedProcessBlockHelper.setProcessBlockCacheIndices(std::move(storedCacheIndices));
0072       return;
0073     }
0074 
0075     // Cache indices of the main ProcessBlockHelper we use as input. This
0076     // ProcessBlockHelper is owned by the EventProcessor.
0077     std::vector<std::vector<unsigned int>> const& cacheIndices = processBlockHelper_->processBlockCacheIndices();
0078 
0079     // We need to convert the cache indices in the ProcessBlockHelper to have different values when
0080     // put in the StoredProcessBlockHelper. The values are not the same because the ProcessBlocks are
0081     // not read in the same order in this process as compared to the next process which will read
0082     // the output file that is being written (the read order is the same as the order the cache
0083     // objects are placed in the cache vectors). In this process, they are ordered first by input file,
0084     // second by process and last by TTree entry. In the next process, this output file will be read
0085     // as a single input file. The ProcessBlocks are read in process order (this will be a subset
0086     // of the process list in ProcessBlockHelper, maybe smaller or maybe the same), and finally in
0087     // order of entry in the TTree. This conversion is done by subtracting and adding some
0088     // offsets and a lot of the following code involves calculating these offsets to do the conversion.
0089 
0090     // We will need the info in these to calculate the offsets
0091     std::vector<unsigned int> const& cacheIndexVectorsPerFile = processBlockHelper_->cacheIndexVectorsPerFile();
0092     std::vector<unsigned int> const& cacheEntriesPerFile = processBlockHelper_->cacheEntriesPerFile();
0093     std::vector<std::vector<unsigned int>> const& nEntries = processBlockHelper_->nEntries();
0094 
0095     assert(!cacheIndices.empty());
0096     // Count processes in the input file with saved ProcessBlock products in the output
0097     unsigned int nInputProcesses = 0;
0098     for (unsigned int iStoredProcess = 0; iStoredProcess < nStoredProcesses; ++iStoredProcess) {
0099       // The existing cache indices in processBlockHelper include only indices
0100       // corresponding to the processes in the input files. If there are more, then
0101       // they correspond to current process (and there only will be more if some
0102       // of the newly produced ProcessBlock products are going to be kept).
0103       // There will be at most 1 added process.
0104       if (translateFromStoredIndex_[iStoredProcess] < cacheIndices[0].size()) {
0105         ++nInputProcesses;
0106       }
0107     }
0108 
0109     // The following are the 4 offsets. The first two are defined relative to the
0110     // cache sequence in this process. The second two offsets are defined relative
0111     // to the cache sequence when the output file we are writing is read.
0112 
0113     // 1. Total number of cache entries in all input files prior to the current input file
0114     unsigned int fileOffset = 0;
0115 
0116     // 2. For each process, the total number of cache entries in processes in the current
0117     // input file and before the process
0118     std::vector<unsigned int> processOffset(nInputProcesses, 0);
0119 
0120     // 3. For each input process with ProcessBlock products stored by this
0121     // output module, the total number of cache entries in earlier input processes
0122     // that have ProcessBlock products stored by this output module.
0123     // Summed over all input files and including only processes in StoredProcessBlockHelper.
0124     // Plus an extra element at the end that includes all entries in all such processes.
0125     assert(!nEntries.empty());
0126     std::vector<unsigned int> storedProcessOffset(nInputProcesses + 1, 0);
0127 
0128     // 4. For each process with ProcessBlock products stored by this output module,
0129     // the total number of cache entries in that process in all input files before
0130     // the current input file.
0131     std::vector<unsigned int> storedFileInProcessOffset(nInputProcesses, 0);
0132 
0133     setStoredProcessOffset(nInputProcesses, nEntries, storedProcessOffset);
0134 
0135     storedCacheIndices.reserve(cacheIndices.size() * nStoredProcesses);
0136 
0137     unsigned int iFile = 0;
0138     unsigned int innerVectorsCurrentFile = 0;
0139 
0140     // In ProcessBlockHelper, there is a vector which contains vectors
0141     // of cache indices. Iterate over the inner vectors.
0142     for (auto const& innerVectorOfCacheIndices : cacheIndices) {
0143       // The inner vectors are associated with input files. Several contiguous
0144       // inner vectors can be associated with the same input file. Check to
0145       // see if we have crossed an input file boundary and set the file
0146       // index, iFile, at the next file associated with at least
0147       // one inner vector if necessary.
0148       while (innerVectorsCurrentFile == cacheIndexVectorsPerFile[iFile]) {
0149         // Sum cache entries for all files before the current file in
0150         // ProcessBlockHelper
0151         fileOffset += cacheEntriesPerFile[iFile];
0152         ++iFile;
0153         innerVectorsCurrentFile = 0;
0154       }
0155       if (innerVectorsCurrentFile == 0) {
0156         // Call these when the input file changes
0157         setProcessOffset(iFile, nInputProcesses, nEntries, processOffset);
0158         setStoredFileInProcessOffset(iFile, nInputProcesses, nEntries, storedFileInProcessOffset);
0159       }
0160       ++innerVectorsCurrentFile;
0161 
0162       assert(nInputProcesses + nAddedProcesses_ == nStoredProcesses);
0163 
0164       // Really fill the cache indices that will be stored in the output file in this loop
0165       for (unsigned int iStoredProcess = 0; iStoredProcess < nStoredProcesses; ++iStoredProcess) {
0166         unsigned int storedCacheIndex = ProcessBlockHelperBase::invalidCacheIndex();
0167         if (iStoredProcess < nInputProcesses) {
0168           unsigned int cacheIndex = innerVectorOfCacheIndices[translateFromStoredIndex_[iStoredProcess]];
0169           if (cacheIndex != ProcessBlockHelperBase::invalidCacheIndex()) {
0170             // The offsets count in the cache sequence to the first entry in
0171             // the current input file and process
0172             unsigned int inputOffset = fileOffset + processOffset[iStoredProcess];
0173             unsigned int storedOffset = storedProcessOffset[iStoredProcess] + storedFileInProcessOffset[iStoredProcess];
0174             storedCacheIndex = cacheIndex - inputOffset + storedOffset;
0175           }
0176         } else {
0177           // This corresponds to the current process if it has newly produced
0178           // ProcessBlock products.
0179           storedCacheIndex = storedProcessOffset[nInputProcesses] + iStoredProcess - nInputProcesses;
0180         }
0181         storedCacheIndices.push_back(storedCacheIndex);
0182       }
0183     }
0184     storedProcessBlockHelper.setProcessBlockCacheIndices(std::move(storedCacheIndices));
0185   }
0186 
0187   void OutputProcessBlockHelper::setStoredProcessOffset(unsigned int nInputProcesses,
0188                                                         std::vector<std::vector<unsigned int>> const& nEntries,
0189                                                         std::vector<unsigned int>& storedProcessOffset) const {
0190     unsigned int iStored = 0;
0191     for (auto& offset : storedProcessOffset) {
0192       offset = 0;
0193       // loop over earlier processes
0194       for (unsigned int jStored = 0; jStored < iStored; ++jStored) {
0195         unsigned int indexInEventProcessor = translateFromStoredIndex_[jStored];
0196         // loop over input files
0197         for (auto const& entries : nEntries) {
0198           assert(indexInEventProcessor < entries.size());
0199           offset += entries[indexInEventProcessor];
0200         }
0201       }
0202       ++iStored;
0203     }
0204   }
0205 
0206   void OutputProcessBlockHelper::setProcessOffset(unsigned int iFile,
0207                                                   unsigned int nInputProcesses,
0208                                                   std::vector<std::vector<unsigned int>> const& nEntries,
0209                                                   std::vector<unsigned int>& processOffset) const {
0210     unsigned int iStored = 0;
0211     for (auto& offset : processOffset) {
0212       offset = 0;
0213       unsigned int iProcess = translateFromStoredIndex_[iStored];
0214       for (unsigned int jProcess = 0; jProcess < iProcess; ++jProcess) {
0215         offset += nEntries[iFile][jProcess];
0216       }
0217       ++iStored;
0218     }
0219   }
0220 
0221   void OutputProcessBlockHelper::setStoredFileInProcessOffset(
0222       unsigned int iFile,
0223       unsigned int nInputProcesses,
0224       std::vector<std::vector<unsigned int>> const& nEntries,
0225       std::vector<unsigned int>& storedFileInProcessOffset) const {
0226     unsigned int iStored = 0;
0227     for (auto& offset : storedFileInProcessOffset) {
0228       offset = 0;
0229       unsigned int indexInEventProcessor = translateFromStoredIndex_[iStored];
0230       // loop over input files before current input file
0231       for (unsigned int jFile = 0; jFile < iFile; ++jFile) {
0232         assert(indexInEventProcessor < nEntries[jFile].size());
0233         offset += nEntries[jFile][indexInEventProcessor];
0234       }
0235       ++iStored;
0236     }
0237   }
0238 
0239 }  // namespace edm