Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-11 03:34:12

0001 #ifndef FWCore_Framework_stream_ProducingModuleAdaptor_h
0002 #define FWCore_Framework_stream_ProducingModuleAdaptor_h
0003 // -*- C++ -*-
0004 //
0005 // Package:     FWCore/Framework
0006 // Class  :     ProducingModuleAdaptor
0007 //
0008 /**\class edm::stream::ProducingModuleAdaptor ProducingModuleAdaptor.h "ProducingModuleAdaptor.h"
0009 
0010  Description: Adapts an edm::stream::EDProducer<> to work with an edm::Worker
0011 
0012  Usage:
0013     <usage>
0014 
0015 */
0016 //
0017 // Original Author:  Chris Jones
0018 //         Created:  Fri, 02 Aug 2013 18:09:18 GMT
0019 //
0020 
0021 // system include files
0022 
0023 // user include files
0024 #include "FWCore/Framework/interface/ProcessBlock.h"
0025 #include "FWCore/Framework/interface/Run.h"
0026 #include "FWCore/Framework/interface/LuminosityBlock.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0029 #include "FWCore/Framework/interface/EventSetup.h"
0030 #include "FWCore/Framework/interface/stream/callAbilities.h"
0031 #include "FWCore/Framework/interface/stream/dummy_helpers.h"
0032 #include "FWCore/Framework/interface/stream/makeGlobal.h"
0033 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0034 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0035 // forward declarations
0036 
0037 namespace edm {
0038   class ConfigurationDescriptions;
0039   namespace stream {
0040 
0041     template <typename T, typename M, typename B>
0042     class ProducingModuleAdaptor : public B {
0043     public:
0044       ProducingModuleAdaptor(edm::ParameterSet const& iPSet) : m_pset(&iPSet) {
0045         m_runs.resize(1);
0046         m_lumis.resize(1);
0047         m_runSummaries.resize(1);
0048         m_lumiSummaries.resize(1);
0049         typename T::GlobalCache const* dummy = nullptr;
0050         m_global = impl::makeGlobal<T>(iPSet, dummy);
0051         typename T::InputProcessBlockCache const* dummyInputProcessBlockCacheImpl = nullptr;
0052         m_inputProcessBlocks = impl::makeInputProcessBlockCacheImpl(dummyInputProcessBlockCacheImpl);
0053       }
0054       ProducingModuleAdaptor(const ProducingModuleAdaptor&) = delete;                   // stop default
0055       const ProducingModuleAdaptor& operator=(const ProducingModuleAdaptor&) = delete;  // stop default
0056       ~ProducingModuleAdaptor() override { this->deleteModulesEarly(); }
0057 
0058       static void fillDescriptions(ConfigurationDescriptions& descriptions) { T::fillDescriptions(descriptions); }
0059       static void prevalidate(ConfigurationDescriptions& descriptions) { T::prevalidate(descriptions); }
0060 
0061       bool wantsProcessBlocks() const noexcept final {
0062         return T::HasAbility::kWatchProcessBlock or T::HasAbility::kBeginProcessBlockProducer or
0063                T::HasAbility::kEndProcessBlockProducer;
0064       }
0065       bool wantsInputProcessBlocks() const noexcept final { return T::HasAbility::kInputProcessBlockCache; }
0066       bool wantsGlobalRuns() const noexcept final {
0067         return T::HasAbility::kRunCache or T::HasAbility::kRunSummaryCache or T::HasAbility::kBeginRunProducer or
0068                T::HasAbility::kEndRunProducer;
0069       }
0070       bool wantsStreamRuns() const noexcept final { return T::HasAbility::kWatchRuns; }
0071 
0072       bool wantsGlobalLuminosityBlocks() const noexcept final {
0073         return T::HasAbility::kLuminosityBlockCache or T::HasAbility::kLuminosityBlockSummaryCache or
0074                T::HasAbility::kBeginLuminosityBlockProducer or T::HasAbility::kEndLuminosityBlockProducer;
0075       }
0076       bool wantsStreamLuminosityBlocks() const noexcept final { return T::HasAbility::kWatchLuminosityBlocks; }
0077 
0078       bool hasAcquire() const noexcept final { return T::HasAbility::kExternalWork; }
0079 
0080       bool hasAccumulator() const noexcept final { return T::HasAbility::kAccumulator; }
0081 
0082     private:
0083       using MyGlobal = CallGlobal<T>;
0084       using MyInputProcessBlock = CallInputProcessBlock<T>;
0085       using MyWatchProcessBlock = CallWatchProcessBlock<T>;
0086       using MyBeginProcessBlockProduce = CallBeginProcessBlockProduce<T>;
0087       using MyEndProcessBlockProduce = CallEndProcessBlockProduce<T>;
0088       using MyGlobalRun = CallGlobalRun<T>;
0089       using MyGlobalRunSummary = CallGlobalRunSummary<T>;
0090       using MyBeginRunProduce = CallBeginRunProduce<T>;
0091       using MyEndRunProduce = CallEndRunProduce<T>;
0092       using MyGlobalLuminosityBlock = CallGlobalLuminosityBlock<T>;
0093       using MyGlobalLuminosityBlockSummary = CallGlobalLuminosityBlockSummary<T>;
0094       using MyBeginLuminosityBlockProduce = CallBeginLuminosityBlockProduce<T>;
0095       using MyEndLuminosityBlockProduce = CallEndLuminosityBlockProduce<T>;
0096 
0097       void setupStreamModules() final {
0098         this->createStreamModules([this](unsigned int iStreamModule) -> M* {
0099           auto tmp = impl::makeStreamModule<T>(*m_pset, m_global.get());
0100           MyGlobal::set(tmp, m_global.get());
0101           MyInputProcessBlock::set(tmp, &m_inputProcessBlocks, iStreamModule);
0102           return tmp;
0103         });
0104         m_pset = nullptr;
0105       }
0106 
0107       void preallocRuns(unsigned int iNRuns) final {
0108         m_runs.resize(iNRuns);
0109         m_runSummaries.resize(iNRuns);
0110       }
0111       void preallocLumis(unsigned int iNLumis) final {
0112         m_lumis.resize(iNLumis);
0113         m_lumiSummaries.resize(iNLumis);
0114       }
0115       void doBeginJob() final { MyGlobal::beginJob(m_global.get()); }
0116       void doEndJob() final { MyGlobal::endJob(m_global.get()); }
0117       void setupRun(M* iProd, RunIndex iIndex) final { MyGlobalRun::set(iProd, m_runs[iIndex].get()); }
0118       void streamEndRunSummary(M* iProd, edm::Run const& iRun, edm::EventSetup const& iES) final {
0119         auto s = m_runSummaries[iRun.index()].get();
0120         std::lock_guard<decltype(m_runSummaryLock)> guard(m_runSummaryLock);
0121         MyGlobalRunSummary::streamEndRunSummary(iProd, iRun, iES, s);
0122       }
0123 
0124       void setupLuminosityBlock(M* iProd, LuminosityBlockIndex iIndex) final {
0125         MyGlobalLuminosityBlock::set(iProd, m_lumis[iIndex].get());
0126       }
0127       void streamEndLuminosityBlockSummary(M* iProd,
0128                                            edm::LuminosityBlock const& iLumi,
0129                                            edm::EventSetup const& iES) final {
0130         auto s = m_lumiSummaries[iLumi.index()].get();
0131         std::lock_guard<decltype(m_lumiSummaryLock)> guard(m_lumiSummaryLock);
0132         MyGlobalLuminosityBlockSummary::streamEndLuminosityBlockSummary(iProd, iLumi, iES, s);
0133       }
0134 
0135       void doBeginProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) final {
0136         if constexpr (T::HasAbility::kWatchProcessBlock or T::HasAbility::kBeginProcessBlockProducer) {
0137           ProcessBlock processBlock(pbp, this->moduleDescription(), mcc, false);
0138           ProcessBlock const& cnstProcessBlock = processBlock;
0139           processBlock.setConsumer(this->consumer());
0140           MyWatchProcessBlock::beginProcessBlock(cnstProcessBlock, m_global.get());
0141           if constexpr (T::HasAbility::kBeginProcessBlockProducer) {
0142             processBlock.setProducer(this->producer());
0143             MyBeginProcessBlockProduce::produce(processBlock, m_global.get());
0144             this->commit(processBlock);
0145           }
0146         }
0147       }
0148 
0149       void doAccessInputProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) final {
0150         if constexpr (T::HasAbility::kInputProcessBlockCache) {
0151           ProcessBlock processBlock(pbp, this->moduleDescription(), mcc, false);
0152           ProcessBlock const& cnstProcessBlock = processBlock;
0153           processBlock.setConsumer(this->consumer());
0154           MyInputProcessBlock::accessInputProcessBlock(cnstProcessBlock, m_global.get(), m_inputProcessBlocks);
0155         }
0156       }
0157 
0158       void doEndProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) final {
0159         if constexpr (T::HasAbility::kWatchProcessBlock or T::HasAbility::kEndProcessBlockProducer) {
0160           ProcessBlock processBlock(pbp, this->moduleDescription(), mcc, true);
0161           ProcessBlock const& cnstProcessBlock = processBlock;
0162           processBlock.setConsumer(this->consumer());
0163           MyWatchProcessBlock::endProcessBlock(cnstProcessBlock, m_global.get());
0164           if constexpr (T::HasAbility::kEndProcessBlockProducer) {
0165             processBlock.setProducer(this->producer());
0166             MyEndProcessBlockProduce::produce(processBlock, m_global.get());
0167             this->commit(processBlock);
0168           }
0169         }
0170       }
0171 
0172       void doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) final {
0173         if constexpr (T::HasAbility::kRunCache or T::HasAbility::kRunSummaryCache or T::HasAbility::kBeginRunProducer) {
0174           RunPrincipal const& rp = info.principal();
0175           Run r(rp, this->moduleDescription(), mcc, false);
0176           r.setConsumer(this->consumer());
0177           r.setProducer(this->producer());
0178           Run const& cnstR = r;
0179           RunIndex ri = rp.index();
0180           ESParentContext parentC(mcc);
0181           const EventSetup c{info,
0182                              static_cast<unsigned int>(Transition::BeginRun),
0183                              this->consumer()->esGetTokenIndices(Transition::BeginRun),
0184                              parentC};
0185           MyGlobalRun::beginRun(cnstR, c, m_global.get(), m_runs[ri]);
0186           typename T::RunContext rc(m_runs[ri].get(), m_global.get());
0187           MyGlobalRunSummary::beginRun(cnstR, c, &rc, m_runSummaries[ri]);
0188           if constexpr (T::HasAbility::kBeginRunProducer) {
0189             MyBeginRunProduce::produce(r, c, &rc);
0190             this->commit(r);
0191           }
0192         }
0193       }
0194 
0195       void doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) final {
0196         if constexpr (T::HasAbility::kRunCache or T::HasAbility::kRunSummaryCache or T::HasAbility::kEndRunProducer) {
0197           RunPrincipal const& rp = info.principal();
0198           Run r(rp, this->moduleDescription(), mcc, true);
0199           r.setConsumer(this->consumer());
0200           r.setProducer(this->producer());
0201 
0202           RunIndex ri = rp.index();
0203           typename T::RunContext rc(m_runs[ri].get(), m_global.get());
0204           ESParentContext parentC(mcc);
0205           const EventSetup c{info,
0206                              static_cast<unsigned int>(Transition::EndRun),
0207                              this->consumer()->esGetTokenIndices(Transition::EndRun),
0208                              parentC};
0209           MyGlobalRunSummary::globalEndRun(r, c, &rc, m_runSummaries[ri].get());
0210           if constexpr (T::HasAbility::kEndRunProducer) {
0211             MyEndRunProduce::produce(r, c, &rc, m_runSummaries[ri].get());
0212             this->commit(r);
0213           }
0214           MyGlobalRun::endRun(r, c, &rc);
0215         }
0216       }
0217 
0218       void doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) final {
0219         if constexpr (T::HasAbility::kLuminosityBlockCache or T::HasAbility::kLuminosityBlockSummaryCache or
0220                       T::HasAbility::kBeginLuminosityBlockProducer) {
0221           LuminosityBlockPrincipal const& lbp = info.principal();
0222           LuminosityBlock lb(lbp, this->moduleDescription(), mcc, false);
0223           lb.setConsumer(this->consumer());
0224           lb.setProducer(this->producer());
0225           LuminosityBlock const& cnstLb = lb;
0226           LuminosityBlockIndex li = lbp.index();
0227           RunIndex ri = lbp.runPrincipal().index();
0228           typename T::RunContext rc(m_runs[ri].get(), m_global.get());
0229           ESParentContext parentC(mcc);
0230           const EventSetup c{info,
0231                              static_cast<unsigned int>(Transition::BeginLuminosityBlock),
0232                              this->consumer()->esGetTokenIndices(Transition::BeginLuminosityBlock),
0233                              parentC};
0234 
0235           MyGlobalLuminosityBlock::beginLuminosityBlock(cnstLb, c, &rc, m_lumis[li]);
0236           typename T::LuminosityBlockContext lc(m_lumis[li].get(), m_runs[ri].get(), m_global.get());
0237           MyGlobalLuminosityBlockSummary::beginLuminosityBlock(cnstLb, c, &lc, m_lumiSummaries[li]);
0238           if constexpr (T::HasAbility::kBeginLuminosityBlockProducer) {
0239             MyBeginLuminosityBlockProduce::produce(lb, c, &lc);
0240             this->commit(lb);
0241           }
0242         }
0243       }
0244       void doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) final {
0245         if constexpr (T::HasAbility::kLuminosityBlockCache or T::HasAbility::kLuminosityBlockSummaryCache or
0246                       T::HasAbility::kEndLuminosityBlockProducer) {
0247           LuminosityBlockPrincipal const& lbp = info.principal();
0248           LuminosityBlock lb(lbp, this->moduleDescription(), mcc, true);
0249           lb.setConsumer(this->consumer());
0250           lb.setProducer(this->producer());
0251 
0252           LuminosityBlockIndex li = lbp.index();
0253           RunIndex ri = lbp.runPrincipal().index();
0254           typename T::LuminosityBlockContext lc(m_lumis[li].get(), m_runs[ri].get(), m_global.get());
0255           ESParentContext parentC(mcc);
0256           const EventSetup c{info,
0257                              static_cast<unsigned int>(Transition::EndLuminosityBlock),
0258                              this->consumer()->esGetTokenIndices(Transition::EndLuminosityBlock),
0259                              parentC};
0260           MyGlobalLuminosityBlockSummary::globalEndLuminosityBlock(lb, c, &lc, m_lumiSummaries[li].get());
0261           if constexpr (T::HasAbility::kEndLuminosityBlockProducer) {
0262             MyEndLuminosityBlockProduce::produce(lb, c, &lc, m_lumiSummaries[li].get());
0263             this->commit(lb);
0264           }
0265           MyGlobalLuminosityBlock::endLuminosityBlock(lb, c, &lc);
0266         }
0267       }
0268 
0269       void doRespondToCloseOutputFile() final { MyInputProcessBlock::clearCaches(m_inputProcessBlocks); }
0270 
0271       using B::consumer;
0272 
0273       void selectInputProcessBlocks(ProductRegistry const& productRegistry,
0274                                     ProcessBlockHelperBase const& processBlockHelperBase) final {
0275         MyInputProcessBlock::selectInputProcessBlocks(
0276             m_inputProcessBlocks, productRegistry, processBlockHelperBase, *consumer());
0277       }
0278 
0279       // ---------- member data --------------------------------
0280       typename impl::choose_unique_ptr<typename T::GlobalCache>::type m_global;
0281       typename impl::choose_unique_ptr<typename T::InputProcessBlockCache>::type m_inputProcessBlocks;
0282       typename impl::choose_shared_vec<typename T::RunCache const>::type m_runs;
0283       typename impl::choose_shared_vec<typename T::LuminosityBlockCache const>::type m_lumis;
0284       typename impl::choose_shared_vec<typename T::RunSummaryCache>::type m_runSummaries;
0285       typename impl::choose_mutex<typename T::RunSummaryCache>::type m_runSummaryLock;
0286       typename impl::choose_shared_vec<typename T::LuminosityBlockSummaryCache>::type m_lumiSummaries;
0287       typename impl::choose_mutex<typename T::LuminosityBlockSummaryCache>::type m_lumiSummaryLock;
0288       ParameterSet const* m_pset;
0289     };
0290   }  // namespace stream
0291 }  // namespace edm
0292 
0293 #endif