Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-10 02:50:58

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Framework
0004 // Class  :     OutputModuleCore
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Wed, 31 Jul 2013 15:59:19 GMT
0011 //
0012 
0013 // system include files
0014 #include <cassert>
0015 
0016 // user include files
0017 #include "FWCore/Framework/interface/OutputModuleCore.h"
0018 
0019 #include "DataFormats/Common/interface/Handle.h"
0020 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0021 #include "DataFormats/Common/interface/EndPathStatus.h"
0022 #include "DataFormats/Provenance/interface/BranchDescription.h"
0023 #include "DataFormats/Provenance/interface/BranchKey.h"
0024 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0025 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0026 #include "FWCore/Framework/interface/EventForOutput.h"
0027 #include "FWCore/Framework/interface/EventPrincipal.h"
0028 #include "FWCore/Framework/src/insertSelectedProcesses.h"
0029 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0030 #include "FWCore/Framework/interface/ProcessBlockForOutput.h"
0031 #include "FWCore/Framework/interface/RunForOutput.h"
0032 #include "FWCore/Framework/src/OutputModuleDescription.h"
0033 #include "FWCore/Framework/interface/TriggerNamesService.h"
0034 #include "FWCore/Framework/src/EventSignalsSentry.h"
0035 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0036 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0037 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0038 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0039 #include "FWCore/ServiceRegistry/interface/Service.h"
0040 #include "FWCore/Utilities/interface/DebugMacros.h"
0041 #include "FWCore/Reflection/interface/DictionaryTools.h"
0042 
0043 namespace edm {
0044   namespace core {
0045 
0046     // -------------------------------------------------------
0047     OutputModuleCore::OutputModuleCore(ParameterSet const& pset)
0048         : remainingEvents_(-1),
0049           maxEvents_(-1),
0050           keptProducts_(),
0051           hasNewlyDroppedBranch_(),
0052           process_name_(),
0053           productSelectorRules_(pset, "outputCommands", "OutputModule"),
0054           productSelector_(),
0055           moduleDescription_(),
0056           wantAllEvents_(false),
0057           selectors_(),
0058           selector_config_id_(),
0059           droppedBranchIDToKeptBranchID_(),
0060           branchIDLists_(new BranchIDLists),
0061           origBranchIDLists_(nullptr),
0062           thinnedAssociationsHelper_(new ThinnedAssociationsHelper) {
           // Builds the module state from its ParameterSet. Product selection rules come from
           // the "outputCommands" parameter; both event counters start at -1 and
           // origBranchIDLists_ at nullptr until configure() assigns them.
0063       hasNewlyDroppedBranch_.fill(false);
0064
           // The process name is obtained from the TriggerNamesService.
0065       Service<service::TriggerNamesService> tns;
0066       process_name_ = tns->getProcessName();
0067
           // Presumably the second argument is the fallback used when "SelectEvents" is
           // absent — TODO confirm against ParameterSet::getUntrackedParameterSet.
0068       selectEvents_ = pset.getUntrackedParameterSet("SelectEvents", ParameterSet());
0069
0070       selectEvents_.registerIt();  // Just in case this PSet is not registered
0071
0072       selector_config_id_ = selectEvents_.id();
0073
0074       //need to set wantAllEvents_ in constructor
0075       // we will make the remaining selectors once we know how many streams
0076       selectors_.resize(1);
0077       wantAllEvents_ = detail::configureEventSelector(
0078           selectEvents_, process_name_, getAllTriggerNames(), selectors_[0], consumesCollector());
0079
0080       //Check if on final path
           // NOTE(review): onFinalPath_ is not in the initializer list above, so the test below
           // relies on a default declared elsewhere (presumably the header) when the
           // "@onFinalPath" parameter does not exist — confirm.
0081       if (pset.exists("@onFinalPath")) {
0082         onFinalPath_ = pset.getUntrackedParameter<bool>("@onFinalPath");
0083       }
0084       if (onFinalPath_) {
             // On the final path the selection step is always run (wantAllEvents_ is forced off).
0085         wantAllEvents_ = false;
0086         if (not getAllTriggerNames().empty() and selectors_.front().numberOfTokens() == 0) {
0087           //need to wait for trigger paths to finish
0088           tokensForEndPaths_.push_back(consumes<TriggerResults>(edm::InputTag("TriggerResults", "", process_name_)));
0089         }
0090         //need to wait for EndPaths to finish
0091         for (auto const& n : tns->getEndPaths()) {
               // The "@finalPath" entry itself is skipped; every other end path gets a
               // consumes for its EndPathStatus in the current process.
0092           if (n == "@finalPath") {
0093             continue;
0094           }
0095           tokensForEndPaths_.push_back(consumes<EndPathStatus>(edm::InputTag(n, "", process_name_)));
0096         }
0097       }
0098     }
0099 
0100     void OutputModuleCore::configure(OutputModuleDescription const& desc) {
           // Take over the BranchIDLists pointer and the event limit from the description;
           // the remaining-event counter starts out equal to that limit.
0101       origBranchIDLists_ = desc.branchIDLists_;
0102       remainingEvents_ = maxEvents_ = desc.maxEvents_;
0103     }
0104 
0105     void OutputModuleCore::selectProducts(ProductRegistry const& preg,
0106                                           ThinnedAssociationsHelper const& thinnedAssociationsHelper,
0107                                           ProcessBlockHelperBase const& processBlockHelper) {
           // Decides which branches of the registry this module keeps. Kept branches are
           // recorded through keepThisBranch(); rejected ones set hasNewlyDroppedBranch_ for
           // their branch type. Also fills keepAssociation_ and
           // droppedBranchIDToKeptBranchID_, and updates thinnedAssociationsHelper_ and
           // outputProcessBlockHelper_.
           //
           // Only the first call does any work: once productSelector_ is initialized the
           // function returns immediately.
0108       if (productSelector_.initialized())
0109         return;
0110       productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions());
0111
0112       // TODO: See if we can collapse keptProducts_ and productSelector_ into a
0113       // single object. See the notes in the header for ProductSelector
0114       // for more information.
0115
0116       std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
0117       std::vector<BranchDescription const*> associationDescriptions;
0118       std::set<BranchID> keptProductsInEvent;
0119       std::set<std::string> processesWithSelectedMergeableRunProducts;
0120       std::set<std::string> processesWithKeptProcessBlockProducts;
0121
0122       for (auto const& it : preg.productList()) {
0123         BranchDescription const& desc = it.second;
0124         if (desc.transient()) {
0125           // if the class of the branch is marked transient, output nothing
0126         } else if (!desc.present() && !desc.produced()) {
0127           // else if the branch containing the product has been previously dropped,
0128           // output nothing
0129         } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
               // ThinnedAssociation branches are set aside; whether each one is kept is
               // decided after this loop, once keptProductsInEvent is complete.
0130           associationDescriptions.push_back(&desc);
0131         } else if (selected(desc)) {
0132           keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0133           insertSelectedProcesses(
0134               desc, processesWithSelectedMergeableRunProducts, processesWithKeptProcessBlockProducts);
0135         } else {
0136           // otherwise, output nothing,
0137           // and mark the fact that there is a newly dropped branch of this type.
0138           hasNewlyDroppedBranch_[desc.branchType()] = true;
0139         }
0140       }
0141
0142       setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts);
0143
           // Fills keepAssociation_, which is read per BranchID in the loop below.
