Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:27:00

0001 
0002 /*----------------------------------------------------------------------
0003 
0004 Toy edm::stream::EDProducer modules of
0005 edm::*Cache templates and edm::*Producer classes
0006 for testing purposes only.
0007 
0008 ----------------------------------------------------------------------*/
0009 
0010 #include <atomic>
0011 #include <functional>
0012 #include <iostream>
0013 #include <map>
0014 #include <tuple>
0015 #include <unistd.h>
0016 #include <vector>
0017 #include <chrono>
0018 
0019 #include "FWCore/Framework/interface/CacheHandle.h"
0020 #include "FWCore/Framework/interface/stream/EDProducer.h"
0021 #include "FWCore/Framework/interface/maker/WorkerT.h"
0022 #include "FWCore/Framework/interface/HistoryAppender.h"
0023 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0024 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0025 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0026 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0027 #include "FWCore/Framework/interface/Event.h"
0028 #include "FWCore/Framework/interface/MakerMacros.h"
0029 #include "FWCore/Framework/interface/ProcessBlock.h"
0030 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0031 #include "FWCore/Utilities/interface/EDMException.h"
0032 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0033 
0034 namespace edmtest {
0035   namespace stream {
0036 
0037     // anonymous namespace here causes build warnings
0038     namespace cache {
// Counter bundle used as the global, run and luminosity-block cache by the
// producers in this file. All three counters start at zero.
struct Cache {
  // Kept user-provided (not "= default") so the type stays a non-aggregate.
  Cache() {}
  // The producers only ever see this object through const access, so the
  // counters are mutable to let them be updated anyway.
  mutable std::atomic<unsigned int> value{0};
  mutable std::atomic<unsigned int> run{0};
  mutable std::atomic<unsigned int> lumi{0};
};
0046 
// Accumulator used as the run / luminosity-block summary cache.
struct SummaryCache {
  // Deliberately a plain integer: not atomic and not thread safe.
  unsigned int value{0};
};
0051 
0052       struct TestGlobalCache {
0053         CMS_THREAD_SAFE mutable edm::EDPutTokenT<unsigned int> token_;
0054         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenBegin_;
0055         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenEnd_;
0056         unsigned int trans_{0};
0057         mutable std::atomic<unsigned int> m_count{0};
0058       };
0059 
0060     }  // namespace cache
0061 
0062     using Cache = cache::Cache;
0063     using SummaryCache = cache::SummaryCache;
0064     using TestGlobalCache = cache::TestGlobalCache;
0065 
0066     class GlobalIntProducer : public edm::stream::EDProducer<edm::GlobalCache<Cache>> {
0067     public:
0068       static std::atomic<unsigned int> m_count;
0069       unsigned int trans_;
0070       static std::atomic<unsigned int> cvalue_;
0071 
0072       static std::unique_ptr<Cache> initializeGlobalCache(edm::ParameterSet const&) {
0073         ++m_count;
0074         return std::make_unique<Cache>();
0075       }
0076 
0077       GlobalIntProducer(edm::ParameterSet const& p, const Cache* iGlobal) {
0078         trans_ = p.getParameter<int>("transitions");
0079         cvalue_ = p.getParameter<int>("cachevalue");
0080         produces<unsigned int>();
0081       }
0082 
0083       static void globalBeginJob(Cache* iGlobal) {
0084         ++m_count;
0085         if (iGlobal->value != 0) {
0086           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
0087         }
0088       }
0089 
0090       void produce(edm::Event&, edm::EventSetup const&) override {
0091         ++m_count;
0092         ++((globalCache())->value);
0093       }
0094 
0095       static void globalEndJob(Cache* iGlobal) {
0096         ++m_count;
0097         if (iGlobal->value != cvalue_) {
0098           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be " << cvalue_;
0099         }
0100       }
0101 
0102       ~GlobalIntProducer() {
0103         if (m_count != trans_) {
0104           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0105         }
0106       }
0107     };
0108 
0109     class RunIntProducer : public edm::stream::EDProducer<edm::RunCache<Cache>, edm::stream::WatchRuns> {
0110     public:
0111       static std::atomic<unsigned int> m_count;
0112       unsigned int trans_;
0113       static std::atomic<unsigned int> cvalue_;
0114 
0115       RunIntProducer(edm::ParameterSet const& p) {
0116         trans_ = p.getParameter<int>("transitions");
0117         cvalue_ = p.getParameter<int>("cachevalue");
0118         produces<unsigned int>();
0119       }
0120 
0121       void produce(edm::Event&, edm::EventSetup const&) override {
0122         ++m_count;
0123         ++(runCache()->value);
0124       }
0125 
0126       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0127         ++m_count;
0128         auto pCache = std::make_shared<Cache>();
0129         pCache->run = iRun.runAuxiliary().run();
0130         return pCache;
0131       }
0132 
0133       void beginRun(edm::Run const& iRun, edm::EventSetup const&) override {
0134         if (runCache()->run != iRun.runAuxiliary().run()) {
0135           throw cms::Exception("begin out of sequence") << "beginRun seen before globalBeginRun";
0136         }
0137       }
0138 
0139       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0140         ++m_count;
0141         auto pCache = iContext->run();
0142         if (pCache->run != iRun.runAuxiliary().run()) {
0143           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0144         }
0145         pCache->run = 0;
0146         if (iContext->run()->value != cvalue_) {
0147           throw cms::Exception("cache value") << iContext->run()->value << " but it was supposed to be " << cvalue_;
0148         }
0149       }
0150 
0151       void endRun(edm::Run const& iRun, edm::EventSetup const&) override {
0152         if (runCache()->run != iRun.runAuxiliary().run()) {
0153           throw cms::Exception("end out of sequence") << "globalEndRun seen before endRun";
0154         }
0155       }
0156 
0157       ~RunIntProducer() {
0158         if (m_count != trans_) {
0159           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0160         }
0161       }
0162     };
0163 
0164     class LumiIntProducer
0165         : public edm::stream::EDProducer<edm::LuminosityBlockCache<Cache>, edm::stream::WatchLuminosityBlocks> {
0166     public:
0167       static std::atomic<unsigned int> m_count;
0168       unsigned int trans_;
0169       static std::atomic<unsigned int> cvalue_;
0170 
0171       LumiIntProducer(edm::ParameterSet const& p) {
0172         trans_ = p.getParameter<int>("transitions");
0173         cvalue_ = p.getParameter<int>("cachevalue");
0174         produces<unsigned int>();
0175       }
0176 
0177       void produce(edm::Event&, edm::EventSetup const&) override {
0178         ++m_count;
0179         ++(luminosityBlockCache()->value);
0180       }
0181 
0182       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0183                                                                edm::EventSetup const&,
0184                                                                RunContext const*) {
0185         ++m_count;
0186         auto pCache = std::make_shared<Cache>();
0187         pCache->run = iLB.luminosityBlockAuxiliary().run();
0188         pCache->lumi = iLB.luminosityBlockAuxiliary().luminosityBlock();
0189         return pCache;
0190       }
0191 
0192       void beginLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0193         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0194             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0195           throw cms::Exception("begin out of sequence")
0196               << "beginLuminosityBlock seen before globalBeginLuminosityBlock " << luminosityBlockCache()->run << " "
0197               << iLB.luminosityBlockAuxiliary().run();
0198         }
0199       }
0200 
0201       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0202                                            edm::EventSetup const&,
0203                                            LuminosityBlockContext const* iLBContext) {
0204         ++m_count;
0205         auto pCache = iLBContext->luminosityBlock();
0206         if (pCache->run != iLB.luminosityBlockAuxiliary().run() ||
0207             pCache->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0208           throw cms::Exception("end out of sequence")
0209               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0210               << iLB.luminosityBlock();
0211         }
0212         pCache->run = 0;
0213         pCache->lumi = 0;
0214         if (iLBContext->luminosityBlock()->value != cvalue_) {
0215           throw cms::Exception("cache value")
0216               << iLBContext->luminosityBlock()->value << " but it was supposed to be " << cvalue_;
0217         }
0218       }
0219 
0220       void endLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0221         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0222             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0223           throw cms::Exception("end out of sequence") << "globalEndLuminosityBlock seen before endLuminosityBlock";
0224         }
0225       }
0226 
0227       ~LumiIntProducer() {
0228         if (m_count != trans_) {
0229           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0230         }
0231       }
0232     };
0233 
0234     class RunSummaryIntProducer : public edm::stream::EDProducer<edm::RunCache<Cache>,
0235                                                                  edm::RunSummaryCache<SummaryCache>,
0236                                                                  edm::stream::WatchRuns> {
0237     public:
0238       static std::atomic<unsigned int> m_count;
0239       unsigned int trans_;
0240       static std::atomic<unsigned int> cvalue_;
0241       static std::atomic<bool> globalBeginRunCalled_;
0242       unsigned int valueAccumulatedForStream_ = 0;
0243       bool endRunWasCalled_ = false;
0244 
0245       RunSummaryIntProducer(edm::ParameterSet const& p) {
0246         trans_ = p.getParameter<int>("transitions");
0247         cvalue_ = p.getParameter<int>("cachevalue");
0248         produces<unsigned int>();
0249       }
0250 
0251       void beginRun(edm::Run const&, edm::EventSetup const&) override {
0252         valueAccumulatedForStream_ = 0;
0253         endRunWasCalled_ = false;
0254       }
0255 
0256       void produce(edm::Event&, edm::EventSetup const&) override {
0257         ++m_count;
0258         ++(runCache()->value);
0259         ++valueAccumulatedForStream_;
0260       }
0261 
0262       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0263         ++m_count;
0264         globalBeginRunCalled_ = true;
0265         auto pCache = std::make_shared<Cache>();
0266         ++(pCache->run);
0267         return pCache;
0268       }
0269 
0270       static std::shared_ptr<SummaryCache> globalBeginRunSummary(edm::Run const&,
0271                                                                  edm::EventSetup const&,
0272                                                                  GlobalCache const*) {
0273         ++m_count;
0274         if (!globalBeginRunCalled_) {
0275           throw cms::Exception("begin out of sequence") << "globalBeginRunSummary seen before globalBeginRun";
0276         }
0277         globalBeginRunCalled_ = false;
0278         return std::make_shared<SummaryCache>();
0279       }
0280 
0281       void endRunSummary(edm::Run const&, edm::EventSetup const&, SummaryCache* runSummaryCache) const override {
0282         runSummaryCache->value += valueAccumulatedForStream_;
0283         if (!endRunWasCalled_) {
0284           throw cms::Exception("end out of sequence") << "endRunSummary seen before endRun";
0285         }
0286       }
0287 
0288       static void globalEndRunSummary(edm::Run const&,
0289                                       edm::EventSetup const&,
0290                                       RunContext const*,
0291                                       SummaryCache* runSummaryCache) {
0292         ++m_count;
0293         if (runSummaryCache->value != cvalue_) {
0294           throw cms::Exception("unexpectedValue")
0295               << "run summary cache value = " << runSummaryCache->value << " but it was supposed to be " << cvalue_;
0296         }
0297       }
0298 
0299       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0300         ++m_count;
0301         auto pCache = iContext->run();
0302         if (pCache->value != cvalue_) {
0303           throw cms::Exception("unExpectedValue")
0304               << "run cache value " << pCache->value << " but it was supposed to be " << cvalue_;
0305         }
0306         if (pCache->run != 1) {
0307           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0308         }
0309       }
0310 
0311       void endRun(edm::Run const&, edm::EventSetup const&) override { endRunWasCalled_ = true; }
0312 
0313       ~RunSummaryIntProducer() {
0314         if (m_count != trans_) {
0315           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0316         }
0317       }
0318     };
0319 
0320     class LumiSummaryIntProducer : public edm::stream::EDProducer<edm::LuminosityBlockCache<Cache>,
0321                                                                   edm::LuminosityBlockSummaryCache<SummaryCache>,
0322                                                                   edm::stream::WatchLuminosityBlocks> {
0323     public:
0324       static std::atomic<unsigned int> m_count;
0325       unsigned int trans_;
0326       static std::atomic<unsigned int> cvalue_;
0327       static std::atomic<bool> globalBeginLumiCalled_;
0328       unsigned int valueAccumulatedForStream_ = 0;
0329       bool endLumiWasCalled_ = false;
0330 
0331       LumiSummaryIntProducer(edm::ParameterSet const& p) {
0332         trans_ = p.getParameter<int>("transitions");
0333         cvalue_ = p.getParameter<int>("cachevalue");
0334         produces<unsigned int>();
0335       }
0336 
0337       void produce(edm::Event&, edm::EventSetup const&) override {
0338         ++m_count;
0339         ++(luminosityBlockCache()->value);
0340         ++valueAccumulatedForStream_;
0341       }
0342 
0343       void beginLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0344         valueAccumulatedForStream_ = 0;
0345         endLumiWasCalled_ = false;
0346       }
0347 
0348       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0349                                                                edm::EventSetup const&,
0350                                                                RunContext const*) {
0351         ++m_count;
0352         globalBeginLumiCalled_ = true;
0353         auto pCache = std::make_shared<Cache>();
0354         ++(pCache->lumi);
0355         return pCache;
0356       }
0357 
0358       static std::shared_ptr<SummaryCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0359                                                                              edm::EventSetup const&,
0360                                                                              LuminosityBlockContext const*) {
0361         ++m_count;
0362         if (!globalBeginLumiCalled_) {
0363           throw cms::Exception("begin out of sequence")
0364               << "globalBeginLuminosityBlockSummary seen before globalBeginLuminosityBlock";
0365         }
0366         globalBeginLumiCalled_ = false;
0367         return std::make_shared<SummaryCache>();
0368       }
0369 
0370       void endLuminosityBlockSummary(edm::LuminosityBlock const&,
0371                                      edm::EventSetup const&,
0372                                      SummaryCache* lumiSummaryCache) const override {
0373         lumiSummaryCache->value += valueAccumulatedForStream_;
0374         if (!endLumiWasCalled_) {
0375           throw cms::Exception("end out of sequence") << "endLuminosityBlockSummary seen before endLuminosityBlock";
0376         }
0377       }
0378 
0379       static void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0380                                                   edm::EventSetup const&,
0381                                                   LuminosityBlockContext const* iLBContext,
0382                                                   SummaryCache* lumiSummaryCache) {
0383         ++m_count;
0384         if (lumiSummaryCache->value != cvalue_) {
0385           throw cms::Exception("unexpectedValue")
0386               << "lumi summary cache value = " << lumiSummaryCache->value << " but it was supposed to be " << cvalue_;
0387         }
0388         auto pCache = iLBContext->luminosityBlock();
0389         // Add one so globalEndLuminosityBlock can check this function was called first
0390         ++pCache->value;
0391       }
0392 
0393       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0394                                            edm::EventSetup const&,
0395                                            LuminosityBlockContext const* iLBContext) {
0396         ++m_count;
0397         auto pCache = iLBContext->luminosityBlock();
0398         if (pCache->value != cvalue_ + 1) {
0399           throw cms::Exception("unexpectedValue")
0400               << "lumi cache value " << pCache->value << " but it was supposed to be " << cvalue_ + 1;
0401         }
0402         if (pCache->lumi != 1) {
0403           throw cms::Exception("end out of sequence")
0404               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0405               << iLB.luminosityBlock();
0406         }
0407       }
0408 
0409       void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0410         endLumiWasCalled_ = true;
0411       }
0412 
0413       ~LumiSummaryIntProducer() {
0414         if (m_count != trans_) {
0415           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0416         }
0417       }
0418     };
0419 
0420     class ProcessBlockIntProducer
0421         : public edm::stream::EDProducer<edm::WatchProcessBlock, edm::GlobalCache<TestGlobalCache>> {
0422     public:
0423       explicit ProcessBlockIntProducer(edm::ParameterSet const& pset, TestGlobalCache const* testGlobalCache) {
0424         {
0425           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0426           if (not tag.label().empty()) {
0427             testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0428           }
0429         }
0430         {
0431           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0432           if (not tag.label().empty()) {
0433             testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0434           }
0435         }
0436       }
0437 
0438       static std::unique_ptr<TestGlobalCache> initializeGlobalCache(edm::ParameterSet const& pset) {
0439         auto testGlobalCache = std::make_unique<TestGlobalCache>();
0440         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0441         return testGlobalCache;
0442       }
0443 
0444       static void beginProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCache* testGlobalCache) {
0445         if (testGlobalCache->m_count != 0) {
0446           throw cms::Exception("transitions") << "ProcessBlockIntProducer::begin transitions "
0447                                               << testGlobalCache->m_count << " but it was supposed to be " << 0;
0448         }
0449         ++testGlobalCache->m_count;
0450 
0451         const unsigned int valueToGet = 51;
0452         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0453           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0454             throw cms::Exception("BadValue")
0455                 << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0456           }
0457         }
0458       }
0459 
0460       void produce(edm::Event&, edm::EventSetup const&) override {
0461         TestGlobalCache const* testGlobalCache = globalCache();
0462         if (testGlobalCache->m_count < 1u) {
0463           throw cms::Exception("out of sequence") << "produce before beginProcessBlock " << testGlobalCache->m_count;
0464         }
0465         ++testGlobalCache->m_count;
0466       }
0467 
0468       static void endProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCache* testGlobalCache) {
0469         ++testGlobalCache->m_count;
0470         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0471           throw cms::Exception("transitions") << "ProcessBlockIntProducer::end transitions " << testGlobalCache->m_count
0472                                               << " but it was supposed to be " << testGlobalCache->trans_;
0473         }
0474         {
0475           const unsigned int valueToGet = 51;
0476           if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0477             if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0478               throw cms::Exception("BadValue")
0479                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0480             }
0481           }
0482         }
0483         {
0484           const unsigned int valueToGet = 61;
0485           if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0486             if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToGet) {
0487               throw cms::Exception("BadValue")
0488                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0489             }
0490           }
0491         }
0492       }
0493 
0494       static void globalEndJob(TestGlobalCache* testGlobalCache) {
0495         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0496           throw cms::Exception("transitions")
0497               << "TestBeginProcessBlockProducer transitions " << testGlobalCache->m_count
0498               << " but it was supposed to be " << testGlobalCache->trans_;
0499         }
0500       }
0501 
0502       ~ProcessBlockIntProducer() {
0503         TestGlobalCache const* testGlobalCache = globalCache();
0504         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0505           throw cms::Exception("transitions") << "ProcessBlockIntProducer transitions " << testGlobalCache->m_count
0506                                               << " but it was supposed to be " << testGlobalCache->trans_;
0507         }
0508       }
0509     };
0510 
0511     class TestBeginProcessBlockProducer
0512         : public edm::stream::EDProducer<edm::BeginProcessBlockProducer, edm::GlobalCache<TestGlobalCache>> {
0513     public:
0514       explicit TestBeginProcessBlockProducer(edm::ParameterSet const& pset, TestGlobalCache const* testGlobalCache) {
0515         testGlobalCache->token_ = produces<unsigned int, edm::Transition::BeginProcessBlock>("begin");
0516 
0517         auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0518         if (not tag.label().empty()) {
0519           testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0520         }
0521       }
0522 
0523       static std::unique_ptr<TestGlobalCache> initializeGlobalCache(edm::ParameterSet const& pset) {
0524         auto testGlobalCache = std::make_unique<TestGlobalCache>();
0525         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0526         return testGlobalCache;
0527       }
0528 
0529       static void beginProcessBlockProduce(edm::ProcessBlock& processBlock, TestGlobalCache const* testGlobalCache) {
0530         if (testGlobalCache->m_count != 0) {
0531           throw cms::Exception("transitions") << "TestBeginProcessBlockProducer transitions "
0532                                               << testGlobalCache->m_count << " but it was supposed to be " << 0;
0533         }
0534         ++testGlobalCache->m_count;
0535 
0536         const unsigned int valueToPutAndGet = 51;
0537         processBlock.emplace(testGlobalCache->token_, valueToPutAndGet);
0538 
0539         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0540           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToPutAndGet) {
0541             throw cms::Exception("BadValue")
0542                 << "expected " << valueToPutAndGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0543           }
0544         }
0545       }
0546 
0547       void produce(edm::Event&, edm::EventSetup const&) override {
0548         TestGlobalCache const* testGlobalCache = globalCache();
0549         if (testGlobalCache->m_count < 1u) {
0550           throw cms::Exception("out of sequence")
0551               << "produce before beginProcessBlockProduce " << testGlobalCache->m_count;
0552         }
0553         ++testGlobalCache->m_count;
0554       }
0555 
0556       static void globalEndJob(TestGlobalCache* testGlobalCache) {
0557         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0558           throw cms::Exception("transitions")
0559               << "TestBeginProcessBlockProducer transitions " << testGlobalCache->m_count
0560               << " but it was supposed to be " << testGlobalCache->trans_;
0561         }
0562       }
0563 
0564       ~TestBeginProcessBlockProducer() {
0565         TestGlobalCache const* testGlobalCache = globalCache();
0566         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0567           throw cms::Exception("transitions")
0568               << "TestBeginProcessBlockProducer transitions " << testGlobalCache->m_count
0569               << " but it was supposed to be " << testGlobalCache->trans_;
0570         }
0571       }
0572     };
0573 
0574     class TestEndProcessBlockProducer
0575         : public edm::stream::EDProducer<edm::EndProcessBlockProducer, edm::GlobalCache<TestGlobalCache>> {
0576     public:
0577       explicit TestEndProcessBlockProducer(edm::ParameterSet const& pset, TestGlobalCache const* testGlobalCache) {
0578         testGlobalCache->token_ = produces<unsigned int, edm::Transition::EndProcessBlock>("end");
0579 
0580         auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0581         if (not tag.label().empty()) {
0582           testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0583         }
0584       }
0585 
0586       static std::unique_ptr<TestGlobalCache> initializeGlobalCache(edm::ParameterSet const& pset) {
0587         auto testGlobalCache = std::make_unique<TestGlobalCache>();
0588         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0589         return testGlobalCache;
0590       }
0591 
0592       void produce(edm::Event&, edm::EventSetup const&) override {
0593         TestGlobalCache const* testGlobalCache = globalCache();
0594         ++testGlobalCache->m_count;
0595       }
0596 
0597       static void endProcessBlockProduce(edm::ProcessBlock& processBlock, TestGlobalCache const* testGlobalCache) {
0598         ++testGlobalCache->m_count;
0599         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0600           throw cms::Exception("transitions") << "TestEndProcessBlockProducer transitions " << testGlobalCache->m_count
0601                                               << " but it was supposed to be " << testGlobalCache->trans_;
0602         }
0603         const unsigned int valueToPutAndGet = 61;
0604         processBlock.emplace(testGlobalCache->token_, valueToPutAndGet);
0605 
0606         if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0607           if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToPutAndGet) {
0608             throw cms::Exception("BadValue")
0609                 << "expected " << valueToPutAndGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0610           }
0611         }
0612       }
0613 
0614       static void globalEndJob(TestGlobalCache* testGlobalCache) {
0615         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0616           throw cms::Exception("transitions")
0617               << "TestBeginProcessBlockProducer transitions " << testGlobalCache->m_count
0618               << " but it was supposed to be " << testGlobalCache->trans_;
0619         }
0620       }
0621 
0622       ~TestEndProcessBlockProducer() {
0623         TestGlobalCache const* testGlobalCache = globalCache();
0624         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0625           throw cms::Exception("transitions") << "~TestEndProcessBlockProducer transitions " << testGlobalCache->m_count
0626                                               << " but it was supposed to be " << testGlobalCache->trans_;
0627         }
0628       }
0629     };
0630 
0631     class ProcessBlockIntProducerNoGlobalCache : public edm::stream::EDProducer<edm::WatchProcessBlock> {
0632     public:
0633       explicit ProcessBlockIntProducerNoGlobalCache(edm::ParameterSet const&) {}
0634 
0635       static void beginProcessBlock(edm::ProcessBlock const&) {}
0636 
0637       void produce(edm::Event&, edm::EventSetup const&) override {}
0638 
0639       static void endProcessBlock(edm::ProcessBlock const&) {}
0640     };
0641 
0642     class TestBeginProcessBlockProducerNoGlobalCache : public edm::stream::EDProducer<edm::BeginProcessBlockProducer> {
0643     public:
0644       explicit TestBeginProcessBlockProducerNoGlobalCache(edm::ParameterSet const&) {}
0645 
0646       static void beginProcessBlockProduce(edm::ProcessBlock&) {}
0647 
0648       void produce(edm::Event&, edm::EventSetup const&) override {}
0649     };
0650 
0651     class TestEndProcessBlockProducerNoGlobalCache : public edm::stream::EDProducer<edm::EndProcessBlockProducer> {
0652     public:
0653       explicit TestEndProcessBlockProducerNoGlobalCache(edm::ParameterSet const&) {}
0654 
0655       void produce(edm::Event&, edm::EventSetup const&) override {}
0656 
0657       static void endProcessBlockProduce(edm::ProcessBlock&) {}
0658     };
0659 
0660     class TestBeginRunProducer : public edm::stream::EDProducer<edm::RunCache<bool>, edm::BeginRunProducer> {
0661     public:
0662       static std::atomic<unsigned int> m_count;
0663       unsigned int trans_;
0664       static std::atomic<unsigned int> cvalue_;
0665       static std::atomic<bool> gbr;
0666       static std::atomic<bool> ger;
0667       static std::atomic<bool> gbrp;
0668 
0669       TestBeginRunProducer(edm::ParameterSet const& p) {
0670         trans_ = p.getParameter<int>("transitions");
0671         cvalue_ = p.getParameter<int>("cachevalue");
0672         produces<unsigned int>();
0673         produces<unsigned int, edm::Transition::BeginRun>("a");
0674       }
0675 
0676       static std::shared_ptr<bool> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0677         gbr = true;
0678         ger = false;
0679         gbrp = false;
0680         return std::shared_ptr<bool>{};
0681       }
0682 
0683       void produce(edm::Event&, edm::EventSetup const&) override {
0684         if (!gbrp) {
0685           throw cms::Exception("out of sequence") << "produce before globalBeginRunProduce";
0686         }
0687       }
0688 
0689       static void globalBeginRunProduce(edm::Run& iRun, edm::EventSetup const&, RunContext const*) {
0690         gbrp = true;
0691         ++m_count;
0692         if (!gbr) {
0693           throw cms::Exception("begin out of sequence") << "globalBeginRunProduce seen before globalBeginRun";
0694         }
0695       }
0696 
0697       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0698         if (!gbr) {
0699           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0700         }
0701         gbr = false;
0702         ger = true;
0703       }
0704 
0705       ~TestBeginRunProducer() {
0706         if (m_count != trans_) {
0707           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0708         }
0709       }
0710     };
0711 
0712     class TestEndRunProducer : public edm::stream::EDProducer<edm::RunCache<bool>, edm::EndRunProducer> {
0713     public:
0714       static std::atomic<unsigned int> m_count;
0715       unsigned int trans_;
0716       static std::atomic<unsigned int> cvalue_;
0717       static std::atomic<bool> gbr;
0718       static std::atomic<bool> ger;
0719       static std::atomic<bool> p;
0720 
0721       static std::shared_ptr<bool> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0722         gbr = true;
0723         ger = false;
0724         p = false;
0725         return std::shared_ptr<bool>{};
0726       }
0727 
0728       TestEndRunProducer(edm::ParameterSet const& p) {
0729         trans_ = p.getParameter<int>("transitions");
0730         cvalue_ = p.getParameter<int>("cachevalue");
0731         produces<unsigned int>();
0732         produces<unsigned int, edm::Transition::EndRun>("a");
0733       }
0734 
0735       void produce(edm::Event&, edm::EventSetup const&) override { p = true; }
0736 
0737       static void globalEndRunProduce(edm::Run& iRun, edm::EventSetup const&, RunContext const*) {
0738         ++m_count;
0739         if (!p) {
0740           throw cms::Exception("out of sequence") << "globalEndRunProduce seen before produce";
0741         }
0742         if (ger) {
0743           throw cms::Exception("out of sequence") << "globalEndRun seen before globalEndRunProduce";
0744         }
0745       }
0746 
0747       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0748         if (!gbr) {
0749           throw cms::Exception("out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0750         }
0751         gbr = false;
0752         ger = true;
0753       }
0754 
0755       ~TestEndRunProducer() {
0756         if (m_count != trans_) {
0757           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0758         }
0759       }
0760     };
0761 
0762     class TestBeginLumiBlockProducer
0763         : public edm::stream::EDProducer<edm::LuminosityBlockCache<bool>, edm::BeginLuminosityBlockProducer> {
0764     public:
0765       static std::atomic<unsigned int> m_count;
0766       unsigned int trans_;
0767       static std::atomic<unsigned int> cvalue_;
0768       static std::atomic<bool> gbl;
0769       static std::atomic<bool> gel;
0770       static std::atomic<bool> gblp;
0771 
0772       TestBeginLumiBlockProducer(edm::ParameterSet const& p) {
0773         trans_ = p.getParameter<int>("transitions");
0774         cvalue_ = p.getParameter<int>("cachevalue");
0775         produces<unsigned int>();
0776         produces<unsigned int, edm::Transition::BeginLuminosityBlock>("a");
0777       }
0778 
0779       void produce(edm::Event&, edm::EventSetup const&) override {
0780         if (!gblp) {
0781           throw cms::Exception("begin out of sequence") << "produce seen before globalBeginLumiBlockProduce";
0782         }
0783       }
0784 
0785       static void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&,
0786                                                     edm::EventSetup const&,
0787                                                     LuminosityBlockContext const*) {
0788         ++m_count;
0789         if (!gbl) {
0790           throw cms::Exception("begin out of sequence")
0791               << "globalBeginLumiBlockProduce seen before globalBeginLumiBlock";
0792         }
0793         gblp = true;
0794       }
0795 
0796       static std::shared_ptr<bool> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0797                                                               edm::EventSetup const&,
0798                                                               RunContext const*) {
0799         gbl = true;
0800         gel = false;
0801         gblp = false;
0802         return std::shared_ptr<bool>();
0803       }
0804 
0805       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0806                                            edm::EventSetup const&,
0807                                            LuminosityBlockContext const* iLBContext) {
0808         if (!gbl) {
0809           throw cms::Exception("end out of sequence")
0810               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0811               << iLB.luminosityBlock();
0812         }
0813         gel = true;
0814         gbl = false;
0815       }
0816 
0817       ~TestBeginLumiBlockProducer() {
0818         if (m_count != trans_) {
0819           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0820         }
0821       }
0822     };
0823 
0824     class TestEndLumiBlockProducer
0825         : public edm::stream::EDProducer<edm::LuminosityBlockCache<bool>, edm::EndLuminosityBlockProducer> {
0826     public:
0827       static std::atomic<unsigned int> m_count;
0828       unsigned int trans_;
0829       static std::atomic<unsigned int> cvalue_;
0830       static std::atomic<bool> gbl;
0831       static std::atomic<bool> gel;
0832       static std::atomic<bool> p;
0833 
0834       TestEndLumiBlockProducer(edm::ParameterSet const& p) {
0835         trans_ = p.getParameter<int>("transitions");
0836         cvalue_ = p.getParameter<int>("cachevalue");
0837         produces<unsigned int>();
0838         produces<unsigned int, edm::Transition::EndLuminosityBlock>("a");
0839       }
0840 
0841       void produce(edm::Event&, edm::EventSetup const&) override { p = true; }
0842 
0843       static void globalEndLuminosityBlockProduce(edm::LuminosityBlock&,
0844                                                   edm::EventSetup const&,
0845                                                   LuminosityBlockContext const*) {
0846         ++m_count;
0847         if (!p) {
0848           throw cms::Exception("out of sequence") << "globalEndLumiBlockProduce seen before produce";
0849         }
0850       }
0851 
0852       static std::shared_ptr<bool> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0853                                                               edm::EventSetup const&,
0854                                                               RunContext const*) {
0855         gbl = true;
0856         gel = false;
0857         p = false;
0858         return std::shared_ptr<bool>{};
0859       }
0860 
0861       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0862                                            edm::EventSetup const&,
0863                                            LuminosityBlockContext const* iLBContext) {
0864         if (!gbl) {
0865           throw cms::Exception("end out of sequence")
0866               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0867               << iLB.luminosityBlock();
0868         }
0869       }
0870 
0871       ~TestEndLumiBlockProducer() {
0872         if (m_count != trans_) {
0873           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0874         }
0875       }
0876     };
0877 
0878     struct Count {
0879       Count() : m_value(0), m_expectedValue(0) {}
0880       //Using mutable since we want to update the value.
0881       mutable std::atomic<unsigned int> m_value;
0882       mutable std::atomic<unsigned int> m_expectedValue;
0883       // Set only in constructor, framework runs constructors serially
0884       CMS_THREAD_SAFE mutable edm::EDPutTokenT<unsigned int> m_putToken;
0885     };
0886 
0887     class TestAccumulator
0888         : public edm::stream::EDProducer<edm::GlobalCache<Count>, edm::Accumulator, edm::EndLuminosityBlockProducer> {
0889     public:
0890       static std::atomic<unsigned int> m_expectedCount;
0891 
0892       explicit TestAccumulator(edm::ParameterSet const& p, Count const* iCount) {
0893         iCount->m_expectedValue = p.getParameter<unsigned int>("expectedCount");
0894         if (iCount->m_putToken.isUninitialized()) {
0895           iCount->m_putToken = produces<unsigned int, edm::Transition::EndLuminosityBlock>();
0896         }
0897       }
0898 
0899       static std::unique_ptr<Count> initializeGlobalCache(edm::ParameterSet const&) {
0900         return std::unique_ptr<Count>(new Count());
0901       }
0902 
0903       void accumulate(edm::Event const&, edm::EventSetup const&) override { ++(globalCache()->m_value); }
0904 
0905       static void globalEndLuminosityBlockProduce(edm::LuminosityBlock& l,
0906                                                   edm::EventSetup const&,
0907                                                   LuminosityBlockContext const* ctx) {
0908         Count const* count = ctx->global();
0909         l.emplace(count->m_putToken, count->m_value.load());
0910       }
0911 
0912       static void globalEndJob(Count const* iCount) {
0913         if (iCount->m_value != iCount->m_expectedValue) {
0914           throw cms::Exception("CountEvents") << "Number of events seen = " << iCount->m_value
0915                                               << " but it was supposed to be " << iCount->m_expectedValue;
0916         }
0917       }
0918 
0919       ~TestAccumulator() {}
0920     };
0921 
// Cache payload type used as one of the InputProcessBlockCache template
// arguments below; holds a single integer that starts at zero.
class TestInputProcessBlockCache {
public:
  long long value_{0};
};
0926 
// Second cache payload type, distinct from TestInputProcessBlockCache so that
// both can appear in the same InputProcessBlockCache argument list; holds a
// single integer that starts at zero.
class TestInputProcessBlockCache1 {
public:
  long long value_{0};
};
0931 
0932     class InputProcessBlockIntProducer
0933         : public edm::stream::EDProducer<
0934               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>> {
0935     public:
0936       explicit InputProcessBlockIntProducer(edm::ParameterSet const& pset) {
0937         {
0938           expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0939           sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0940           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0941           if (not tag.label().empty()) {
0942             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0943           }
0944         }
0945         {
0946           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0947           if (not tag.label().empty()) {
0948             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0949           }
0950         }
0951         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0952             getTokenBegin_,
0953             [this](edm::ProcessBlock const& processBlock,
0954                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0955               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0956               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0957               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0958               return returnValue;
0959             });
0960       }
0961 
0962       static void accessInputProcessBlock(edm::ProcessBlock const&) {
0963         edm::LogAbsolute("InputProcessBlockIntProducer") << "InputProcessBlockIntProducer::accessInputProcessBlock";
0964       }
0965 
0966       void produce(edm::Event& event, edm::EventSetup const&) override {
0967         auto cacheTuple = processBlockCaches(event);
0968         if (!expectedByRun_.empty()) {
0969           if (expectedByRun_.at(event.run() - 1) !=
0970               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0971             throw cms::Exception("UnexpectedValue")
0972                 << "InputProcessBlockIntProducer::produce cached value was "
0973                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0974                 << " but it was supposed to be " << expectedByRun_.at(event.run() - 1);
0975           }
0976         }
0977         // Force events to be processed concurrently
0978         if (sleepTime_ > 0) {
0979           std::this_thread::sleep_for(std::chrono::microseconds(sleepTime_));
0980         }
0981       }
0982 
0983     private:
0984       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0985       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0986       std::vector<int> expectedByRun_;
0987       unsigned int sleepTime_{0};
0988     };
0989 
0990     struct InputProcessBlockGlobalCacheAn {
0991       // The tokens are duplicated in this test module to prove that they
0992       // work both as GlobalCache members and module data members.
0993       // We need them as GlobalCache members for accessInputProcessBlock.
0994       // In registerProcessBlockCacheFiller we use tokens that are member
0995       // variables of the class and because the lambda captures the "this"
0996       // pointer of the zeroth stream module instance. We always
0997       // use the zeroth EDConsumer. In the case of registerProcessBlockCacheFiller,
0998       // either set of tokens would work. Note that in the GlobalCache case
0999       // there is a slight weirdness that the zeroth consumer is used but
1000       // the token comes from the last consumer instance. It works because
1001       // all the stream module instances have EDConsumer base classes with
1002       // containers with the same contents in the same order (not 100% guaranteed,
1003       // but it would be difficult to implement a module where this isn't true).
1004       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBegin_;
1005       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEnd_;
1006       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBeginM_;
1007       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEndM_;
1008       mutable std::atomic<unsigned int> transitions_{0};
1009       int sum_{0};
1010       unsigned int expectedTransitions_{0};
1011       std::vector<int> expectedByRun_;
1012       int expectedSum_{0};
1013       unsigned int sleepTime_{0};
1014     };
1015 
1016     // Same thing as previous class except with a GlobalCache added
1017     class InputProcessBlockIntProducerG
1018         : public edm::stream::EDProducer<
1019               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>,
1020               edm::GlobalCache<InputProcessBlockGlobalCacheAn>> {
1021     public:
1022       explicit InputProcessBlockIntProducerG(edm::ParameterSet const& pset,
1023                                              InputProcessBlockGlobalCacheAn const* testGlobalCache) {
1024         {
1025           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
1026           if (not tag.label().empty()) {
1027             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
1028             testGlobalCache->getTokenBegin_ = getTokenBegin_;
1029           }
1030         }
1031         {
1032           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
1033           if (not tag.label().empty()) {
1034             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
1035             testGlobalCache->getTokenEnd_ = getTokenEnd_;
1036           }
1037         }
1038         {
1039           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlockM");
1040           if (not tag.label().empty()) {
1041             getTokenBeginM_ = consumes<IntProduct, edm::InProcess>(tag);
1042             testGlobalCache->getTokenBeginM_ = getTokenBeginM_;
1043           }
1044         }
1045         {
1046           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlockM");
1047           if (not tag.label().empty()) {
1048             getTokenEndM_ = consumes<IntProduct, edm::InProcess>(tag);
1049             testGlobalCache->getTokenEndM_ = getTokenEndM_;
1050           }
1051         }
1052         registerProcessBlockCacheFiller<int>(
1053             getTokenBegin_, [this](edm::ProcessBlock const& processBlock, std::shared_ptr<int> const& previousCache) {
1054               auto returnValue = std::make_shared<int>(0);
1055               *returnValue += processBlock.get(getTokenBegin_).value;
1056               *returnValue += processBlock.get(getTokenEnd_).value;
1057               ++globalCache()->transitions_;
1058               return returnValue;
1059             });
1060         registerProcessBlockCacheFiller<1>(getTokenBegin_,
1061                                            [this](edm::ProcessBlock const& processBlock,
1062                                                   std::shared_ptr<TestInputProcessBlockCache> const& previousCache) {
1063                                              auto returnValue = std::make_shared<TestInputProcessBlockCache>();
1064                                              returnValue->value_ += processBlock.get(getTokenBegin_).value;
1065                                              returnValue->value_ += processBlock.get(getTokenEnd_).value;
1066                                              ++globalCache()->transitions_;
1067                                              return returnValue;
1068                                            });
1069         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
1070             getTokenBegin_,
1071             [this](edm::ProcessBlock const& processBlock,
1072                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
1073               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
1074               returnValue->value_ += processBlock.get(getTokenBegin_).value;
1075               returnValue->value_ += processBlock.get(getTokenEnd_).value;
1076               ++globalCache()->transitions_;
1077               return returnValue;
1078             });
1079       }
1080 
1081       static std::unique_ptr<InputProcessBlockGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
1082         auto testGlobalCache = std::make_unique<InputProcessBlockGlobalCacheAn>();
1083         testGlobalCache->expectedTransitions_ = pset.getParameter<int>("transitions");
1084         testGlobalCache->expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
1085         testGlobalCache->expectedSum_ = pset.getParameter<int>("expectedSum");
1086         testGlobalCache->sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
1087         return testGlobalCache;
1088       }
1089 
1090       static void accessInputProcessBlock(edm::ProcessBlock const& processBlock,
1091                                           InputProcessBlockGlobalCacheAn* testGlobalCache) {
1092         if (processBlock.processName() == "PROD1") {
1093           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBegin_).value;
1094           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEnd_).value;
1095         }
1096         if (processBlock.processName() == "MERGE") {
1097           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBeginM_).value;
1098           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEndM_).value;
1099         }
1100         ++testGlobalCache->transitions_;
1101       }
1102 
1103       void produce(edm::Event& event, edm::EventSetup const&) override {
1104         auto cacheTuple = processBlockCaches(event);
1105         auto testGlobalCache = globalCache();
1106         if (!testGlobalCache->expectedByRun_.empty()) {
1107           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != *std::get<edm::CacheHandle<int>>(cacheTuple)) {
1108             throw cms::Exception("UnexpectedValue")
1109                 << "InputProcessBlockIntProducerG::produce cached value was "
1110                 << *std::get<edm::CacheHandle<int>>(cacheTuple) << " but it was supposed to be "
1111                 << testGlobalCache->expectedByRun_.at(event.run() - 1);
1112           }
1113           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != std::get<1>(cacheTuple)->value_) {
1114             throw cms::Exception("UnexpectedValue")
1115                 << "InputProcessBlockIntProducerG::produce second cached value was " << std::get<1>(cacheTuple)->value_
1116                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
1117           }
1118           if (testGlobalCache->expectedByRun_.at(event.run() - 1) !=
1119               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
1120             throw cms::Exception("UnexpectedValue")
1121                 << "InputProcessBlockIntProducerG::produce third cached value was "
1122                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
1123                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
1124           }
1125         }
1126         ++testGlobalCache->transitions_;
1127 
1128         // Force events to be processed concurrently
1129         if (testGlobalCache->sleepTime_ > 0) {
1130           std::this_thread::sleep_for(std::chrono::microseconds(testGlobalCache->sleepTime_));
1131         }
1132       }
1133 
1134       static void globalEndJob(InputProcessBlockGlobalCacheAn* testGlobalCache) {
1135         if (testGlobalCache->transitions_ != testGlobalCache->expectedTransitions_) {
1136           throw cms::Exception("transitions")
1137               << "InputProcessBlockIntProducerG transitions " << testGlobalCache->transitions_
1138               << " but it was supposed to be " << testGlobalCache->expectedTransitions_;
1139         }
1140 
1141         if (testGlobalCache->sum_ != testGlobalCache->expectedSum_) {
1142           throw cms::Exception("UnexpectedValue") << "InputProcessBlockIntProducerG sum " << testGlobalCache->sum_
1143                                                   << " but it was supposed to be " << testGlobalCache->expectedSum_;
1144         }
1145       }
1146 
1147     private:
1148       edm::EDGetTokenT<IntProduct> getTokenBegin_;
1149       edm::EDGetTokenT<IntProduct> getTokenEnd_;
1150       edm::EDGetTokenT<IntProduct> getTokenBeginM_;
1151       edm::EDGetTokenT<IntProduct> getTokenEndM_;
1152     };
1153 
1154   }  // namespace stream
1155 }  // namespace edmtest
1156 std::atomic<unsigned int> edmtest::stream::GlobalIntProducer::m_count{0};
1157 std::atomic<unsigned int> edmtest::stream::RunIntProducer::m_count{0};
1158 std::atomic<unsigned int> edmtest::stream::LumiIntProducer::m_count{0};
1159 std::atomic<unsigned int> edmtest::stream::RunSummaryIntProducer::m_count{0};
1160 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntProducer::m_count{0};
1161 std::atomic<unsigned int> edmtest::stream::TestBeginRunProducer::m_count{0};
1162 std::atomic<unsigned int> edmtest::stream::TestEndRunProducer::m_count{0};
1163 std::atomic<unsigned int> edmtest::stream::TestBeginLumiBlockProducer::m_count{0};
1164 std::atomic<unsigned int> edmtest::stream::TestEndLumiBlockProducer::m_count{0};
1165 std::atomic<unsigned int> edmtest::stream::GlobalIntProducer::cvalue_{0};
1166 std::atomic<unsigned int> edmtest::stream::RunIntProducer::cvalue_{0};
1167 std::atomic<unsigned int> edmtest::stream::LumiIntProducer::cvalue_{0};
1168 std::atomic<unsigned int> edmtest::stream::RunSummaryIntProducer::cvalue_{0};
1169 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntProducer::cvalue_{0};
1170 std::atomic<unsigned int> edmtest::stream::TestBeginRunProducer::cvalue_{0};
1171 std::atomic<unsigned int> edmtest::stream::TestEndRunProducer::cvalue_{0};
1172 std::atomic<unsigned int> edmtest::stream::TestBeginLumiBlockProducer::cvalue_{0};
1173 std::atomic<unsigned int> edmtest::stream::TestEndLumiBlockProducer::cvalue_{0};
1174 std::atomic<bool> edmtest::stream::RunSummaryIntProducer::globalBeginRunCalled_{false};
1175 std::atomic<bool> edmtest::stream::LumiSummaryIntProducer::globalBeginLumiCalled_{false};
1176 std::atomic<bool> edmtest::stream::TestBeginRunProducer::gbr{false};
1177 std::atomic<bool> edmtest::stream::TestBeginRunProducer::gbrp{false};
1178 std::atomic<bool> edmtest::stream::TestBeginRunProducer::ger{false};
1179 std::atomic<bool> edmtest::stream::TestEndRunProducer::gbr{false};
1180 std::atomic<bool> edmtest::stream::TestEndRunProducer::ger{false};
1181 std::atomic<bool> edmtest::stream::TestEndRunProducer::p{false};
1182 std::atomic<bool> edmtest::stream::TestBeginLumiBlockProducer::gbl{false};
1183 std::atomic<bool> edmtest::stream::TestBeginLumiBlockProducer::gblp{false};
1184 std::atomic<bool> edmtest::stream::TestBeginLumiBlockProducer::gel{false};
1185 std::atomic<bool> edmtest::stream::TestEndLumiBlockProducer::gbl{false};
1186 std::atomic<bool> edmtest::stream::TestEndLumiBlockProducer::gel{false};
1187 std::atomic<bool> edmtest::stream::TestEndLumiBlockProducer::p{false};
1188 DEFINE_FWK_MODULE(edmtest::stream::GlobalIntProducer);
1189 DEFINE_FWK_MODULE(edmtest::stream::RunIntProducer);
1190 DEFINE_FWK_MODULE(edmtest::stream::LumiIntProducer);
1191 DEFINE_FWK_MODULE(edmtest::stream::RunSummaryIntProducer);
1192 DEFINE_FWK_MODULE(edmtest::stream::LumiSummaryIntProducer);
1193 DEFINE_FWK_MODULE(edmtest::stream::ProcessBlockIntProducer);
1194 DEFINE_FWK_MODULE(edmtest::stream::TestBeginProcessBlockProducer);
1195 DEFINE_FWK_MODULE(edmtest::stream::TestEndProcessBlockProducer);
1196 DEFINE_FWK_MODULE(edmtest::stream::ProcessBlockIntProducerNoGlobalCache);
1197 DEFINE_FWK_MODULE(edmtest::stream::TestBeginProcessBlockProducerNoGlobalCache);
1198 DEFINE_FWK_MODULE(edmtest::stream::TestEndProcessBlockProducerNoGlobalCache);
1199 DEFINE_FWK_MODULE(edmtest::stream::TestBeginRunProducer);
1200 DEFINE_FWK_MODULE(edmtest::stream::TestEndRunProducer);
1201 DEFINE_FWK_MODULE(edmtest::stream::TestBeginLumiBlockProducer);
1202 DEFINE_FWK_MODULE(edmtest::stream::TestEndLumiBlockProducer);
1203 DEFINE_FWK_MODULE(edmtest::stream::TestAccumulator);
1204 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntProducer);
1205 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntProducerG);