Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-22 06:27:20

0001 
#include "FWCore/Framework/interface/MergeableRunProductMetadata.h"

#include "DataFormats/Provenance/interface/IndexIntoFile.h"
#include "DataFormats/Provenance/interface/StoredMergeableRunProductMetadata.h"
#include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
#include "FWCore/Utilities/interface/EDMException.h"

#include <algorithm>
#include <iterator>
#include <memory>
0011 
0012 namespace edm {
0013 
0014   MergeableRunProductMetadata::MergeableRunProductMetadata(
0015       MergeableRunProductProcesses const& mergeableRunProductProcesses)
0016       : mergeableRunProductProcesses_(&mergeableRunProductProcesses),
0017         metadataForProcesses_(mergeableRunProductProcesses.size()) {}
0018 
0019   MergeableRunProductMetadata::~MergeableRunProductMetadata() {}
0020 
0021   void MergeableRunProductMetadata::preReadFile() { mergeLumisFromIndexIntoFile(); }
0022 
0023   void MergeableRunProductMetadata::readRun(
0024       long long inputRunEntry,
0025       StoredMergeableRunProductMetadata const& inputStoredMergeableRunProductMetadata,
0026       IndexIntoFileItrHolder const& inputIndexIntoFileItr) {
0027     unsigned int processIndex{0};
0028     for (auto& metadataForProcess : metadataForProcesses_) {
0029       bool valid = true;
0030       std::vector<LuminosityBlockNumber_t>::const_iterator lumisInRunBeingReadBegin;
0031       std::vector<LuminosityBlockNumber_t>::const_iterator lumisInRunBeingReadEnd;
0032 
0033       std::string const& processName = mergeableRunProductProcesses_->processesWithMergeableRunProducts()[processIndex];
0034 
0035       if (inputStoredMergeableRunProductMetadata.getLumiContent(
0036               inputRunEntry, processName, valid, lumisInRunBeingReadBegin, lumisInRunBeingReadEnd)) {
0037         // This is a reference to the container accumulating the luminosity
0038         // block numbers for the run entries read associated with the current
0039         // run being processed that correspond to the luminosity block content
0040         // for the mergeable run products created in the process.
0041         std::vector<LuminosityBlockNumber_t>& lumis = metadataForProcess.lumis();
0042 
0043         // In the following, iter1 refers to the lumis associated with run entries already read
0044         // and iter2 refers to the lumis associated with the current run entry being read.
0045 
0046         bool elementsIn2NotIn1 = false;
0047         bool elementsIn1NotIn2 = false;
0048         bool sharedElements = false;
0049 
0050         std::vector<LuminosityBlockNumber_t> temp;
0051         temp.reserve(lumis.size() + (lumisInRunBeingReadEnd - lumisInRunBeingReadBegin));
0052         std::vector<LuminosityBlockNumber_t>::const_iterator end1 = lumis.end();
0053         std::vector<LuminosityBlockNumber_t>::const_iterator end2 = lumisInRunBeingReadEnd;
0054         for (std::vector<LuminosityBlockNumber_t>::const_iterator iter1 = lumis.begin(),
0055                                                                   iter2 = lumisInRunBeingReadBegin;
0056              iter1 != end1 || iter2 != end2;) {
0057           if (iter1 == end1) {
0058             temp.push_back(*iter2);
0059             ++iter2;
0060             elementsIn2NotIn1 = true;
0061             continue;
0062           } else if (iter2 == end2) {
0063             temp.push_back(*iter1);
0064             ++iter1;
0065             elementsIn1NotIn2 = true;
0066             continue;
0067           } else if (*iter1 < *iter2) {
0068             temp.push_back(*iter1);
0069             ++iter1;
0070             elementsIn1NotIn2 = true;
0071           } else if (*iter1 > *iter2) {
0072             temp.push_back(*iter2);
0073             ++iter2;
0074             elementsIn2NotIn1 = true;
0075           } else {
0076             // they must be equal
0077             sharedElements = true;
0078             temp.push_back(*iter1);
0079             ++iter1;
0080             ++iter2;
0081           }
0082         }
0083         lumis.swap(temp);
0084         if (!sharedElements && elementsIn2NotIn1 && elementsIn1NotIn2) {
0085           metadataForProcess.setMergeDecision(MERGE);
0086           if (!valid) {
0087             metadataForProcess.setValid(false);
0088           }
0089         } else if (!elementsIn2NotIn1) {
0090           metadataForProcess.setMergeDecision(IGNORE);
0091         } else if (!elementsIn1NotIn2) {
0092           metadataForProcess.setMergeDecision(REPLACE);
0093           if (!valid) {
0094             metadataForProcess.setValid(false);
0095           }
0096         } else {
0097           // In this case there is no way to get the correct answer.
0098           // The result will always be invalid.
0099           metadataForProcess.setMergeDecision(MERGE);
0100           metadataForProcess.setValid(false);
0101         }
0102 
0103       } else {
0104         metadataForProcess.setMergeDecision(MERGE);
0105         if (!valid) {
0106           metadataForProcess.setValid(false);
0107         }
0108         metadataForProcess.setUseIndexIntoFile(true);
0109         if (!gotLumisFromIndexIntoFile_) {
0110           inputIndexIntoFileItr.getLumisInRun(lumisFromIndexIntoFile_);
0111           gotLumisFromIndexIntoFile_ = true;
0112         }
0113       }
0114       ++processIndex;
0115     }  // end of loop over processes
0116   }  // end of readRun function
0117 
0118   void MergeableRunProductMetadata::writeLumi(LuminosityBlockNumber_t lumi) {
0119     if (metadataForProcesses_.empty()) {
0120       return;
0121     }
0122     lumisProcessed_.push_back(lumi);
0123   }
0124 
0125   void MergeableRunProductMetadata::preWriteRun() {
0126     if (metadataForProcesses_.empty()) {
0127       return;
0128     }
0129 
0130     mergeLumisFromIndexIntoFile();
0131 
0132     // Sort the lumiProcessed vector and ignore the duplicate
0133     // entries
0134 
0135     // Not sure if this copy is necessary. I'm copying because
0136     // I am not sure the standard algorithms work on TBB containers.
0137     // I couldn't find anything saying they did when I searched ...
0138     std::vector<LuminosityBlockNumber_t> lumisProcessed;
0139     lumisProcessed.reserve(lumisProcessed_.size());
0140     for (auto const& lumi : lumisProcessed_) {
0141       lumisProcessed.push_back(lumi);
0142     }
0143 
0144     std::sort(lumisProcessed.begin(), lumisProcessed.end());
0145     auto uniqueEnd = std::unique(lumisProcessed.begin(), lumisProcessed.end());
0146 
0147     for (auto& metadataForProcess : metadataForProcesses_) {
0148       // Did we process all the lumis in this process that were processed
0149       // in the process that created the mergeable run products.
0150       metadataForProcess.setAllLumisProcessed(std::includes(
0151           lumisProcessed.begin(), uniqueEnd, metadataForProcess.lumis().begin(), metadataForProcess.lumis().end()));
0152     }
0153   }
0154 
0155   void MergeableRunProductMetadata::postWriteRun() {
0156     lumisProcessed_.clear();
0157     for (auto& metadataForProcess : metadataForProcesses_) {
0158       metadataForProcess.reset();
0159     }
0160   }
0161 
0162   void MergeableRunProductMetadata::addEntryToStoredMetadata(StoredMergeableRunProductMetadata& storedMetadata) const {
0163     if (metadataForProcesses_.empty()) {
0164       return;
0165     }
0166 
0167     std::vector<std::string> const& storedProcesses = storedMetadata.processesWithMergeableRunProducts();
0168     if (storedProcesses.empty()) {
0169       return;
0170     }
0171 
0172     unsigned long long beginProcess = storedMetadata.singleRunEntryAndProcesses().size();
0173     unsigned long long endProcess = beginProcess;
0174 
0175     std::vector<std::string> const& processesWithMergeableRunProducts =
0176         mergeableRunProductProcesses_->processesWithMergeableRunProducts();
0177 
0178     for (unsigned int storedProcessIndex = 0; storedProcessIndex < storedProcesses.size(); ++storedProcessIndex) {
0179       // Look for a matching process. It is intentional that no process
0180       // is added when there is no match. storedProcesses only contains
0181       // processes which created mergeable run products selected by the
0182       // output module to be written out. processesWithMergeableRunProducts_
0183       // only has processes which created mergeable run products that were
0184       // read from the input data files. Note storedProcesses may be
0185       // missing processes because the output module dropped products.
0186       for (unsigned int transientProcessIndex = 0; transientProcessIndex < processesWithMergeableRunProducts.size();
0187            ++transientProcessIndex) {
0188         // This string comparison could be optimized away by storing an index mapping in
0189         // OutputModuleBase calculated once early in a job. (? Given how rare
0190         // mergeable run products are this optimization may not be worth doing)
0191         if (processesWithMergeableRunProducts[transientProcessIndex] == storedProcesses[storedProcessIndex]) {
0192           if (addProcess(storedMetadata,
0193                          metadataForProcesses_.at(transientProcessIndex),
0194                          storedProcessIndex,
0195                          beginProcess,
0196                          endProcess)) {
0197             ++endProcess;
0198           }
0199           break;
0200         }
0201       }
0202     }
0203     storedMetadata.singleRunEntries().emplace_back(beginProcess, endProcess);
0204   }
0205 
0206   bool MergeableRunProductMetadata::addProcess(StoredMergeableRunProductMetadata& storedMetadata,
0207                                                MetadataForProcess const& metadataForProcess,
0208                                                unsigned int storedProcessIndex,
0209                                                unsigned long long beginProcess,
0210                                                unsigned long long endProcess) const {
0211     if (metadataForProcess.valid() && metadataForProcess.allLumisProcessed()) {
0212       return false;
0213     }
0214 
0215     storedMetadata.allValidAndUseIndexIntoFile() = false;
0216 
0217     unsigned long long iBeginLumi = 0;
0218     unsigned long long iEndLumi = 0;
0219 
0220     // See if we need to store the set of lumi numbers corresponding
0221     // to this process and run entry. If they were all processed then
0222     // we can just get the lumi numbers out of IndexIntoFile and do
0223     // not need to store them here
0224     if (!metadataForProcess.allLumisProcessed()) {
0225       // If we need to store the numbers, then we can check to
0226       // make sure this does not duplicate the lumi numbers we
0227       // stored for another process. If we did then we can just
0228       // just reference same indices and avoid copying a duplicate
0229       // sequence of lumi numbers. It is sufficient to check the
0230       // size only. As you go back in the time sequence of processes
0231       // the only thing that can happen is more lumi numbers appear
0232       // at steps where a run was only partially processed.
0233       bool found = false;
0234       for (unsigned long long kProcess = beginProcess; kProcess < endProcess; ++kProcess) {
0235         StoredMergeableRunProductMetadata::SingleRunEntryAndProcess const& storedSingleRunEntryAndProcess =
0236             storedMetadata.singleRunEntryAndProcesses().at(kProcess);
0237 
0238         if (metadataForProcess.lumis().size() ==
0239             (storedSingleRunEntryAndProcess.endLumi() - storedSingleRunEntryAndProcess.beginLumi())) {
0240           iBeginLumi = storedSingleRunEntryAndProcess.beginLumi();
0241           iEndLumi = storedSingleRunEntryAndProcess.endLumi();
0242           found = true;
0243           break;
0244         }
0245       }
0246       if (!found) {
0247         std::vector<LuminosityBlockNumber_t>& storedLumis = storedMetadata.lumis();
0248         std::vector<LuminosityBlockNumber_t> const& metdataLumis = metadataForProcess.lumis();
0249         iBeginLumi = storedLumis.size();
0250         storedLumis.insert(storedLumis.end(), metdataLumis.begin(), metdataLumis.end());
0251         iEndLumi = storedLumis.size();
0252       }
0253     }
0254     storedMetadata.singleRunEntryAndProcesses().emplace_back(
0255         iBeginLumi, iEndLumi, storedProcessIndex, metadataForProcess.valid(), metadataForProcess.allLumisProcessed());
0256     return true;
0257   }
0258 
0259   MergeableRunProductMetadata::MergeDecision MergeableRunProductMetadata::getMergeDecision(
0260       std::string const& processThatCreatedProduct) const {
0261     MetadataForProcess const* metadataForProcess = metadataForOneProcess(processThatCreatedProduct);
0262     if (metadataForProcess) {
0263       return metadataForProcess->mergeDecision();
0264     }
0265     throw Exception(errors::LogicError) << "MergeableRunProductMetadata::getMergeDecision could not find process.\n"
0266                                         << "It should not be possible for this error to occur.\n"
0267                                         << "Contact a Framework developer\n";
0268     return MERGE;
0269   }
0270 
0271   bool MergeableRunProductMetadata::knownImproperlyMerged(std::string const& processThatCreatedProduct) const {
0272     MetadataForProcess const* metadataForProcess = metadataForOneProcess(processThatCreatedProduct);
0273     if (metadataForProcess) {
0274       return !metadataForProcess->valid();
0275     }
0276     return false;
0277   }
0278 
0279   void MergeableRunProductMetadata::MetadataForProcess::reset() {
0280     lumis_.clear();
0281     mergeDecision_ = MERGE;
0282     useIndexIntoFile_ = false;
0283     valid_ = true;
0284     allLumisProcessed_ = false;
0285   }
0286 
0287   MergeableRunProductMetadata::MetadataForProcess const* MergeableRunProductMetadata::metadataForOneProcess(
0288       std::string const& processName) const {
0289     unsigned int processIndex = 0;
0290     for (auto const& metadataForProcess : metadataForProcesses_) {
0291       // This string comparison could be optimized away by storing an index in
0292       // ProductDescription as a transient calculated once early in a job.
0293       if (getProcessName(processIndex) == processName) {
0294         return &metadataForProcess;
0295       }
0296       ++processIndex;
0297     }
0298     return nullptr;
0299   }
0300 
0301   void MergeableRunProductMetadata::mergeLumisFromIndexIntoFile() {
0302     for (auto& metadataForProcess : metadataForProcesses_) {
0303       if (metadataForProcess.useIndexIntoFile()) {
0304         metadataForProcess.setUseIndexIntoFile(false);
0305 
0306         std::vector<LuminosityBlockNumber_t> temp;
0307         temp.reserve(metadataForProcess.lumis().size() + lumisFromIndexIntoFile_.size());
0308         std::vector<LuminosityBlockNumber_t>::const_iterator end1 = metadataForProcess.lumis().end();
0309         std::vector<LuminosityBlockNumber_t>::const_iterator end2 = lumisFromIndexIntoFile_.end();
0310         for (std::vector<LuminosityBlockNumber_t>::const_iterator iter1 = metadataForProcess.lumis().begin(),
0311                                                                   iter2 = lumisFromIndexIntoFile_.begin();
0312              iter1 != end1 || iter2 != end2;) {
0313           if (iter1 == end1) {
0314             temp.push_back(*iter2);
0315             ++iter2;
0316             continue;
0317           } else if (iter2 == end2) {
0318             temp.push_back(*iter1);
0319             ++iter1;
0320             continue;
0321           } else if (*iter1 < *iter2) {
0322             temp.push_back(*iter1);
0323             ++iter1;
0324           } else if (*iter1 > *iter2) {
0325             temp.push_back(*iter2);
0326             ++iter2;
0327           } else {
0328             // they must be equal
0329             temp.push_back(*iter1);
0330             ++iter1;
0331             ++iter2;
0332           }
0333         }
0334         metadataForProcess.lumis().swap(temp);
0335       }
0336     }
0337     lumisFromIndexIntoFile_.clear();
0338     gotLumisFromIndexIntoFile_ = false;
0339   }
0340 }  // namespace edm