0144       thinnedAssociationsHelper.selectAssociationProducts(
0145           associationDescriptions, keptProductsInEvent, keepAssociation_);
0146
0147       for (auto association : associationDescriptions) {
0148         if (keepAssociation_[association->branchID()]) {
0149           keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0150         } else {
0151           hasNewlyDroppedBranch_[association->branchType()] = true;
0152         }
0153       }
0154
0155       // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
0156       ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
0157
0158       thinnedAssociationsHelper_->updateFromParentProcess(
0159           thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_);
0160       outputProcessBlockHelper_.updateAfterProductSelection(processesWithKeptProcessBlockProducts, processBlockHelper);
0161     }
0162 
0163     void OutputModuleCore::updateBranchIDListsWithKeptAliases() {
0164       // Nothing to rewrite unless some branch was dropped while its EDAlias was kept.
0165       if (droppedBranchIDToKeptBranchID_.empty()) {
0166         return;
0167       }
0168       // Start from a private copy of the original BranchIDLists.
0169       *branchIDLists_ = *origBranchIDLists_;
0170       for (auto& idList : *branchIDLists_) {
0171         for (auto& id : idList) {
0172           // Put the kept alias' BranchID in place of each dropped branch's ID, so the
0173           // alias branch will have the product ID of the original branch.
0174           auto const found = droppedBranchIDToKeptBranchID_.find(id);
0175           if (found != droppedBranchIDToKeptBranchID_.end()) {
0176             id = found->second;
0177           }
0178         }
0179       }
0180     }
0181 
0182     void OutputModuleCore::keepThisBranch(BranchDescription const& desc,
0183                                           std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
0184                                           std::set<BranchID>& keptProductsInEvent) {
           // Registers `desc` as a kept branch: runs the duplicate-kept-branch check, verifies
           // that dictionaries exist for the product type, declares a consumes for the product
           // in the transition matching its branch type, and appends the
           // (description pointer, token) pair to keptProducts_ for that branch type.
           // For event branches a BranchID is also inserted into keptProductsInEvent.
0185       ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
0186
0187       EDGetToken token;
0188
0189       std::vector<std::string> missingDictionaries;
0190       if (!checkDictionary(missingDictionaries, desc.className(), desc.unwrappedType())) {
0191         std::string context("Calling OutputModuleCore::keepThisBranch, checking dictionaries for kept types");
0192         throwMissingDictionariesException(missingDictionaries, context);
0193       }
0194
0195       switch (desc.branchType()) {
0196         case InEvent: {
               // The recorded ID is originalBranchID() when desc.produced() is true and
               // branchID() otherwise.
0197           if (desc.produced()) {
0198             keptProductsInEvent.insert(desc.originalBranchID());
0199           } else {
0200             keptProductsInEvent.insert(desc.branchID());
0201           }
0202           token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0203                            InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
0204           break;
0205         }
0206         case InLumi: {
0207           token = consumes<InLumi>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0208                                    InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0209           break;
0210         }
0211         case InRun: {
0212           token = consumes<InRun>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0213                                   InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0214           break;
0215         }
0216         case InProcess: {
0217           token = consumes<InProcess>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0218                                       InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
0219           break;
0220         }
0221         default:
               // Any other branch type is treated as a programming error. With assertions
               // compiled out, execution continues with the default-constructed token.
0222           assert(false);
0223           break;
0224       }
0225       // Now put it in the list of selected branches.
0226       keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
0227     }
0228 
0229     OutputModuleCore::~OutputModuleCore() = default;  // nothing beyond member destruction
0230 
0231     void OutputModuleCore::doPreallocate_(PreallocationConfiguration const& iPC) {
0232       // One event selector per stream.
0233       selectors_.resize(iPC.numberOfStreams());
0234
0235       preallocLumis(iPC.numberOfLuminosityBlocks());
0236
0237       // The first selector is left untouched (the constructor already configured
0238       // selectors_[0]); only the remaining ones are configured here.
0239       for (auto it = selectors_.begin(); it != selectors_.end(); ++it) {
0240         if (it == selectors_.begin()) {
0241           continue;
0242         }
0243         detail::configureEventSelector(selectEvents_, process_name_, getAllTriggerNames(), *it, consumesCollector());
0244       }
0245     }
0246 
0247     void OutputModuleCore::preallocLumis(unsigned int) {}  // empty here; the unnamed argument is unused
0248 
0249     void OutputModuleCore::doBeginJob_() {
           // Run the beginJob() hook first, then adjust the consumes information if this
           // module sits on the final path.
0250       beginJob();
0251       if (onFinalPath_) {
0252         // resetting the Event-level items stops prefetching of the data products
0253         resetItemsToGetFrom(InEvent);
0254       }
0255     }
0256 
0257     void OutputModuleCore::doEndJob() { endJob(); }  // forwards to endJob()
0258 
0259     bool OutputModuleCore::needToRunSelection() const { return !wantAllEvents_; }  // true unless all events are wanted
0260 
0261     std::vector<ProductResolverIndexAndSkipBit> OutputModuleCore::productsUsedBySelection() const {
           // Collects the resolver indices of everything the selection step reads: the
           // tokens of the first selector, followed by the end-path tokens.
0262       auto const& firstSelector = selectors_[0];
0263       auto const tokenCount = firstSelector.numberOfTokens();
0264       std::vector<ProductResolverIndexAndSkipBit> indices;
0265       indices.reserve(tokenCount + tokensForEndPaths_.size());
0266
0267       for (unsigned int idx = 0; idx < tokenCount; ++idx) {
0268         indices.emplace_back(uncheckedIndexFrom(firstSelector.token(idx)));
0269       }
0270       for (auto endPathToken : tokensForEndPaths_) {
0271         indices.emplace_back(uncheckedIndexFrom(endPathToken));
0272       }
0273       return indices;
0274     }
0275 
0276     bool OutputModuleCore::prePrefetchSelection(StreamID id,
0277                                                 EventPrincipal const& ep,
0278                                                 ModuleCallingContext const* mcc) {
           // Selection is bypassed entirely when all events are wanted; otherwise the
           // selector belonging to this stream decides.
0279       if (wantAllEvents_)
0280         return true;
0281       auto& streamSelector = selectors_[id.value()];
0282       EventForOutput event(ep, moduleDescription_, mcc);
0283       event.setConsumer(this);
0284       return streamSelector.wantEvent(event);
0285     }
0286 
0287     bool OutputModuleCore::doEvent_(EventTransitionInfo const& info,
0288                                     ActivityRegistry* act,
0289                                     ModuleCallingContext const* mcc) {
           // The inner scope ends the lifetime of the event wrapper and of the signals
           // sentry before the function returns.
0290       {
0291         EventForOutput event(info, moduleDescription_, mcc);
0292         event.setConsumer(this);
0293         EventSignalsSentry signalsSentry(act, mcc);
0294         write(event);
0295       }
0296       // remainingEvents_ is not touched here: inheriting classes decrement it
0297       return true;
0298     }
0299 
0300     bool OutputModuleCore::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
           // Wrap the transition in a RunForOutput that consumes through this module and
           // pass it to the begin-run hook. Always returns true.
           // NOTE(review): the trailing `false` presumably marks "not at end of run" — confirm.
