File indexing completed on 2024-05-11 03:34:12
0001 #ifndef FWCore_Framework_stream_ProducingModuleAdaptor_h
0002 #define FWCore_Framework_stream_ProducingModuleAdaptor_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #include "FWCore/Framework/interface/ProcessBlock.h"
0025 #include "FWCore/Framework/interface/Run.h"
0026 #include "FWCore/Framework/interface/LuminosityBlock.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0029 #include "FWCore/Framework/interface/EventSetup.h"
0030 #include "FWCore/Framework/interface/stream/callAbilities.h"
0031 #include "FWCore/Framework/interface/stream/dummy_helpers.h"
0032 #include "FWCore/Framework/interface/stream/makeGlobal.h"
0033 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0034 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0035
0036
0037 namespace edm {
0038 class ConfigurationDescriptions;
0039 namespace stream {
0040
// ProducingModuleAdaptor: derives from B and drives a stream module type T.
//   T - the user module type; supplies the nested cache types (GlobalCache,
//       InputProcessBlockCache, RunCache, LuminosityBlockCache, RunSummaryCache,
//       LuminosityBlockSummaryCache), the RunContext / LuminosityBlockContext
//       types, the compile-time T::HasAbility flags and the static
//       fillDescriptions / prevalidate functions used below.
//   M - the type the per-stream module objects are handled as (always as M*).
//   B - the base class; its virtual hooks are overridden (all marked final) here.
// The adaptor owns the global, input-process-block, run and lumi caches and
// forwards each framework transition to the matching Call* helper alias.
// Transitions whose T::HasAbility flags are all false compile to empty bodies
// because the work is guarded by `if constexpr`.
0041 template <typename T, typename M, typename B>
0042 class ProducingModuleAdaptor : public B {
0043 public:
// Keeps only the address of iPSet (dereferenced later in setupStreamModules, so
// iPSet must still be alive at that point), sizes every run/lumi cache vector to
// one slot, and builds the global cache and the input-process-block cache via
// the impl:: factory helpers. The two null "dummy" pointers are just passed to
// those helpers; presumably they only select an overload by type -- confirm in
// makeGlobal.h.
0044 ProducingModuleAdaptor(edm::ParameterSet const& iPSet) : m_pset(&iPSet) {
0045 m_runs.resize(1);
0046 m_lumis.resize(1);
0047 m_runSummaries.resize(1);
0048 m_lumiSummaries.resize(1);
0049 typename T::GlobalCache const* dummy = nullptr;
0050 m_global = impl::makeGlobal<T>(iPSet, dummy);
0051 typename T::InputProcessBlockCache const* dummyInputProcessBlockCacheImpl = nullptr;
0052 m_inputProcessBlocks = impl::makeInputProcessBlockCacheImpl(dummyInputProcessBlockCacheImpl);
0053 }
// Copy construction and copy assignment are disabled.
0054 ProducingModuleAdaptor(const ProducingModuleAdaptor&) = delete;
0055 const ProducingModuleAdaptor& operator=(const ProducingModuleAdaptor&) = delete;
// The destructor calls deleteModulesEarly(), which is not declared in this class
// and is therefore inherited from B.
0056 ~ProducingModuleAdaptor() override { this->deleteModulesEarly(); }
0057
// Both static functions forward to the function of the same name on T.
0058 static void fillDescriptions(ConfigurationDescriptions& descriptions) { T::fillDescriptions(descriptions); }
0059 static void prevalidate(ConfigurationDescriptions& descriptions) { T::prevalidate(descriptions); }
0060
// Ability queries. Each one returns a single T::HasAbility flag or the logical
// OR of several; nothing is read from the object's state.
0061 bool wantsProcessBlocks() const noexcept final {
0062 return T::HasAbility::kWatchProcessBlock or T::HasAbility::kBeginProcessBlockProducer or
0063 T::HasAbility::kEndProcessBlockProducer;
0064 }
0065 bool wantsInputProcessBlocks() const noexcept final { return T::HasAbility::kInputProcessBlockCache; }
0066 bool wantsGlobalRuns() const noexcept final {
0067 return T::HasAbility::kRunCache or T::HasAbility::kRunSummaryCache or T::HasAbility::kBeginRunProducer or
0068 T::HasAbility::kEndRunProducer;
0069 }
0070 bool wantsStreamRuns() const noexcept final { return T::HasAbility::kWatchRuns; }
0071
0072 bool wantsGlobalLuminosityBlocks() const noexcept final {
0073 return T::HasAbility::kLuminosityBlockCache or T::HasAbility::kLuminosityBlockSummaryCache or
0074 T::HasAbility::kBeginLuminosityBlockProducer or T::HasAbility::kEndLuminosityBlockProducer;
0075 }
0076 bool wantsStreamLuminosityBlocks() const noexcept final { return T::HasAbility::kWatchLuminosityBlocks; }
0077
0078 bool hasAcquire() const noexcept final { return T::HasAbility::kExternalWork; }
0079
0080 bool hasAccumulator() const noexcept final { return T::HasAbility::kAccumulator; }
0081
0082 private:
// Shorthand for the Call* helper templates instantiated on T. They are not
// defined in this file (presumably in stream/callAbilities.h -- confirm).
0083 using MyGlobal = CallGlobal<T>;
0084 using MyInputProcessBlock = CallInputProcessBlock<T>;
0085 using MyWatchProcessBlock = CallWatchProcessBlock<T>;
0086 using MyBeginProcessBlockProduce = CallBeginProcessBlockProduce<T>;
0087 using MyEndProcessBlockProduce = CallEndProcessBlockProduce<T>;
0088 using MyGlobalRun = CallGlobalRun<T>;
0089 using MyGlobalRunSummary = CallGlobalRunSummary<T>;
0090 using MyBeginRunProduce = CallBeginRunProduce<T>;
0091 using MyEndRunProduce = CallEndRunProduce<T>;
0092 using MyGlobalLuminosityBlock = CallGlobalLuminosityBlock<T>;
0093 using MyGlobalLuminosityBlockSummary = CallGlobalLuminosityBlockSummary<T>;
0094 using MyBeginLuminosityBlockProduce = CallBeginLuminosityBlockProduce<T>;
0095 using MyEndLuminosityBlockProduce = CallEndLuminosityBlockProduce<T>;
0096
// Passes a factory to createStreamModules(). The factory builds one module from
// the stored ParameterSet and the global cache, hands it the global cache
// pointer and the address of the input-process-block cache (together with the
// index the factory was called with), and returns it as M*.
// m_pset is cleared afterwards; a later invocation of the factory would
// dereference a null pointer, so this is effectively a one-shot call.
0097 void setupStreamModules() final {
0098 this->createStreamModules([this](unsigned int iStreamModule) -> M* {
0099 auto tmp = impl::makeStreamModule<T>(*m_pset, m_global.get());
0100 MyGlobal::set(tmp, m_global.get());
0101 MyInputProcessBlock::set(tmp, &m_inputProcessBlocks, iStreamModule);
0102 return tmp;
0103 });
0104 m_pset = nullptr;
0105 }
0106
// Resize the run cache and run summary cache vectors to iNRuns slots.
0107 void preallocRuns(unsigned int iNRuns) final {
0108 m_runs.resize(iNRuns);
0109 m_runSummaries.resize(iNRuns);
0110 }
// Resize the lumi cache and lumi summary cache vectors to iNLumis slots.
0111 void preallocLumis(unsigned int iNLumis) final {
0112 m_lumis.resize(iNLumis);
0113 m_lumiSummaries.resize(iNLumis);
0114 }
// Job begin/end are forwarded to MyGlobal with the global cache pointer.
0115 void doBeginJob() final { MyGlobal::beginJob(m_global.get()); }
0116 void doEndJob() final { MyGlobal::endJob(m_global.get()); }
// Hands iProd the raw pointer of the run cache stored in slot iIndex.
0117 void setupRun(M* iProd, RunIndex iIndex) final { MyGlobalRun::set(iProd, m_runs[iIndex].get()); }
// Forwards a stream's end-of-run summary call. The summary pointer for iRun's
// index is read first; m_runSummaryLock is then held for the whole helper call.
0118 void streamEndRunSummary(M* iProd, edm::Run const& iRun, edm::EventSetup const& iES) final {
0119 auto s = m_runSummaries[iRun.index()].get();
0120 std::lock_guard<decltype(m_runSummaryLock)> guard(m_runSummaryLock);
0121 MyGlobalRunSummary::streamEndRunSummary(iProd, iRun, iES, s);
0122 }
0123
// Hands iProd the raw pointer of the lumi cache stored in slot iIndex.
0124 void setupLuminosityBlock(M* iProd, LuminosityBlockIndex iIndex) final {
0125 MyGlobalLuminosityBlock::set(iProd, m_lumis[iIndex].get());
0126 }
// Lumi counterpart of streamEndRunSummary: read the summary pointer for iLumi's
// index, then hold m_lumiSummaryLock while the helper runs.
0127 void streamEndLuminosityBlockSummary(M* iProd,
0128 edm::LuminosityBlock const& iLumi,
0129 edm::EventSetup const& iES) final {
0130 auto s = m_lumiSummaries[iLumi.index()].get();
0131 std::lock_guard<decltype(m_lumiSummaryLock)> guard(m_lumiSummaryLock);
0132 MyGlobalLuminosityBlockSummary::streamEndLuminosityBlockSummary(iProd, iLumi, iES, s);
0133 }
0134
// Begin-ProcessBlock transition; does nothing unless T watches process blocks or
// produces at begin. The watcher receives the const reference; only when T is a
// begin producer is the producer attached, produce() called on the mutable
// object, and the result committed. The trailing `false` constructor argument
// becomes `true` in doEndProcessBlock; its meaning is defined by ProcessBlock.
0135 void doBeginProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) final {
0136 if constexpr (T::HasAbility::kWatchProcessBlock or T::HasAbility::kBeginProcessBlockProducer) {
0137 ProcessBlock processBlock(pbp, this->moduleDescription(), mcc, false);
0138 ProcessBlock const& cnstProcessBlock = processBlock;
0139 processBlock.setConsumer(this->consumer());
0140 MyWatchProcessBlock::beginProcessBlock(cnstProcessBlock, m_global.get());
0141 if constexpr (T::HasAbility::kBeginProcessBlockProducer) {
0142 processBlock.setProducer(this->producer());
0143 MyBeginProcessBlockProduce::produce(processBlock, m_global.get());
0144 this->commit(processBlock);
0145 }
0146 }
0147 }
0148
// Input-ProcessBlock access; active only when T has an input process block
// cache. The helper gets a const ProcessBlock, the global cache pointer and the
// input-process-block cache member. No producer is attached and nothing is
// committed on this path.
0149 void doAccessInputProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) final {
0150 if constexpr (T::HasAbility::kInputProcessBlockCache) {
0151 ProcessBlock processBlock(pbp, this->moduleDescription(), mcc, false);
0152 ProcessBlock const& cnstProcessBlock = processBlock;
0153 processBlock.setConsumer(this->consumer());
0154 MyInputProcessBlock::accessInputProcessBlock(cnstProcessBlock, m_global.get(), m_inputProcessBlocks);
0155 }
0156 }
0157
// End-ProcessBlock transition; mirrors doBeginProcessBlock with the end helpers
// and with `true` as the last ProcessBlock constructor argument.
0158 void doEndProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) final {
0159 if constexpr (T::HasAbility::kWatchProcessBlock or T::HasAbility::kEndProcessBlockProducer) {
0160 ProcessBlock processBlock(pbp, this->moduleDescription(), mcc, true);
0161 ProcessBlock const& cnstProcessBlock = processBlock;
0162 processBlock.setConsumer(this->consumer());
0163 MyWatchProcessBlock::endProcessBlock(cnstProcessBlock, m_global.get());
0164 if constexpr (T::HasAbility::kEndProcessBlockProducer) {
0165 processBlock.setProducer(this->producer());
0166 MyEndProcessBlockProduce::produce(processBlock, m_global.get());
0167 this->commit(processBlock);
0168 }
0169 }
0170 }
0171
// Global begin-run; active when T has a run cache, a run summary cache, or
// produces at begin run. Order matters:
//   1. build the Run (consumer and producer are both attached unconditionally)
//      and an EventSetup for Transition::BeginRun;
//   2. MyGlobalRun::beginRun receives the slot m_runs[ri] itself (not .get()),
//      presumably so it can fill the slot -- confirm in callAbilities.h;
//   3. only then is the RunContext built from the slot's pointer, and it is
//      passed to the summary helper together with the slot m_runSummaries[ri];
//   4. if T is a begin-run producer, produce() gets the mutable Run and the
//      result is committed.
0172 void doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) final {
0173 if constexpr (T::HasAbility::kRunCache or T::HasAbility::kRunSummaryCache or T::HasAbility::kBeginRunProducer) {
0174 RunPrincipal const& rp = info.principal();
0175 Run r(rp, this->moduleDescription(), mcc, false);
0176 r.setConsumer(this->consumer());
0177 r.setProducer(this->producer());
0178 Run const& cnstR = r;
0179 RunIndex ri = rp.index();
0180 ESParentContext parentC(mcc);
0181 const EventSetup c{info,
0182 static_cast<unsigned int>(Transition::BeginRun),
0183 this->consumer()->esGetTokenIndices(Transition::BeginRun),
0184 parentC};
0185 MyGlobalRun::beginRun(cnstR, c, m_global.get(), m_runs[ri]);
0186 typename T::RunContext rc(m_runs[ri].get(), m_global.get());
0187 MyGlobalRunSummary::beginRun(cnstR, c, &rc, m_runSummaries[ri]);
0188 if constexpr (T::HasAbility::kBeginRunProducer) {
0189 MyBeginRunProduce::produce(r, c, &rc);
0190 this->commit(r);
0191 }
0192 }
0193 }
0194
// Global end-run; active when T has a run cache, a run summary cache, or
// produces at end run. The RunContext is built from the existing cache slot,
// then the calls are made in this order: summary globalEndRun, optional
// produce() + commit (the producer also receives the summary pointer), and
// MyGlobalRun::endRun last. The cache slots are not reset in this function.
0195 void doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) final {
0196 if constexpr (T::HasAbility::kRunCache or T::HasAbility::kRunSummaryCache or T::HasAbility::kEndRunProducer) {
0197 RunPrincipal const& rp = info.principal();
0198 Run r(rp, this->moduleDescription(), mcc, true);
0199 r.setConsumer(this->consumer());
0200 r.setProducer(this->producer());
0201
0202 RunIndex ri = rp.index();
0203 typename T::RunContext rc(m_runs[ri].get(), m_global.get());
0204 ESParentContext parentC(mcc);
0205 const EventSetup c{info,
0206 static_cast<unsigned int>(Transition::EndRun),
0207 this->consumer()->esGetTokenIndices(Transition::EndRun),
0208 parentC};
0209 MyGlobalRunSummary::globalEndRun(r, c, &rc, m_runSummaries[ri].get());
0210 if constexpr (T::HasAbility::kEndRunProducer) {
0211 MyEndRunProduce::produce(r, c, &rc, m_runSummaries[ri].get());
0212 this->commit(r);
0213 }
0214 MyGlobalRun::endRun(r, c, &rc);
0215 }
0216 }
0217
// Global begin-lumi; same shape as doBeginRun one level down. The RunContext
// uses the run cache slot of the lumi's run principal. The lumi cache helper
// receives the slot m_lumis[li] itself; the LuminosityBlockContext is built only
// after that call, from the lumi cache, run cache and global cache pointers.
0218 void doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) final {
0219 if constexpr (T::HasAbility::kLuminosityBlockCache or T::HasAbility::kLuminosityBlockSummaryCache or
0220 T::HasAbility::kBeginLuminosityBlockProducer) {
0221 LuminosityBlockPrincipal const& lbp = info.principal();
0222 LuminosityBlock lb(lbp, this->moduleDescription(), mcc, false);
0223 lb.setConsumer(this->consumer());
0224 lb.setProducer(this->producer());
0225 LuminosityBlock const& cnstLb = lb;
0226 LuminosityBlockIndex li = lbp.index();
0227 RunIndex ri = lbp.runPrincipal().index();
0228 typename T::RunContext rc(m_runs[ri].get(), m_global.get());
0229 ESParentContext parentC(mcc);
0230 const EventSetup c{info,
0231 static_cast<unsigned int>(Transition::BeginLuminosityBlock),
0232 this->consumer()->esGetTokenIndices(Transition::BeginLuminosityBlock),
0233 parentC};
0234
0235 MyGlobalLuminosityBlock::beginLuminosityBlock(cnstLb, c, &rc, m_lumis[li]);
0236 typename T::LuminosityBlockContext lc(m_lumis[li].get(), m_runs[ri].get(), m_global.get());
0237 MyGlobalLuminosityBlockSummary::beginLuminosityBlock(cnstLb, c, &lc, m_lumiSummaries[li]);
0238 if constexpr (T::HasAbility::kBeginLuminosityBlockProducer) {
0239 MyBeginLuminosityBlockProduce::produce(lb, c, &lc);
0240 this->commit(lb);
0241 }
0242 }
0243 }
// Global end-lumi; same call order as doEndRun: summary globalEndLuminosityBlock,
// optional produce() + commit, then MyGlobalLuminosityBlock::endLuminosityBlock.
0244 void doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) final {
0245 if constexpr (T::HasAbility::kLuminosityBlockCache or T::HasAbility::kLuminosityBlockSummaryCache or
0246 T::HasAbility::kEndLuminosityBlockProducer) {
0247 LuminosityBlockPrincipal const& lbp = info.principal();
0248 LuminosityBlock lb(lbp, this->moduleDescription(), mcc, true);
0249 lb.setConsumer(this->consumer());
0250 lb.setProducer(this->producer());
0251
0252 LuminosityBlockIndex li = lbp.index();
0253 RunIndex ri = lbp.runPrincipal().index();
0254 typename T::LuminosityBlockContext lc(m_lumis[li].get(), m_runs[ri].get(), m_global.get());
0255 ESParentContext parentC(mcc);
0256 const EventSetup c{info,
0257 static_cast<unsigned int>(Transition::EndLuminosityBlock),
0258 this->consumer()->esGetTokenIndices(Transition::EndLuminosityBlock),
0259 parentC};
0260 MyGlobalLuminosityBlockSummary::globalEndLuminosityBlock(lb, c, &lc, m_lumiSummaries[li].get());
0261 if constexpr (T::HasAbility::kEndLuminosityBlockProducer) {
0262 MyEndLuminosityBlockProduce::produce(lb, c, &lc, m_lumiSummaries[li].get());
0263 this->commit(lb);
0264 }
0265 MyGlobalLuminosityBlock::endLuminosityBlock(lb, c, &lc);
0266 }
0267 }
0268
// On output-file close, asks the helper to clear the input-process-block caches.
0269 void doRespondToCloseOutputFile() final { MyInputProcessBlock::clearCaches(m_inputProcessBlocks); }
0270
// Brings B's consumer() into this scope so it can be called unqualified below.
0271 using B::consumer;
0272
// Forwards the product registry and process-block helper, together with the
// input-process-block cache and this module's consumer, to the selection helper.
0273 void selectInputProcessBlocks(ProductRegistry const& productRegistry,
0274 ProcessBlockHelperBase const& processBlockHelperBase) final {
0275 MyInputProcessBlock::selectInputProcessBlocks(
0276 m_inputProcessBlocks, productRegistry, processBlockHelperBase, *consumer());
0277 }
0278
0279
// Data members. The concrete holder types are picked by the impl::choose_*
// traits from T's nested cache types (traits not defined in this file).
// m_global / m_inputProcessBlocks: caches built in the constructor.
// m_runs / m_runSummaries: indexed by RunIndex; sized by preallocRuns.
// m_lumis / m_lumiSummaries: indexed by LuminosityBlockIndex; sized by preallocLumis.
// m_runSummaryLock / m_lumiSummaryLock: held during the streamEnd*Summary calls.
// m_pset: the constructor's ParameterSet; set to nullptr by setupStreamModules.
0280 typename impl::choose_unique_ptr<typename T::GlobalCache>::type m_global;
0281 typename impl::choose_unique_ptr<typename T::InputProcessBlockCache>::type m_inputProcessBlocks;
0282 typename impl::choose_shared_vec<typename T::RunCache const>::type m_runs;
0283 typename impl::choose_shared_vec<typename T::LuminosityBlockCache const>::type m_lumis;
0284 typename impl::choose_shared_vec<typename T::RunSummaryCache>::type m_runSummaries;
0285 typename impl::choose_mutex<typename T::RunSummaryCache>::type m_runSummaryLock;
0286 typename impl::choose_shared_vec<typename T::LuminosityBlockSummaryCache>::type m_lumiSummaries;
0287 typename impl::choose_mutex<typename T::LuminosityBlockSummaryCache>::type m_lumiSummaryLock;
0288 ParameterSet const* m_pset;
0289 };
0290 }
0291 }
0292
0293 #endif