Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-28 22:48:22

0001 #ifndef FWCore_Framework_SubProcess_h
0002 #define FWCore_Framework_SubProcess_h
0003 
0004 #include "DataFormats/Provenance/interface/BranchID.h"
0005 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0006 #include "FWCore/Framework/interface/EventSetupProvider.h"
0007 #include "FWCore/Framework/interface/EDConsumerBase.h"
0008 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0009 #include "FWCore/Framework/interface/PrincipalCache.h"
0010 #include "FWCore/Framework/interface/Schedule.h"
0011 #include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h"
0012 #include "FWCore/Framework/interface/ProductSelectorRules.h"
0013 #include "FWCore/Framework/interface/ProductSelector.h"
0014 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0015 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0016 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0017 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0018 #include "FWCore/Utilities/interface/Algorithms.h"
0019 #include "FWCore/Utilities/interface/BranchType.h"
0020 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0021 #include "FWCore/Utilities/interface/propagate_const.h"
0022 
0023 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0024 
0025 #include <map>
0026 #include <memory>
0027 #include <mutex>
0028 #include <set>
0029 #include <vector>
0030 
0031 namespace edm {
0032   class ActivityRegistry;
0033   class BranchDescription;
0034   class BranchIDListHelper;
0035   class EventPrincipal;
0036   class EventSetupImpl;
0037   class ExceptionCollector;
0038   class HistoryAppender;
0039   class LuminosityBlockPrincipal;
0040   class LumiTransitionInfo;
0041   class MergeableRunProductMetadata;
0042   class ModuleTypeResolverMaker;
0043   class ParameterSet;
0044   class Principal;
0045   class ProcessBlockTransitionInfo;
0046   class ProductRegistry;
0047   class PreallocationConfiguration;
0048   class RunTransitionInfo;
0049   class ThinnedAssociationsHelper;
0050   class SubProcessParentageHelper;
0051   class WaitingTaskHolder;
0052 
0053   namespace eventsetup {
0054     class EventSetupsController;
0055   }
0056   class SubProcess : public EDConsumerBase {
0057   public:
0058     SubProcess(ParameterSet& parameterSet,
0059                ParameterSet const& topLevelParameterSet,
0060                std::shared_ptr<ProductRegistry const> parentProductRegistry,
0061                std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
0062                ProcessBlockHelperBase const& parentProcessBlockHelper,
0063                ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
0064                SubProcessParentageHelper const& parentSubProcessParentageHelper,
0065                eventsetup::EventSetupsController& esController,
0066                ActivityRegistry& parentActReg,
0067                ServiceToken const& token,
0068                serviceregistry::ServiceLegacy iLegacy,
0069                PreallocationConfiguration const& preallocConfig,
0070                ProcessContext const* parentProcessContext,
0071                ModuleTypeResolverMaker const* typeResolverMaker);
0072 
0073     ~SubProcess() override;
0074 
0075     SubProcess(SubProcess const&) = delete;             // Disallow copying
0076     SubProcess& operator=(SubProcess const&) = delete;  // Disallow copying
0077     SubProcess(SubProcess&&) = default;                 // Allow Moving
0078     SubProcess& operator=(SubProcess&&) = delete;       // Move not supported by PrincipalCache
0079 
0080     //From OutputModule
0081     void selectProducts(ProductRegistry const& preg,
0082                         ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
0083                         std::map<BranchID, bool>& keepAssociation);
0084 
0085     SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; }
0086 
0087     // Returns the set of modules whose products may be consumed by
0088     // modules in this SubProcess or its child SubProcesses
0089     std::vector<ModuleProcessName> keepOnlyConsumedUnscheduledModules(bool deleteModules);
0090 
0091     void doBeginJob();
0092     void doEndJob(ExceptionCollector&);
0093 
0094     void doEventAsync(WaitingTaskHolder iHolder,
0095                       EventPrincipal const& principal,
0096                       std::vector<std::shared_ptr<const EventSetupImpl>> const*);
0097 
0098     template <typename Traits>
0099     void doBeginProcessBlockAsync(WaitingTaskHolder iHolder,
0100                                   ProcessBlockTransitionInfo const& iTransitionInfo,
0101                                   bool cleaningUpAfterException);
0102 
0103     void doEndProcessBlockAsync(WaitingTaskHolder iHolder,
0104                                 ProcessBlockTransitionInfo const& iTransitionInfo,
0105                                 bool cleaningUpAfterException);
0106 
0107     void doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const& iTransitionInfo);
0108 
0109     void doEndRunAsync(WaitingTaskHolder iHolder,
0110                        RunTransitionInfo const& iTransitionInfo,
0111                        bool cleaningUpAfterException);
0112 
0113     void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const& iTransitionInfo);
0114 
0115     void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
0116                                    LumiTransitionInfo const& iTransitionInfo,
0117                                    bool cleaningUpAfterException);
0118 
0119     void doBeginStream(unsigned int streamID);
0120     void doEndStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
0121     void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const&);
0122 
0123     void doStreamEndRunAsync(WaitingTaskHolder iHolder,
0124                              unsigned int iID,
0125                              RunTransitionInfo const&,
0126                              bool cleaningUpAfterException);
0127 
0128     void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const&);
0129 
0130     void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
0131                                          unsigned int iID,
0132                                          LumiTransitionInfo const&,
0133                                          bool cleaningUpAfterException);
0134 
0135     void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0136 
0137     void clearLumiPrincipal(LuminosityBlockPrincipal&);
0138 
0139     using ProcessBlockType = PrincipalCache::ProcessBlockType;
0140     void writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType);
0141 
0142     void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0143 
0144     void clearRunPrincipal(RunPrincipal&);
0145 
0146     void clearProcessBlockPrincipal(ProcessBlockType);
0147 
0148     // Call closeFile() on all OutputModules.
0149     void closeOutputFiles() {
0150       ServiceRegistry::Operate operate(serviceToken_);
0151       schedule_->closeOutputFiles();
0152       for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
0153     }
0154 
0155     // Call openFiles() on all OutputModules
0156     void openOutputFiles(FileBlock& fb) {
0157       ServiceRegistry::Operate operate(serviceToken_);
0158       schedule_->openOutputFiles(fb);
0159       for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.openOutputFiles(fb); });
0160     }
0161 
0162     void updateBranchIDListHelper(BranchIDLists const&);
0163 
0164     // Call respondToOpenInputFile() on all Modules
0165     void respondToOpenInputFile(FileBlock const& fb);
0166 
0167     // Call respondToCloseInputFile() on all Modules
0168     void respondToCloseInputFile(FileBlock const& fb) {
0169       ServiceRegistry::Operate operate(serviceToken_);
0170       schedule_->respondToCloseInputFile(fb);
0171       for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.respondToCloseInputFile(fb); });
0172     }
0173 
0174     // Call shouldWeCloseFile() on all OutputModules.
0175     bool shouldWeCloseOutput() const {
0176       ServiceRegistry::Operate operate(serviceToken_);
0177       if (schedule_->shouldWeCloseOutput()) {
0178         return true;
0179       }
0180       for (auto const& subProcess : subProcesses_) {
0181         if (subProcess.shouldWeCloseOutput()) {
0182           return true;
0183         }
0184       }
0185       return false;
0186     }
0187 
0188     /// Return a vector allowing const access to all the ModuleDescriptions for this SubProcess
0189 
0190     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0191     /// *** passed to the caller. Do not call delete on these
0192     /// *** pointers!
0193     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0194 
0195     /// Return the number of events this SubProcess has tried to process
0196     /// (inclues both successes and failures, including failures due
0197     /// to exceptions during processing).
0198     int totalEvents() const { return schedule_->totalEvents(); }
0199 
0200     /// Return the number of events which have been passed by one or more trigger paths.
0201     int totalEventsPassed() const {
0202       ServiceRegistry::Operate operate(serviceToken_);
0203       return schedule_->totalEventsPassed();
0204     }
0205 
0206     /// Return the number of events that have not passed any trigger.
0207     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0208     int totalEventsFailed() const {
0209       ServiceRegistry::Operate operate(serviceToken_);
0210       return schedule_->totalEventsFailed();
0211     }
0212 
0213     /// Return the trigger report information on paths,
0214     /// modules-in-path, modules-in-endpath, and modules.
0215     void getTriggerReport(TriggerReport& rep) const {
0216       ServiceRegistry::Operate operate(serviceToken_);
0217       schedule_->getTriggerReport(rep);
0218     }
0219 
0220     /// Return whether each output module has reached its maximum count.
0221     /// If there is a subprocess, get this information from the subprocess.
0222     bool terminate() const {
0223       ServiceRegistry::Operate operate(serviceToken_);
0224       if (schedule_->terminate()) {
0225         return true;
0226       }
0227       for (auto const& subProcess : subProcesses_) {
0228         if (subProcess.terminate()) {
0229           return true;
0230         }
0231       }
0232       return false;
0233     }
0234 
0235     ///  Clear all the counters in the trigger report.
0236     void clearCounters() {
0237       ServiceRegistry::Operate operate(serviceToken_);
0238       schedule_->clearCounters();
0239       for_all(subProcesses_, [](auto& subProcess) { subProcess.clearCounters(); });
0240     }
0241 
0242   private:
0243     void beginJob();
0244     void endJob(ExceptionCollector&);
0245     void processAsync(WaitingTaskHolder iHolder,
0246                       EventPrincipal const& e,
0247                       std::vector<std::shared_ptr<const EventSetupImpl>> const*);
0248 
0249     void propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const;
0250     bool parentProducedProductIsKept(Principal const& parentPrincipal, Principal& principal) const;
0251     void fixBranchIDListsForEDAliases(
0252         std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID);
0253     void keepThisBranch(BranchDescription const& desc,
0254                         std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
0255                         std::set<BranchID>& keptProductsInEvent);
0256 
0257     std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID() {
0258       return droppedBranchIDToKeptBranchID_;
0259     }
0260 
0261     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0262       return get_underlying_safe(branchIDListHelper_);
0263     }
0264     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0265     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0266       return get_underlying_safe(thinnedAssociationsHelper_);
0267     }
0268     std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper() {
0269       return get_underlying_safe(thinnedAssociationsHelper_);
0270     }
0271 
0272     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0273     ServiceToken serviceToken_;
0274     std::shared_ptr<ProductRegistry const> parentPreg_;
0275     std::shared_ptr<ProductRegistry const> preg_;
0276     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0277     edm::propagate_const<std::shared_ptr<SubProcessBlockHelper>> processBlockHelper_;
0278     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0279     edm::propagate_const<std::shared_ptr<SubProcessParentageHelper>> subProcessParentageHelper_;
0280     std::unique_ptr<ExceptionToActionTable const> act_table_;
0281     std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0282     ProcessContext processContext_;
0283     PathsAndConsumesOfModules pathsAndConsumesOfModules_;
0284     //We require 1 history for each Run, Lumi and Stream
0285     // The vectors first hold Stream info, then Lumi then Run
0286     unsigned int historyLumiOffset_;
0287     unsigned int historyRunOffset_;
0288     std::vector<ProcessHistoryRegistry> processHistoryRegistries_;
0289     std::vector<HistoryAppender> historyAppenders_;
0290     PrincipalCache principalCache_;
0291     //vector index is principal's index value
0292     std::vector<std::shared_ptr<RunPrincipal>> inUseRunPrincipals_;
0293     std::vector<std::shared_ptr<LuminosityBlockPrincipal>> inUseLumiPrincipals_;
0294     edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0295     edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0296     std::vector<SubProcess> subProcesses_;
0297     edm::propagate_const<std::unique_ptr<ParameterSet>> processParameterSet_;
0298 
0299     // keptProducts_ are pointers to the BranchDescription objects describing
0300     // the branches we are to write.
0301     //
0302     // We do not own the BranchDescriptions to which we point.
0303     SelectedProductsForBranchType keptProducts_;
0304     ProductSelectorRules productSelectorRules_;
0305     ProductSelector productSelector_;
0306 
0307     //EventSelection
0308     bool wantAllEvents_;
0309     ParameterSetID selector_config_id_;
0310     mutable detail::TriggerResultsBasedEventSelector selectors_;
0311 
0312     // needed because of possible EDAliases.
0313     // filled in only if key and value are different.
0314     std::map<BranchID::value_type, BranchID::value_type> droppedBranchIDToKeptBranchID_;
0315   };
0316 
0317   // free function
0318   std::vector<ParameterSet> popSubProcessVParameterSet(ParameterSet& parameterSet);
0319 }  // namespace edm
0320 #endif