0301       RunForOutput runForOutput(info, moduleDescription_, mcc, false);
0302       runForOutput.setConsumer(this);
0303       doBeginRun_(runForOutput);
0304       return true;
0305     }
0306 
0307     bool OutputModuleCore::doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
           // Wrap the transition in a RunForOutput that consumes through this module and
           // pass it to the end-run hook. Always returns true.
           // NOTE(review): the trailing `true` presumably marks "at end of run" — confirm.
0308       RunForOutput runForOutput(info, moduleDescription_, mcc, true);
0309       runForOutput.setConsumer(this);
0310       doEndRun_(runForOutput);
0311       return true;
0312     }
0313 
0314     void OutputModuleCore::doWriteProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) {
           // Build the output view of the process block, register this module as its
           // consumer, and hand it to writeProcessBlock().
0315       ProcessBlockForOutput blockForOutput(pbp, moduleDescription_, mcc, true);
0316       blockForOutput.setConsumer(this);
0317       writeProcessBlock(blockForOutput);
0318     }
0319 
0320     void OutputModuleCore::doWriteRun(RunPrincipal const& rp,
0321                                       ModuleCallingContext const* mcc,
0322                                       MergeableRunProductMetadata const* mrpm) {
           // Build the output view of the run (the mergeable-run-product metadata pointer
           // is passed through), register this module as its consumer, and hand it to writeRun().
0323       RunForOutput runForOutput(rp, moduleDescription_, mcc, true, mrpm);
0324       runForOutput.setConsumer(this);
0325       writeRun(runForOutput);
0326     }
0327 
0328     bool OutputModuleCore::doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) {
           // Wrap the transition in a LuminosityBlockForOutput that consumes through this
           // module and pass it to the begin-lumi hook. Always returns true.
0329       LuminosityBlockForOutput lumiForOutput(info, moduleDescription_, mcc, false);
0330       lumiForOutput.setConsumer(this);
0331       doBeginLuminosityBlock_(lumiForOutput);
0332       return true;
0333     }
0334 
0335     bool OutputModuleCore::doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) {
           // Wrap the transition in a LuminosityBlockForOutput that consumes through this
           // module and pass it to the end-lumi hook. Always returns true.
0336       LuminosityBlockForOutput lumiForOutput(info, moduleDescription_, mcc, true);
0337       lumiForOutput.setConsumer(this);
0338       doEndLuminosityBlock_(lumiForOutput);
0339
0340       return true;
0341     }
0342 
0343     void OutputModuleCore::doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp,
0344                                                   ModuleCallingContext const* mcc) {
           // Build the output view of the luminosity block, register this module as its
           // consumer, and hand it to writeLuminosityBlock().
0345       LuminosityBlockForOutput lumiForOutput(lbp, moduleDescription_, mcc, true);
0346       lumiForOutput.setConsumer(this);
0347       writeLuminosityBlock(lumiForOutput);
0348     }
0349 
0350     void OutputModuleCore::doOpenFile(FileBlock const& fb) { openFile(fb); }  // forwards to openFile()
0351 
0352     void OutputModuleCore::doRespondToOpenInputFile(FileBlock const& fb) {
           // The BranchIDLists are refreshed before the hook runs, so the hook sees the
           // remapped IDs for kept aliases.
0353       updateBranchIDListsWithKeptAliases();
0354       doRespondToOpenInputFile_(fb);
0355     }
0356 
0357     void OutputModuleCore::doRespondToCloseInputFile(FileBlock const& fb) { doRespondToCloseInputFile_(fb); }  // forwards to the hook
0358 
0359     void OutputModuleCore::doCloseFile() {
           // Closing is skipped when isFileOpen() reports that no file is open.
0360       if (isFileOpen()) {
0361         reallyCloseFile();
0362       }
0363     }
0364 
0365     void OutputModuleCore::reallyCloseFile() {}  // empty implementation in this class
0366 
0367     BranchIDLists const* OutputModuleCore::branchIDLists() const {
           // With no dropped-branch-to-kept-alias remapping the original lists are
           // returned as they are; otherwise the private copy is returned.
0368       if (droppedBranchIDToKeptBranchID_.empty()) {
0369         return origBranchIDLists_;
0370       }
0371       return branchIDLists_.get();
0372     }
0373 
0374     ThinnedAssociationsHelper const* OutputModuleCore::thinnedAssociationsHelper() const {
           // Returns the pointer obtained from thinnedAssociationsHelper_.get() for read-only access.
0375       return thinnedAssociationsHelper_.get();
0376     }
0377 
0378     ModuleDescription const& OutputModuleCore::description() const { return moduleDescription_; }  // read-only accessor
0379 
0380     bool OutputModuleCore::selected(BranchDescription const& desc) const { return productSelector_.selected(desc); }  // asks productSelector_
0381 
0382     void OutputModuleCore::fillDescriptions(ConfigurationDescriptions& descriptions) {
           // Registers, as the default, a description that has been marked unknown.
0383       ParameterSetDescription unknownDescription;
0384       unknownDescription.setUnknown();
0385       descriptions.addDefault(unknownDescription);
0386     }
0387 
0388     void OutputModuleCore::fillDescription(ParameterSetDescription& desc,
0389                                            std::vector<std::string> const& defaultOutputCommands) {
           // Adds to `desc`, in this order: the "outputCommands" product-selection
           // description (given the supplied defaults), the EventSelector description, and
           // an optional bool node named "@onFinalPath".
0390       ProductSelectorRules::fillDescription(desc, "outputCommands", defaultOutputCommands);
0391       EventSelector::fillDescription(desc);
0392       desc.addOptionalNode(ParameterDescription<bool>("@onFinalPath", false, false), false);
0393     }
0394 
0395     void OutputModuleCore::prevalidate(ConfigurationDescriptions&) {}  // empty; the unnamed argument is unused
0396 
0397     static const std::string kBaseType("OutputModule");
         // Returns a reference to the file-local kBaseType string defined just above.
0398     const std::string& OutputModuleCore::baseType() { return kBaseType; }
0399 
0400     void OutputModuleCore::setEventSelectionInfo(
0401         std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
0402         bool anyProductProduced) {
           // Replaces selector_config_id_ with the ID returned by
           // detail::registerProperSelectionInfo. That call receives the ParameterSet looked up
           // from the current ID, this module's label, and both arguments unchanged.
0403       selector_config_id_ = detail::registerProperSelectionInfo(getParameterSet(selector_config_id_),
0404                                                                 description().moduleLabel(),
0405                                                                 outputModulePathPositions,
0406                                                                 anyProductProduced);
0407     }
0408   }  // namespace core
0409 }  // namespace edm