Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-31 02:19:24

0001 
0002 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0003 
0004 #include "DataFormats/Provenance/interface/IndexIntoFile.h"
0005 #include "DataFormats/Provenance/interface/StoredMergeableRunProductMetadata.h"
0006 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0007 #include "FWCore/Utilities/interface/EDMException.h"
0008 
0009 #include <algorithm>
0010 #include <memory>
0011 
0012 namespace edm {
0013 
0014   MergeableRunProductMetadata::MergeableRunProductMetadata(
0015       MergeableRunProductProcesses const& mergeableRunProductProcesses)
0016       : mergeableRunProductProcesses_(&mergeableRunProductProcesses),
0017         metadataForProcesses_(mergeableRunProductProcesses.size()) {}
0018 
0019   MergeableRunProductMetadata::~MergeableRunProductMetadata() {}
0020 
0021   void MergeableRunProductMetadata::preReadFile() { mergeLumisFromIndexIntoFile(); }
0022 
0023   void MergeableRunProductMetadata::readRun(
0024       long long inputRunEntry,
0025       StoredMergeableRunProductMetadata const& inputStoredMergeableRunProductMetadata,
0026       IndexIntoFileItrHolder const& inputIndexIntoFileItr) {
0027     unsigned int processIndex{0};
0028     for (auto& metadataForProcess : metadataForProcesses_) {
0029       bool valid = true;
0030       std::vector<LuminosityBlockNumber_t>::const_iterator lumisInRunBeingReadBegin;
0031       std::vector<LuminosityBlockNumber_t>::const_iterator lumisInRunBeingReadEnd;
0032 
0033       std::string const& processName = mergeableRunProductProcesses_->processesWithMergeableRunProducts()[processIndex];
0034 
0035       if (inputStoredMergeableRunProductMetadata.getLumiContent(
0036               inputRunEntry, processName, valid, lumisInRunBeingReadBegin, lumisInRunBeingReadEnd)) {
0037         // This is a reference to the container accumulating the luminosity
0038         // block numbers for the run entries read associated with the current
0039         // run being processed that correspond to the luminosity block content
0040         // for the mergeable run products created in the process.
0041         std::vector<LuminosityBlockNumber_t>& lumis = metadataForProcess.lumis();
0042 
0043         // In the following, iter1 refers to the lumis associated with run entries already read
0044         // and iter2 refers to the lumis associated with the current run entry being read.
0045 
0046         bool elementsIn2NotIn1 = false;
0047         bool elementsIn1NotIn2 = false;
0048         bool sharedElements = false;
0049 
0050         std::vector<LuminosityBlockNumber_t> temp;
0051         temp.reserve(lumis.size() + (lumisInRunBeingReadEnd - lumisInRunBeingReadBegin));
0052         std::vector<LuminosityBlockNumber_t>::const_iterator end1 = lumis.end();
0053         std::vector<LuminosityBlockNumber_t>::const_iterator end2 = lumisInRunBeingReadEnd;
0054         for (std::vector<LuminosityBlockNumber_t>::const_iterator iter1 = lumis.begin(),
0055                                                                   iter2 = lumisInRunBeingReadBegin;
0056              iter1 != end1 || iter2 != end2;) {
0057           if (iter1 == end1) {
0058             temp.push_back(*iter2);
0059             ++iter2;
0060             elementsIn2NotIn1 = true;
0061             continue;
0062           } else if (iter2 == end2) {
0063             temp.push_back(*iter1);
0064             ++iter1;
0065             elementsIn1NotIn2 = true;
0066             continue;
0067           } else if (*iter1 < *iter2) {
0068             temp.push_back(*iter1);
0069             ++iter1;
0070             elementsIn1NotIn2 = true;
0071           } else if (*iter1 > *iter2) {
0072             temp.push_back(*iter2);
0073             ++iter2;
0074             elementsIn2NotIn1 = true;
0075           } else {
0076             // they must be equal
0077             sharedElements = true;
0078             temp.push_back(*iter1);
0079             ++iter1;
0080             ++iter2;
0081           }
0082         }
0083         lumis.swap(temp);
0084         if (!sharedElements && elementsIn2NotIn1 && elementsIn1NotIn2) {
0085           metadataForProcess.setMergeDecision(MERGE);
0086           if (!valid) {
0087             metadataForProcess.setValid(false);
0088           }
0089         } else if (!elementsIn2NotIn1) {
0090           metadataForProcess.setMergeDecision(IGNORE);
0091         } else if (!elementsIn1NotIn2) {
0092           metadataForProcess.setMergeDecision(REPLACE);
0093           if (!valid) {
0094             metadataForProcess.setValid(false);
0095           }
0096         } else {
0097           // In this case there is no way to get the correct answer.
0098           // The result will always be invalid.
0099           metadataForProcess.setMergeDecision(MERGE);
0100           metadataForProcess.setValid(false);
0101         }
0102 
0103       } else {
0104         metadataForProcess.setMergeDecision(MERGE);
0105         if (!valid) {
0106           metadataForProcess.setValid(false);
0107         }
0108         metadataForProcess.setUseIndexIntoFile(true);
0109         if (!gotLumisFromIndexIntoFile_) {
0110           inputIndexIntoFileItr.getLumisInRun(lumisFromIndexIntoFile_);
0111           gotLumisFromIndexIntoFile_ = true;
0112         }
0113       }
0114       ++processIndex;
0115     }  // end of loop over processes
0116   }  // end of readRun function
0117 
0118   void MergeableRunProductMetadata::writeLumi(LuminosityBlockNumber_t lumi) {
0119     if (metadataForProcesses_.empty()) {
0120       return;
0121     }
0122     lumisProcessed_.push_back(lumi);
0123   }
0124 
0125   void MergeableRunProductMetadata::preWriteRun() {
0126     if (metadataForProcesses_.empty()) {
0127       return;
0128     }
0129 
0130     mergeLumisFromIndexIntoFile();
0131 
0132     // Sort the lumiProcessed vector and ignore the duplicate
0133     // entries
0134 
0135     // Not sure if this copy is necessary. I'm copying because
0136     // I am not sure the standard algorithms work on TBB containers.
0137     // I couldn't find anything saying they did when I searched ...
0138     std::vector<LuminosityBlockNumber_t> lumisProcessed;
0139     lumisProcessed.reserve(lumisProcessed_.size());
0140     for (auto const& lumi : lumisProcessed_) {
0141       lumisProcessed.push_back(lumi);
0142     }
0143 
0144     std::sort(lumisProcessed.begin(), lumisProcessed.end());
0145     auto uniqueEnd = std::unique(lumisProcessed.begin(), lumisProcessed.end());
0146 
0147     for (auto& metadataForProcess : metadataForProcesses_) {
0148       // Did we process all the lumis in this process that were processed
0149       // in the process that created the mergeable run products.
0150       metadataForProcess.setAllLumisProcessed(std::includes(
0151           lumisProcessed.begin(), uniqueEnd, metadataForProcess.lumis().begin(), metadataForProcess.lumis().end()));
0152     }
0153   }
0154 
0155   void MergeableRunProductMetadata::postWriteRun() {
0156     lumisProcessed_.clear();
0157     for (auto& metadataForProcess : metadataForProcesses_) {
0158       metadataForProcess.reset();
0159     }
0160   }
0161 
0162   void MergeableRunProductMetadata::addEntryToStoredMetadata(StoredMergeableRunProductMetadata& storedMetadata) const {
0163     if (metadataForProcesses_.empty()) {
0164       return;
0165     }
0166 
0167     std::vector<std::string> const& storedProcesses = storedMetadata.processesWithMergeableRunProducts();
0168     if (storedProcesses.empty()) {
0169       return;
0170     }
0171 
0172     unsigned long long beginProcess = storedMetadata.singleRunEntryAndProcesses().size();
0173     unsigned long long endProcess = beginProcess;
0174 
0175     std::vector<std::string> const& processesWithMergeableRunProducts =
0176         mergeableRunProductProcesses_->processesWithMergeableRunProducts();
0177 
0178     for (unsigned int storedProcessIndex = 0; storedProcessIndex < storedProcesses.size(); ++storedProcessIndex) {
0179       // Look for a matching process. It is intentional that no process
0180       // is added when there is no match. storedProcesses only contains
0181       // processes which created mergeable run products selected by the
0182       // output module to be written out. processesWithMergeableRunProducts_
0183       // only has processes which created mergeable run products that were
0184       // read from the input data files. Note storedProcesses may be
0185       // missing processes because the output module dropped products.
0186       // The other vector may be missing processes because of SubProcesses.
0187       for (unsigned int transientProcessIndex = 0; transientProcessIndex < processesWithMergeableRunProducts.size();
0188            ++transientProcessIndex) {
0189         // This string comparison could be optimized away by storing an index mapping in
0190         // OutputModuleBase calculated once early in a job. (? Given how rare
0191         // mergeable run products are this optimization may not be worth doing)
0192         if (processesWithMergeableRunProducts[transientProcessIndex] == storedProcesses[storedProcessIndex]) {
0193           if (addProcess(storedMetadata,
0194                          metadataForProcesses_.at(transientProcessIndex),
0195                          storedProcessIndex,
0196                          beginProcess,
0197                          endProcess)) {
0198             ++endProcess;
0199           }
0200           break;
0201         }
0202       }
0203     }
0204     storedMetadata.singleRunEntries().emplace_back(beginProcess, endProcess);
0205   }
0206 
0207   bool MergeableRunProductMetadata::addProcess(StoredMergeableRunProductMetadata& storedMetadata,
0208                                                MetadataForProcess const& metadataForProcess,
0209                                                unsigned int storedProcessIndex,
0210                                                unsigned long long beginProcess,
0211                                                unsigned long long endProcess) const {
0212     if (metadataForProcess.valid() && metadataForProcess.allLumisProcessed()) {
0213       return false;
0214     }
0215 
0216     storedMetadata.allValidAndUseIndexIntoFile() = false;
0217 
0218     unsigned long long iBeginLumi = 0;
0219     unsigned long long iEndLumi = 0;
0220 
0221     // See if we need to store the set of lumi numbers corresponding
0222     // to this process and run entry. If they were all processed then
0223     // we can just get the lumi numbers out of IndexIntoFile and do
0224     // not need to store them here
0225     if (!metadataForProcess.allLumisProcessed()) {
0226       // If we need to store the numbers, then we can check to
0227       // make sure this does not duplicate the lumi numbers we
0228       // stored for another process. If we did then we can just
0229       // just reference same indices and avoid copying a duplicate
0230       // sequence of lumi numbers. It is sufficient to check the
0231       // size only. As you go back in the time sequence of processes
0232       // the only thing that can happen is more lumi numbers appear
0233       // at steps where a run was only partially processed.
0234       bool found = false;
0235       for (unsigned long long kProcess = beginProcess; kProcess < endProcess; ++kProcess) {
0236         StoredMergeableRunProductMetadata::SingleRunEntryAndProcess const& storedSingleRunEntryAndProcess =
0237             storedMetadata.singleRunEntryAndProcesses().at(kProcess);
0238 
0239         if (metadataForProcess.lumis().size() ==
0240             (storedSingleRunEntryAndProcess.endLumi() - storedSingleRunEntryAndProcess.beginLumi())) {
0241           iBeginLumi = storedSingleRunEntryAndProcess.beginLumi();
0242           iEndLumi = storedSingleRunEntryAndProcess.endLumi();
0243           found = true;
0244           break;
0245         }
0246       }
0247       if (!found) {
0248         std::vector<LuminosityBlockNumber_t>& storedLumis = storedMetadata.lumis();
0249         std::vector<LuminosityBlockNumber_t> const& metdataLumis = metadataForProcess.lumis();
0250         iBeginLumi = storedLumis.size();
0251         storedLumis.insert(storedLumis.end(), metdataLumis.begin(), metdataLumis.end());
0252         iEndLumi = storedLumis.size();
0253       }
0254     }
0255     storedMetadata.singleRunEntryAndProcesses().emplace_back(
0256         iBeginLumi, iEndLumi, storedProcessIndex, metadataForProcess.valid(), metadataForProcess.allLumisProcessed());
0257     return true;
0258   }
0259 
0260   MergeableRunProductMetadata::MergeDecision MergeableRunProductMetadata::getMergeDecision(
0261       std::string const& processThatCreatedProduct) const {
0262     MetadataForProcess const* metadataForProcess = metadataForOneProcess(processThatCreatedProduct);
0263     if (metadataForProcess) {
0264       return metadataForProcess->mergeDecision();
0265     }
0266     throw Exception(errors::LogicError) << "MergeableRunProductMetadata::getMergeDecision could not find process.\n"
0267                                         << "It should not be possible for this error to occur.\n"
0268                                         << "Contact a Framework developer\n";
0269     return MERGE;
0270   }
0271 
0272   bool MergeableRunProductMetadata::knownImproperlyMerged(std::string const& processThatCreatedProduct) const {
0273     MetadataForProcess const* metadataForProcess = metadataForOneProcess(processThatCreatedProduct);
0274     if (metadataForProcess) {
0275       return !metadataForProcess->valid();
0276     }
0277     return false;
0278   }
0279 
0280   void MergeableRunProductMetadata::MetadataForProcess::reset() {
0281     lumis_.clear();
0282     mergeDecision_ = MERGE;
0283     useIndexIntoFile_ = false;
0284     valid_ = true;
0285     allLumisProcessed_ = false;
0286   }
0287 
0288   MergeableRunProductMetadata::MetadataForProcess const* MergeableRunProductMetadata::metadataForOneProcess(
0289       std::string const& processName) const {
0290     unsigned int processIndex = 0;
0291     for (auto const& metadataForProcess : metadataForProcesses_) {
0292       // This string comparison could be optimized away by storing an index in
0293       // ProductDescription as a transient calculated once early in a job.
0294       if (getProcessName(processIndex) == processName) {
0295         return &metadataForProcess;
0296       }
0297       ++processIndex;
0298     }
0299     return nullptr;
0300   }
0301 
0302   void MergeableRunProductMetadata::mergeLumisFromIndexIntoFile() {
0303     for (auto& metadataForProcess : metadataForProcesses_) {
0304       if (metadataForProcess.useIndexIntoFile()) {
0305         metadataForProcess.setUseIndexIntoFile(false);
0306 
0307         std::vector<LuminosityBlockNumber_t> temp;
0308         temp.reserve(metadataForProcess.lumis().size() + lumisFromIndexIntoFile_.size());
0309         std::vector<LuminosityBlockNumber_t>::const_iterator end1 = metadataForProcess.lumis().end();
0310         std::vector<LuminosityBlockNumber_t>::const_iterator end2 = lumisFromIndexIntoFile_.end();
0311         for (std::vector<LuminosityBlockNumber_t>::const_iterator iter1 = metadataForProcess.lumis().begin(),
0312                                                                   iter2 = lumisFromIndexIntoFile_.begin();
0313              iter1 != end1 || iter2 != end2;) {
0314           if (iter1 == end1) {
0315             temp.push_back(*iter2);
0316             ++iter2;
0317             continue;
0318           } else if (iter2 == end2) {
0319             temp.push_back(*iter1);
0320             ++iter1;
0321             continue;
0322           } else if (*iter1 < *iter2) {
0323             temp.push_back(*iter1);
0324             ++iter1;
0325           } else if (*iter1 > *iter2) {
0326             temp.push_back(*iter2);
0327             ++iter2;
0328           } else {
0329             // they must be equal
0330             temp.push_back(*iter1);
0331             ++iter1;
0332             ++iter2;
0333           }
0334         }
0335         metadataForProcess.lumis().swap(temp);
0336       }
0337     }
0338     lumisFromIndexIntoFile_.clear();
0339     gotLumisFromIndexIntoFile_ = false;
0340   }
0341 }  // namespace edm