Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:55

0001 #include "FWCore/Common/interface/OutputProcessBlockHelper.h"
0002 
0003 #include "DataFormats/Provenance/interface/StoredProcessBlockHelper.h"
0004 #include "FWCore/Common/interface/ProcessBlockHelperBase.h"
0005 
0006 #include <algorithm>
0007 #include <cassert>
0008 #include <utility>
0009 
0010 namespace edm {
0011 
0012   void OutputProcessBlockHelper::updateAfterProductSelection(
0013       std::set<std::string> const& processesWithKeptProcessBlockProducts,
0014       ProcessBlockHelperBase const& processBlockHelper) {
0015     processBlockHelper_ = &processBlockHelper;
0016 
0017     // Copy the list of processes with ProcessBlock products from the EventProcessor or SubProcess,
0018     // except remove any processes where the output module calling this has dropped all ProcessBlock
0019     // products. We want to maintain the same order and only remove elements. Fill a vector that can
0020     // translate from one process list to the other.
0021     assert(processesWithProcessBlockProducts_.empty());
0022     unsigned int iProcess = 0;
0023     for (auto const& process : processBlockHelper.processesWithProcessBlockProducts()) {
0024       if (processesWithKeptProcessBlockProducts.find(process) != processesWithKeptProcessBlockProducts.end()) {
0025         processesWithProcessBlockProducts_.emplace_back(process);
0026         translateFromStoredIndex_.emplace_back(iProcess);
0027       }
0028       ++iProcess;
0029     }
0030 
0031     for (auto const& addedProcess : processBlockHelper.addedProcesses()) {
0032       // count new processes producing ProcessBlock products that are
0033       // kept by the OutputModule. There can be at most 1 except
0034       // in the case of SubProcesses.
0035       if (std::find(processesWithProcessBlockProducts_.begin(),
0036                     processesWithProcessBlockProducts_.end(),
0037                     addedProcess) != processesWithProcessBlockProducts_.end()) {
0038         ++nAddedProcesses_;
0039       }
0040     }
0041 
0042     // Determine if any ProcessBlock product from the input file is kept by the output module.
0043     // Do this by looking for a process name on both the list of processes with ProcessBlock
0044     // products kept by the output module and process names from the input with ProcessBlock
0045     // products.
0046     productsFromInputKept_ =
0047         std::find_first_of(processesWithProcessBlockProducts_.begin(),
0048                            processesWithProcessBlockProducts_.end(),
0049                            processBlockHelper.topProcessesWithProcessBlockProducts().begin(),
0050                            processBlockHelper.topProcessesWithProcessBlockProducts().begin() +
0051                                processBlockHelper.nProcessesInFirstFile()) != processesWithProcessBlockProducts_.end();
0052   }
0053 
0054   void OutputProcessBlockHelper::fillCacheIndices(StoredProcessBlockHelper& storedProcessBlockHelper) const {
0055     // The stored cache indices are the ones we want to fill.
0056     // This will get written to the output file.
0057     // Note for output the vector of vectors is flattened into a single vector
0058     std::vector<unsigned int> storedCacheIndices;
0059 
0060     // Number of processes in StoredProcessBlockHelper.
0061     unsigned int nStoredProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts().size();
0062 
0063     if (!productsFromInputKept_) {
0064       // This is really simple if we are not keeping any ProcessBlock products
0065       // from the input file. Deal with that special case first.
0066       // Except for the special case of a SubProcess, nStoredProcesses will be 1.
0067       assert(nAddedProcesses_ == nStoredProcesses);
0068       storedCacheIndices.reserve(nStoredProcesses);
0069       for (unsigned int i = 0; i < nStoredProcesses; ++i) {
0070         storedCacheIndices.push_back(i);
0071       }
0072       storedProcessBlockHelper.setProcessBlockCacheIndices(std::move(storedCacheIndices));
0073       return;
0074     }
0075 
0076     // Cache indices of the main ProcessBlockHelper we use as input. This
0077     // ProcessBlockHelper is owned by the EventProcessor.
0078     std::vector<std::vector<unsigned int>> const& cacheIndices = processBlockHelper_->processBlockCacheIndices();
0079 
0080     // We need to convert the cache indices in the ProcessBlockHelper to have different values when
0081     // put in the StoredProcessBlockHelper. The values are not the same because the ProcessBlocks are
0082     // not read in the same order in this process as compared to the next process which will read
0083     // the output file that is being written (the read order is the same as the order the cache
0084     // objects are placed in the cache vectors). In this process, they are ordered first by input file,
0085     // second by process and last by TTree entry. In the next process, this output file will be read
0086     // as a single input file. The ProcessBlocks are read in process order (this will be a subset
0087     // of the process list in ProcessBlockHelper, maybe smaller or maybe the same), and finally in
0088     // order of entry in the TTree. This conversion is done by subtracting and adding some
0089     // offsets and a lot of the following code involves calculating these offsets to do the conversion.
0090 
0091     // We will need the info in these to calculate the offsets
0092     std::vector<unsigned int> const& cacheIndexVectorsPerFile = processBlockHelper_->cacheIndexVectorsPerFile();
0093     std::vector<unsigned int> const& cacheEntriesPerFile = processBlockHelper_->cacheEntriesPerFile();
0094     std::vector<std::vector<unsigned int>> const& nEntries = processBlockHelper_->nEntries();
0095 
0096     assert(!cacheIndices.empty());
0097     // Count processes in the input file with saved ProcessBlock products in the output
0098     unsigned int nInputProcesses = 0;
0099     for (unsigned int iStoredProcess = 0; iStoredProcess < nStoredProcesses; ++iStoredProcess) {
0100       // The existing cache indices in processBlockHelper include only indices
0101       // corresponding to the processes in the input files. If there are more, then
0102       // they correspond to current process (and there only will be more if some
0103       // of the newly produced ProcessBlock products are going to be kept).
0104       // There will be at most 1 added process except in the case of SubProcesses.
0105       if (translateFromStoredIndex_[iStoredProcess] < cacheIndices[0].size()) {
0106         ++nInputProcesses;
0107       }
0108     }
0109 
0110     // The following are the 4 offsets. The first two are defined relative to the
0111     // cache sequence in this process. The second two offsets are defined relative
0112     // to the cache sequence when the output file we are writing is read.
0113 
0114     // 1. Total number of cache entries in all input files prior to the current input file
0115     unsigned int fileOffset = 0;
0116 
0117     // 2. For each process, the total number of cache entries in processes in the current
0118     // input file and before the process
0119     std::vector<unsigned int> processOffset(nInputProcesses, 0);
0120 
0121     // 3. For each input process with ProcessBlock products stored by this
0122     // output module, the total number of cache entries in earlier input processes
0123     // that have ProcessBlock products stored by this output module.
0124     // Summed over all input files and including only processes in StoredProcessBlockHelper.
0125     // Plus an extra element at the end that includes all entries in all such processes.
0126     assert(!nEntries.empty());
0127     std::vector<unsigned int> storedProcessOffset(nInputProcesses + 1, 0);
0128 
0129     // 4. For each process with ProcessBlock products stored by this output module,
0130     // the total number of cache entries in that process in all input files before
0131     // the current input file.
0132     std::vector<unsigned int> storedFileInProcessOffset(nInputProcesses, 0);
0133 
0134     setStoredProcessOffset(nInputProcesses, nEntries, storedProcessOffset);
0135 
0136     storedCacheIndices.reserve(cacheIndices.size() * nStoredProcesses);
0137 
0138     unsigned int iFile = 0;
0139     unsigned int innerVectorsCurrentFile = 0;
0140 
0141     // In ProcessBlockHelper, there is a vector which contains vectors
0142     // of cache indices. Iterate over the inner vectors.
0143     for (auto const& innerVectorOfCacheIndices : cacheIndices) {
0144       // The inner vectors are associated with input files. Several contiguous
0145       // inner vectors can be associated with the same input file. Check to
0146       // see if we have crossed an input file boundary and set the file
0147       // index, iFile, at the next file associated with at least
0148       // one inner vector if necessary.
0149       while (innerVectorsCurrentFile == cacheIndexVectorsPerFile[iFile]) {
0150         // Sum cache entries for all files before the current file in
0151         // ProcessBlockHelper
0152         fileOffset += cacheEntriesPerFile[iFile];
0153         ++iFile;
0154         innerVectorsCurrentFile = 0;
0155       }
0156       if (innerVectorsCurrentFile == 0) {
0157         // Call these when the input file changes
0158         setProcessOffset(iFile, nInputProcesses, nEntries, processOffset);
0159         setStoredFileInProcessOffset(iFile, nInputProcesses, nEntries, storedFileInProcessOffset);
0160       }
0161       ++innerVectorsCurrentFile;
0162 
0163       assert(nInputProcesses + nAddedProcesses_ == nStoredProcesses);
0164 
0165       // Really fill the cache indices that will be stored in the output file in this loop
0166       for (unsigned int iStoredProcess = 0; iStoredProcess < nStoredProcesses; ++iStoredProcess) {
0167         unsigned int storedCacheIndex = ProcessBlockHelperBase::invalidCacheIndex();
0168         if (iStoredProcess < nInputProcesses) {
0169           unsigned int cacheIndex = innerVectorOfCacheIndices[translateFromStoredIndex_[iStoredProcess]];
0170           if (cacheIndex != ProcessBlockHelperBase::invalidCacheIndex()) {
0171             // The offsets count in the cache sequence to the first entry in
0172             // the current input file and process
0173             unsigned int inputOffset = fileOffset + processOffset[iStoredProcess];
0174             unsigned int storedOffset = storedProcessOffset[iStoredProcess] + storedFileInProcessOffset[iStoredProcess];
0175             storedCacheIndex = cacheIndex - inputOffset + storedOffset;
0176           }
0177         } else {
0178           // This corresponds to the current process if it has newly produced
0179           // ProcessBlock products (plus possibly SubProcesses).
0180           storedCacheIndex = storedProcessOffset[nInputProcesses] + iStoredProcess - nInputProcesses;
0181         }
0182         storedCacheIndices.push_back(storedCacheIndex);
0183       }
0184     }
0185     storedProcessBlockHelper.setProcessBlockCacheIndices(std::move(storedCacheIndices));
0186   }
0187 
0188   void OutputProcessBlockHelper::setStoredProcessOffset(unsigned int nInputProcesses,
0189                                                         std::vector<std::vector<unsigned int>> const& nEntries,
0190                                                         std::vector<unsigned int>& storedProcessOffset) const {
0191     unsigned int iStored = 0;
0192     for (auto& offset : storedProcessOffset) {
0193       offset = 0;
0194       // loop over earlier processes
0195       for (unsigned int jStored = 0; jStored < iStored; ++jStored) {
0196         unsigned int indexInEventProcessor = translateFromStoredIndex_[jStored];
0197         // loop over input files
0198         for (auto const& entries : nEntries) {
0199           assert(indexInEventProcessor < entries.size());
0200           offset += entries[indexInEventProcessor];
0201         }
0202       }
0203       ++iStored;
0204     }
0205   }
0206 
0207   void OutputProcessBlockHelper::setProcessOffset(unsigned int iFile,
0208                                                   unsigned int nInputProcesses,
0209                                                   std::vector<std::vector<unsigned int>> const& nEntries,
0210                                                   std::vector<unsigned int>& processOffset) const {
0211     unsigned int iStored = 0;
0212     for (auto& offset : processOffset) {
0213       offset = 0;
0214       unsigned int iProcess = translateFromStoredIndex_[iStored];
0215       for (unsigned int jProcess = 0; jProcess < iProcess; ++jProcess) {
0216         offset += nEntries[iFile][jProcess];
0217       }
0218       ++iStored;
0219     }
0220   }
0221 
0222   void OutputProcessBlockHelper::setStoredFileInProcessOffset(
0223       unsigned int iFile,
0224       unsigned int nInputProcesses,
0225       std::vector<std::vector<unsigned int>> const& nEntries,
0226       std::vector<unsigned int>& storedFileInProcessOffset) const {
0227     unsigned int iStored = 0;
0228     for (auto& offset : storedFileInProcessOffset) {
0229       offset = 0;
0230       unsigned int indexInEventProcessor = translateFromStoredIndex_[iStored];
0231       // loop over input files before current input file
0232       for (unsigned int jFile = 0; jFile < iFile; ++jFile) {
0233         assert(indexInEventProcessor < nEntries[jFile].size());
0234         offset += nEntries[jFile][indexInEventProcessor];
0235       }
0236       ++iStored;
0237     }
0238   }
0239 
0240 }  // namespace edm