Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-11 03:34:15

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Framework
0004 // Class  :     OutputModuleCore
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Wed, 31 Jul 2013 15:59:19 GMT
0011 //
0012 
0013 // system include files
0014 #include <cassert>
0015 
0016 // user include files
0017 #include "FWCore/Framework/interface/OutputModuleCore.h"
0018 
0019 #include "DataFormats/Common/interface/Handle.h"
0020 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0021 #include "DataFormats/Common/interface/EndPathStatus.h"
0022 #include "DataFormats/Provenance/interface/BranchDescription.h"
0023 #include "DataFormats/Provenance/interface/BranchKey.h"
0024 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0025 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0026 #include "FWCore/Framework/interface/ConstProductRegistry.h"
0027 #include "FWCore/Framework/interface/EventForOutput.h"
0028 #include "FWCore/Framework/interface/EventPrincipal.h"
0029 #include "FWCore/Framework/src/insertSelectedProcesses.h"
0030 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0031 #include "FWCore/Framework/interface/ProcessBlockForOutput.h"
0032 #include "FWCore/Framework/interface/RunForOutput.h"
0033 #include "FWCore/Framework/src/OutputModuleDescription.h"
0034 #include "FWCore/Framework/interface/TriggerNamesService.h"
0035 #include "FWCore/Framework/src/EventSignalsSentry.h"
0036 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0037 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0038 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0039 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0040 #include "FWCore/ServiceRegistry/interface/Service.h"
0041 #include "FWCore/Utilities/interface/DebugMacros.h"
0042 #include "FWCore/Reflection/interface/DictionaryTools.h"
0043 
0044 namespace edm {
0045   namespace core {
0046 
0047     // -------------------------------------------------------
0048     OutputModuleCore::OutputModuleCore(ParameterSet const& pset)
0049         : remainingEvents_(-1),
0050           maxEvents_(-1),
0051           keptProducts_(),
0052           hasNewlyDroppedBranch_(),
0053           process_name_(),
0054           productSelectorRules_(pset, "outputCommands", "OutputModule"),
0055           productSelector_(),
0056           moduleDescription_(),
0057           wantAllEvents_(false),
0058           selectors_(),
0059           selector_config_id_(),
0060           droppedBranchIDToKeptBranchID_(),
0061           branchIDLists_(new BranchIDLists),
0062           origBranchIDLists_(nullptr),
0063           thinnedAssociationsHelper_(new ThinnedAssociationsHelper) {
0064       hasNewlyDroppedBranch_.fill(false);
0065 
0066       Service<service::TriggerNamesService> tns;
0067       process_name_ = tns->getProcessName();
0068 
0069       selectEvents_ = pset.getUntrackedParameterSet("SelectEvents", ParameterSet());
0070 
0071       selectEvents_.registerIt();  // Just in case this PSet is not registered
0072 
0073       selector_config_id_ = selectEvents_.id();
0074 
0075       //need to set wantAllEvents_ in constructor
0076       // we will make the remaining selectors once we know how many streams
0077       selectors_.resize(1);
0078       wantAllEvents_ = detail::configureEventSelector(
0079           selectEvents_, process_name_, getAllTriggerNames(), selectors_[0], consumesCollector());
0080 
0081       //Check if on final path
0082       if (pset.exists("@onFinalPath")) {
0083         onFinalPath_ = pset.getUntrackedParameter<bool>("@onFinalPath");
0084       }
0085       if (onFinalPath_) {
0086         wantAllEvents_ = false;
0087         if (not getAllTriggerNames().empty() and selectors_.front().numberOfTokens() == 0) {
0088           //need to wait for trigger paths to finish
0089           tokensForEndPaths_.push_back(consumes<TriggerResults>(edm::InputTag("TriggerResults", "", process_name_)));
0090         }
0091         //need to wait for EndPaths to finish
0092         for (auto const& n : tns->getEndPaths()) {
0093           if (n == "@finalPath") {
0094             continue;
0095           }
0096           tokensForEndPaths_.push_back(consumes<EndPathStatus>(edm::InputTag(n, "", process_name_)));
0097         }
0098       }
0099     }
0100 
0101     void OutputModuleCore::configure(OutputModuleDescription const& desc) {
0102       remainingEvents_ = maxEvents_ = desc.maxEvents_;
0103       origBranchIDLists_ = desc.branchIDLists_;
0104     }
0105 
0106     void OutputModuleCore::selectProducts(ProductRegistry const& preg,
0107                                           ThinnedAssociationsHelper const& thinnedAssociationsHelper,
0108                                           ProcessBlockHelperBase const& processBlockHelper) {
0109       if (productSelector_.initialized())
0110         return;
0111       productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions());
0112 
0113       // TODO: See if we can collapse keptProducts_ and productSelector_ into a
0114       // single object. See the notes in the header for ProductSelector
0115       // for more information.
0116 
0117       std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
0118       std::vector<BranchDescription const*> associationDescriptions;
0119       std::set<BranchID> keptProductsInEvent;
0120       std::set<std::string> processesWithSelectedMergeableRunProducts;
0121       std::set<std::string> processesWithKeptProcessBlockProducts;
0122 
0123       for (auto const& it : preg.productList()) {
0124         BranchDescription const& desc = it.second;
0125         if (desc.transient()) {
0126           // if the class of the branch is marked transient, output nothing
0127         } else if (!desc.present() && !desc.produced()) {
0128           // else if the branch containing the product has been previously dropped,
0129           // output nothing
0130         } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
0131           associationDescriptions.push_back(&desc);
0132         } else if (selected(desc)) {
0133           keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0134           insertSelectedProcesses(
0135               desc, processesWithSelectedMergeableRunProducts, processesWithKeptProcessBlockProducts);
0136         } else {
0137           // otherwise, output nothing,
0138           // and mark the fact that there is a newly dropped branch of this type.
0139           hasNewlyDroppedBranch_[desc.branchType()] = true;
0140         }
0141       }
0142 
0143       setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts);
0144 
0145       thinnedAssociationsHelper.selectAssociationProducts(
0146           associationDescriptions, keptProductsInEvent, keepAssociation_);
0147 
0148       for (auto association : associationDescriptions) {
0149         if (keepAssociation_[association->branchID()]) {
0150           keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0151         } else {
0152           hasNewlyDroppedBranch_[association->branchType()] = true;
0153         }
0154       }
0155 
0156       // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
0157       ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
0158 
0159       thinnedAssociationsHelper_->updateFromParentProcess(
0160           thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_);
0161       outputProcessBlockHelper_.updateAfterProductSelection(processesWithKeptProcessBlockProducts, processBlockHelper);
0162     }
0163 
0164     void OutputModuleCore::updateBranchIDListsWithKeptAliases() {
0165       if (!droppedBranchIDToKeptBranchID_.empty()) {
0166         // Make a private copy of the BranchIDLists.
0167         *branchIDLists_ = *origBranchIDLists_;
0168         // Check for branches dropped while an EDAlias was kept.
0169         for (BranchIDList& branchIDList : *branchIDLists_) {
0170           for (BranchID::value_type& branchID : branchIDList) {
0171             // Replace BranchID of each dropped branch with that of the kept
0172             // alias, so the alias branch will have the product ID of the original branch.
0173             std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
0174                 droppedBranchIDToKeptBranchID_.find(branchID);
0175             if (iter != droppedBranchIDToKeptBranchID_.end()) {
0176               branchID = iter->second;
0177             }
0178           }
0179         }
0180       }
0181     }
0182 
0183     void OutputModuleCore::keepThisBranch(BranchDescription const& desc,
0184                                           std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
0185                                           std::set<BranchID>& keptProductsInEvent) {
0186       ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
0187 
0188       EDGetToken token;
0189 
0190       std::vector<std::string> missingDictionaries;
0191       if (!checkDictionary(missingDictionaries, desc.className(), desc.unwrappedType())) {
0192         std::string context("Calling OutputModuleCore::keepThisBranch, checking dictionaries for kept types");
0193         throwMissingDictionariesException(missingDictionaries, context);
0194       }
0195 
0196       switch (desc.branchType()) {
0197         case InEvent: {
0198           if (desc.produced()) {
0199             keptProductsInEvent.insert(desc.originalBranchID());
0200           } else {
0201             keptProductsInEvent.insert(desc.branchID());
0202           }
0203           token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0204                            InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
0205           break;
0206         }
0207         case InLumi: {
0208           token = consumes<InLumi>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0209                                    InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0210           break;
0211         }
0212         case InRun: {
0213           token = consumes<InRun>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0214                                   InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0215           break;
0216         }
0217         case InProcess: {
0218           token = consumes<InProcess>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0219                                       InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0220           break;
0221         }
0222         default:
0223           assert(false);
0224           break;
0225       }
0226       // Now put it in the list of selected branches.
0227       keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
0228     }
0229 
0230     OutputModuleCore::~OutputModuleCore() {}
0231 
0232     void OutputModuleCore::doPreallocate_(PreallocationConfiguration const& iPC) {
0233       auto nstreams = iPC.numberOfStreams();
0234       selectors_.resize(nstreams);
0235 
0236       preallocLumis(iPC.numberOfLuminosityBlocks());
0237 
0238       bool seenFirst = false;
0239       for (auto& s : selectors_) {
0240         if (seenFirst) {
0241           detail::configureEventSelector(selectEvents_, process_name_, getAllTriggerNames(), s, consumesCollector());
0242         } else {
0243           seenFirst = true;
0244         }
0245       }
0246     }
0247 
0248     void OutputModuleCore::preallocLumis(unsigned int) {}
0249 
0250     void OutputModuleCore::doBeginJob_() {
0251       this->beginJob();
0252       if (onFinalPath_) {
0253         //this stops prefetching of the data products
0254         resetItemsToGetFrom(edm::InEvent);
0255       }
0256     }
0257 
0258     void OutputModuleCore::doEndJob() { endJob(); }
0259 
0260     void OutputModuleCore::registerProductsAndCallbacks(OutputModuleCore const*, ProductRegistry* reg) {
0261       if (callWhenNewProductsRegistered_) {
0262         reg->callForEachBranch(callWhenNewProductsRegistered_);
0263 
0264         Service<ConstProductRegistry> regService;
0265         regService->watchProductAdditions(callWhenNewProductsRegistered_);
0266       }
0267     }
0268 
0269     bool OutputModuleCore::needToRunSelection() const noexcept { return !wantAllEvents_; }
0270 
0271     std::vector<ProductResolverIndexAndSkipBit> OutputModuleCore::productsUsedBySelection() const noexcept {
0272       std::vector<ProductResolverIndexAndSkipBit> returnValue;
0273       auto const& s = selectors_[0];
0274       auto const n = s.numberOfTokens();
0275       returnValue.reserve(n + tokensForEndPaths_.size());
0276 
0277       for (unsigned int i = 0; i < n; ++i) {
0278         returnValue.emplace_back(uncheckedIndexFrom(s.token(i)));
0279       }
0280       for (auto token : tokensForEndPaths_) {
0281         returnValue.emplace_back(uncheckedIndexFrom(token));
0282       }
0283       return returnValue;
0284     }
0285 
0286     bool OutputModuleCore::prePrefetchSelection(StreamID id,
0287                                                 EventPrincipal const& ep,
0288                                                 ModuleCallingContext const* mcc) {
0289       if (wantAllEvents_)
0290         return true;
0291       auto& s = selectors_[id.value()];
0292       EventForOutput e(ep, moduleDescription_, mcc);
0293       e.setConsumer(this);
0294       return s.wantEvent(e);
0295     }
0296 
0297     bool OutputModuleCore::doEvent_(EventTransitionInfo const& info,
0298                                     ActivityRegistry* act,
0299                                     ModuleCallingContext const* mcc) {
0300       {
0301         EventForOutput e(info, moduleDescription_, mcc);
0302         e.setConsumer(this);
0303         EventSignalsSentry sentry(act, mcc);
0304         write(e);
0305       }
0306       //remainingEvents_ is decremented by inheriting classes
0307       return true;
0308     }
0309 
0310     bool OutputModuleCore::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
0311       RunForOutput r(info, moduleDescription_, mcc, false);
0312       r.setConsumer(this);
0313       doBeginRun_(r);
0314       return true;
0315     }
0316 
0317     bool OutputModuleCore::doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
0318       RunForOutput r(info, moduleDescription_, mcc, true);
0319       r.setConsumer(this);
0320       doEndRun_(r);
0321       return true;
0322     }
0323 
0324     void OutputModuleCore::doWriteProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) {
0325       ProcessBlockForOutput pb(pbp, moduleDescription_, mcc, true);
0326       pb.setConsumer(this);
0327       writeProcessBlock(pb);
0328     }
0329 
0330     void OutputModuleCore::doWriteRun(RunPrincipal const& rp,
0331                                       ModuleCallingContext const* mcc,
0332                                       MergeableRunProductMetadata const* mrpm) {
0333       RunForOutput r(rp, moduleDescription_, mcc, true, mrpm);
0334       r.setConsumer(this);
0335       writeRun(r);
0336     }
0337 
0338     bool OutputModuleCore::doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) {
0339       LuminosityBlockForOutput lb(info, moduleDescription_, mcc, false);
0340       lb.setConsumer(this);
0341       doBeginLuminosityBlock_(lb);
0342       return true;
0343     }
0344 
0345     bool OutputModuleCore::doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) {
0346       LuminosityBlockForOutput lb(info, moduleDescription_, mcc, true);
0347       lb.setConsumer(this);
0348       doEndLuminosityBlock_(lb);
0349 
0350       return true;
0351     }
0352 
0353     void OutputModuleCore::doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp,
0354                                                   ModuleCallingContext const* mcc) {
0355       LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true);
0356       lb.setConsumer(this);
0357       writeLuminosityBlock(lb);
0358     }
0359 
0360     void OutputModuleCore::doOpenFile(FileBlock const& fb) { openFile(fb); }
0361 
0362     void OutputModuleCore::doRespondToOpenInputFile(FileBlock const& fb) {
0363       updateBranchIDListsWithKeptAliases();
0364       doRespondToOpenInputFile_(fb);
0365     }
0366 
0367     void OutputModuleCore::doRespondToCloseInputFile(FileBlock const& fb) { doRespondToCloseInputFile_(fb); }
0368 
0369     void OutputModuleCore::doCloseFile() {
0370       if (isFileOpen()) {
0371         reallyCloseFile();
0372       }
0373     }
0374 
0375     void OutputModuleCore::reallyCloseFile() {}
0376 
0377     BranchIDLists const* OutputModuleCore::branchIDLists() const {
0378       if (!droppedBranchIDToKeptBranchID_.empty()) {
0379         return branchIDLists_.get();
0380       }
0381       return origBranchIDLists_;
0382     }
0383 
0384     ThinnedAssociationsHelper const* OutputModuleCore::thinnedAssociationsHelper() const {
0385       return thinnedAssociationsHelper_.get();
0386     }
0387 
0388     ModuleDescription const& OutputModuleCore::description() const { return moduleDescription_; }
0389 
0390     bool OutputModuleCore::selected(BranchDescription const& desc) const { return productSelector_.selected(desc); }
0391 
0392     void OutputModuleCore::fillDescriptions(ConfigurationDescriptions& descriptions) {
0393       ParameterSetDescription desc;
0394       desc.setUnknown();
0395       descriptions.addDefault(desc);
0396     }
0397 
0398     void OutputModuleCore::fillDescription(ParameterSetDescription& desc,
0399                                            std::vector<std::string> const& defaultOutputCommands) {
0400       ProductSelectorRules::fillDescription(desc, "outputCommands", defaultOutputCommands);
0401       EventSelector::fillDescription(desc);
0402       desc.addOptionalNode(ParameterDescription<bool>("@onFinalPath", false, false), false);
0403     }
0404 
0405     void OutputModuleCore::prevalidate(ConfigurationDescriptions&) {}
0406 
0407     static const std::string kBaseType("OutputModule");
0408     const std::string& OutputModuleCore::baseType() { return kBaseType; }
0409 
0410     void OutputModuleCore::setEventSelectionInfo(
0411         std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
0412         bool anyProductProduced) {
0413       selector_config_id_ = detail::registerProperSelectionInfo(getParameterSet(selector_config_id_),
0414                                                                 description().moduleLabel(),
0415                                                                 outputModulePathPositions,
0416                                                                 anyProductProduced);
0417     }
0418   }  // namespace core
0419 }  // namespace edm