Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-31 02:19:24

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Framework
0004 // Class  :     OutputModuleCore
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Wed, 31 Jul 2013 15:59:19 GMT
0011 //
0012 
0013 // system include files
0014 #include <cassert>
0015 
0016 // user include files
0017 #include "FWCore/Framework/interface/OutputModuleCore.h"
0018 
0019 #include "DataFormats/Common/interface/Handle.h"
0020 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0021 #include "DataFormats/Common/interface/EndPathStatus.h"
0022 #include "DataFormats/Provenance/interface/ProductDescription.h"
0023 #include "DataFormats/Provenance/interface/BranchKey.h"
0024 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0025 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0026 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0027 #include "FWCore/Framework/interface/EventForOutput.h"
0028 #include "FWCore/Framework/interface/EventPrincipal.h"
0029 #include "FWCore/Framework/src/insertSelectedProcesses.h"
0030 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0031 #include "FWCore/Framework/interface/ProcessBlockForOutput.h"
0032 #include "FWCore/Framework/interface/RunForOutput.h"
0033 #include "FWCore/Framework/src/OutputModuleDescription.h"
0034 #include "FWCore/Framework/interface/TriggerNamesService.h"
0035 #include "FWCore/Framework/src/EventSignalsSentry.h"
0036 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0037 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0038 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0039 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0040 #include "FWCore/ServiceRegistry/interface/Service.h"
0041 #include "FWCore/Utilities/interface/DebugMacros.h"
0042 #include "FWCore/Reflection/interface/DictionaryTools.h"
0043 
0044 namespace edm {
0045   namespace core {
0046 
0047     // -------------------------------------------------------
0048     OutputModuleCore::OutputModuleCore(ParameterSet const& pset)
0049         : remainingEvents_(-1),
0050           maxEvents_(-1),
0051           keptProducts_(),
0052           hasNewlyDroppedBranch_(),
0053           process_name_(),
0054           productSelectorRules_(pset, "outputCommands", "OutputModule"),
0055           productSelector_(),
0056           moduleDescription_(),
0057           wantAllEvents_(false),
0058           selectors_(),
0059           selector_config_id_(),
0060           droppedBranchIDToKeptBranchID_(),
0061           branchIDLists_(new BranchIDLists),
0062           origBranchIDLists_(nullptr),
0063           thinnedAssociationsHelper_(new ThinnedAssociationsHelper) {
0064       hasNewlyDroppedBranch_.fill(false);
0065 
0066       Service<service::TriggerNamesService> tns;
0067       process_name_ = tns->getProcessName();
0068 
0069       selectEvents_ = pset.getUntrackedParameterSet("SelectEvents", ParameterSet());
0070 
0071       selectEvents_.registerIt();  // Just in case this PSet is not registered
0072 
0073       selector_config_id_ = selectEvents_.id();
0074 
0075       //need to set wantAllEvents_ in constructor
0076       // we will make the remaining selectors once we know how many streams
0077       selectors_.resize(1);
0078       wantAllEvents_ = detail::configureEventSelector(
0079           selectEvents_, process_name_, getAllTriggerNames(), selectors_[0], consumesCollector());
0080 
0081       //Check if on final path
0082       if (pset.exists("@onFinalPath")) {
0083         onFinalPath_ = pset.getUntrackedParameter<bool>("@onFinalPath");
0084       }
0085       if (onFinalPath_) {
0086         wantAllEvents_ = false;
0087         if (not getAllTriggerNames().empty() and selectors_.front().numberOfTokens() == 0) {
0088           //need to wait for trigger paths to finish
0089           tokensForEndPaths_.push_back(consumes<TriggerResults>(edm::InputTag("TriggerResults", "", process_name_)));
0090         }
0091         //need to wait for EndPaths to finish
0092         for (auto const& n : tns->getEndPaths()) {
0093           if (n == "@finalPath") {
0094             continue;
0095           }
0096           tokensForEndPaths_.push_back(consumes<EndPathStatus>(edm::InputTag(n, "", process_name_)));
0097         }
0098       }
0099     }
0100 
0101     void OutputModuleCore::configure(OutputModuleDescription const& desc) {
0102       remainingEvents_ = maxEvents_ = desc.maxEvents_;
0103       origBranchIDLists_ = desc.branchIDLists_;
0104     }
0105 
0106     void OutputModuleCore::selectProducts(ProductRegistry const& preg,
0107                                           ThinnedAssociationsHelper const& thinnedAssociationsHelper,
0108                                           ProcessBlockHelperBase const& processBlockHelper) {
0109       if (productSelector_.initialized())
0110         return;
0111       productSelector_.initialize(productSelectorRules_, preg.allProductDescriptions());
0112 
0113       // TODO: See if we can collapse keptProducts_ and productSelector_ into a
0114       // single object. See the notes in the header for ProductSelector
0115       // for more information.
0116 
0117       std::map<BranchID, ProductDescription const*> trueBranchIDToKeptBranchDesc;
0118       std::vector<ProductDescription const*> associationDescriptions;
0119       std::set<BranchID> keptProductsInEvent;
0120       std::set<std::string> processesWithSelectedMergeableRunProducts;
0121       std::set<std::string> processesWithKeptProcessBlockProducts;
0122 
0123       for (auto const& it : preg.productList()) {
0124         ProductDescription const& desc = it.second;
0125         if (desc.transient()) {
0126           // if the class of the branch is marked transient, output nothing
0127         } else if (!desc.present() && !desc.produced()) {
0128           // else if the branch containing the product has been previously dropped,
0129           // output nothing
0130         } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
0131           associationDescriptions.push_back(&desc);
0132         } else if (selected(desc)) {
0133           keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0134           insertSelectedProcesses(
0135               desc, processesWithSelectedMergeableRunProducts, processesWithKeptProcessBlockProducts);
0136         } else {
0137           // otherwise, output nothing,
0138           // and mark the fact that there is a newly dropped branch of this type.
0139           hasNewlyDroppedBranch_[desc.branchType()] = true;
0140         }
0141       }
0142 
0143       setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts);
0144 
0145       thinnedAssociationsHelper.selectAssociationProducts(
0146           associationDescriptions, keptProductsInEvent, keepAssociation_);
0147 
0148       for (auto association : associationDescriptions) {
0149         if (keepAssociation_[association->branchID()]) {
0150           keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0151         } else {
0152           hasNewlyDroppedBranch_[association->branchType()] = true;
0153         }
0154       }
0155 
0156       // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
0157       ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
0158 
0159       thinnedAssociationsHelper_->updateFromParentProcess(
0160           thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_);
0161       outputProcessBlockHelper_.updateAfterProductSelection(processesWithKeptProcessBlockProducts, processBlockHelper);
0162 
0163       initialRegistry(preg);
0164     }
0165 
0166     void OutputModuleCore::updateBranchIDListsWithKeptAliases() {
0167       if (!droppedBranchIDToKeptBranchID_.empty()) {
0168         // Make a private copy of the BranchIDLists.
0169         *branchIDLists_ = *origBranchIDLists_;
0170         // Check for branches dropped while an EDAlias was kept.
0171         for (BranchIDList& branchIDList : *branchIDLists_) {
0172           for (BranchID::value_type& branchID : branchIDList) {
0173             // Replace BranchID of each dropped branch with that of the kept
0174             // alias, so the alias branch will have the product ID of the original branch.
0175             std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
0176                 droppedBranchIDToKeptBranchID_.find(branchID);
0177             if (iter != droppedBranchIDToKeptBranchID_.end()) {
0178               branchID = iter->second;
0179             }
0180           }
0181         }
0182       }
0183     }
0184 
0185     void OutputModuleCore::keepThisBranch(ProductDescription const& desc,
0186                                           std::map<BranchID, ProductDescription const*>& trueBranchIDToKeptBranchDesc,
0187                                           std::set<BranchID>& keptProductsInEvent) {
0188       ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
0189 
0190       EDGetToken token;
0191 
0192       std::vector<std::string> missingDictionaries;
0193       if (!checkDictionary(missingDictionaries, desc.className(), desc.unwrappedType())) {
0194         std::string context("Calling OutputModuleCore::keepThisBranch, checking dictionaries for kept types");
0195         throwMissingDictionariesException(missingDictionaries, context);
0196       }
0197 
0198       switch (desc.branchType()) {
0199         case InEvent: {
0200           if (desc.produced()) {
0201             keptProductsInEvent.insert(desc.originalBranchID());
0202           } else {
0203             keptProductsInEvent.insert(desc.branchID());
0204           }
0205           token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0206                            InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
0207           break;
0208         }
0209         case InLumi: {
0210           token = consumes<InLumi>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0211                                    InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0212           break;
0213         }
0214         case InRun: {
0215           token = consumes<InRun>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0216                                   InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0217           break;
0218         }
0219         case InProcess: {
0220           token = consumes<InProcess>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0221                                       InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0222           break;
0223         }
0224         default:
0225           assert(false);
0226           break;
0227       }
0228       // Now put it in the list of selected branches.
0229       keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
0230     }
0231 
0232     OutputModuleCore::~OutputModuleCore() {}
0233 
0234     void OutputModuleCore::doPreallocate_(PreallocationConfiguration const& iPC) {
0235       auto nstreams = iPC.numberOfStreams();
0236       selectors_.resize(nstreams);
0237 
0238       preallocLumis(iPC.numberOfLuminosityBlocks());
0239 
0240       bool seenFirst = false;
0241       for (auto& s : selectors_) {
0242         if (seenFirst) {
0243           detail::configureEventSelector(selectEvents_, process_name_, getAllTriggerNames(), s, consumesCollector());
0244         } else {
0245           seenFirst = true;
0246         }
0247       }
0248     }
0249 
0250     void OutputModuleCore::preallocLumis(unsigned int) {}
0251 
0252     void OutputModuleCore::doBeginJob_() {
0253       this->beginJob();
0254       if (onFinalPath_) {
0255         //this stops prefetching of the data products
0256         resetItemsToGetFrom(edm::InEvent);
0257       }
0258     }
0259 
0260     void OutputModuleCore::doEndJob() { endJob(); }
0261 
0262     void OutputModuleCore::registerProductsAndCallbacks(OutputModuleCore const*, SignallingProductRegistry* reg) {
0263       if (callWhenNewProductsRegistered_) {
0264         reg->callForEachBranch(callWhenNewProductsRegistered_);
0265 
0266         reg->watchProductAdditions(callWhenNewProductsRegistered_);
0267       }
0268     }
0269 
0270     bool OutputModuleCore::needToRunSelection() const noexcept { return !wantAllEvents_; }
0271 
0272     std::vector<ProductResolverIndexAndSkipBit> OutputModuleCore::productsUsedBySelection() const noexcept {
0273       std::vector<ProductResolverIndexAndSkipBit> returnValue;
0274       auto const& s = selectors_[0];
0275       auto const n = s.numberOfTokens();
0276       returnValue.reserve(n + tokensForEndPaths_.size());
0277 
0278       for (unsigned int i = 0; i < n; ++i) {
0279         returnValue.emplace_back(uncheckedIndexFrom(s.token(i)));
0280       }
0281       for (auto token : tokensForEndPaths_) {
0282         returnValue.emplace_back(uncheckedIndexFrom(token));
0283       }
0284       return returnValue;
0285     }
0286 
0287     bool OutputModuleCore::prePrefetchSelection(StreamID id,
0288                                                 EventPrincipal const& ep,
0289                                                 ModuleCallingContext const* mcc) {
0290       if (wantAllEvents_)
0291         return true;
0292       auto& s = selectors_[id.value()];
0293       EventForOutput e(ep, moduleDescription_, mcc);
0294       e.setConsumer(this);
0295       return s.wantEvent(e);
0296     }
0297 
0298     bool OutputModuleCore::doEvent_(EventTransitionInfo const& info,
0299                                     ActivityRegistry* act,
0300                                     ModuleCallingContext const* mcc) {
0301       {
0302         EventSignalsSentry sentry(act, mcc);
0303         EventForOutput e(info, moduleDescription_, mcc);
0304         e.setConsumer(this);
0305         write(e);
0306       }
0307       //remainingEvents_ is decremented by inheriting classes
0308       return true;
0309     }
0310 
0311     bool OutputModuleCore::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
0312       RunForOutput r(info, moduleDescription_, mcc, false);
0313       r.setConsumer(this);
0314       doBeginRun_(r);
0315       return true;
0316     }
0317 
0318     bool OutputModuleCore::doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
0319       RunForOutput r(info, moduleDescription_, mcc, true);
0320       r.setConsumer(this);
0321       doEndRun_(r);
0322       return true;
0323     }
0324 
0325     void OutputModuleCore::doWriteProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) {
0326       ProcessBlockForOutput pb(pbp, moduleDescription_, mcc, true);
0327       pb.setConsumer(this);
0328       writeProcessBlock(pb);
0329     }
0330 
0331     void OutputModuleCore::doWriteRun(RunPrincipal const& rp,
0332                                       ModuleCallingContext const* mcc,
0333                                       MergeableRunProductMetadata const* mrpm) {
0334       RunForOutput r(rp, moduleDescription_, mcc, true, mrpm);
0335       r.setConsumer(this);
0336       writeRun(r);
0337     }
0338 
0339     bool OutputModuleCore::doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) {
0340       LuminosityBlockForOutput lb(info, moduleDescription_, mcc, false);
0341       lb.setConsumer(this);
0342       doBeginLuminosityBlock_(lb);
0343       return true;
0344     }
0345 
0346     bool OutputModuleCore::doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) {
0347       LuminosityBlockForOutput lb(info, moduleDescription_, mcc, true);
0348       lb.setConsumer(this);
0349       doEndLuminosityBlock_(lb);
0350 
0351       return true;
0352     }
0353 
0354     void OutputModuleCore::doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp,
0355                                                   ModuleCallingContext const* mcc) {
0356       LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true);
0357       lb.setConsumer(this);
0358       writeLuminosityBlock(lb);
0359     }
0360 
0361     void OutputModuleCore::doOpenFile(FileBlock const& fb) { openFile(fb); }
0362 
0363     void OutputModuleCore::doRespondToOpenInputFile(FileBlock const& fb) {
0364       updateBranchIDListsWithKeptAliases();
0365       doRespondToOpenInputFile_(fb);
0366     }
0367 
0368     void OutputModuleCore::doRespondToCloseInputFile(FileBlock const& fb) { doRespondToCloseInputFile_(fb); }
0369 
0370     void OutputModuleCore::doCloseFile() {
0371       if (isFileOpen()) {
0372         reallyCloseFile();
0373       }
0374     }
0375 
0376     void OutputModuleCore::reallyCloseFile() {}
0377 
0378     BranchIDLists const* OutputModuleCore::branchIDLists() const {
0379       if (!droppedBranchIDToKeptBranchID_.empty()) {
0380         return branchIDLists_.get();
0381       }
0382       return origBranchIDLists_;
0383     }
0384 
0385     ThinnedAssociationsHelper const* OutputModuleCore::thinnedAssociationsHelper() const {
0386       return thinnedAssociationsHelper_.get();
0387     }
0388 
0389     ModuleDescription const& OutputModuleCore::description() const { return moduleDescription_; }
0390 
0391     bool OutputModuleCore::selected(ProductDescription const& desc) const { return productSelector_.selected(desc); }
0392 
0393     void OutputModuleCore::fillDescriptions(ConfigurationDescriptions& descriptions) {
0394       ParameterSetDescription desc;
0395       desc.setUnknown();
0396       descriptions.addDefault(desc);
0397     }
0398 
0399     void OutputModuleCore::fillDescription(ParameterSetDescription& desc,
0400                                            std::vector<std::string> const& defaultOutputCommands) {
0401       ProductSelectorRules::fillDescription(desc, "outputCommands", defaultOutputCommands);
0402       EventSelector::fillDescription(desc);
0403       desc.addOptionalNode(ParameterDescription<bool>("@onFinalPath", false, false), false);
0404     }
0405 
0406     void OutputModuleCore::prevalidate(ConfigurationDescriptions&) {}
0407 
0408     static const std::string kBaseType("OutputModule");
0409     const std::string& OutputModuleCore::baseType() { return kBaseType; }
0410 
0411     void OutputModuleCore::setEventSelectionInfo(
0412         std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
0413         bool anyProductProduced) {
0414       selector_config_id_ = detail::registerProperSelectionInfo(getParameterSet(selector_config_id_),
0415                                                                 description().moduleLabel(),
0416                                                                 outputModulePathPositions,
0417                                                                 anyProductProduced);
0418     }
0419   }  // namespace core
0420 }  // namespace edm