Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:27:00

0001 
0002 /*----------------------------------------------------------------------
0003 
0004 Toy edm::stream::EDFilter modules of
0005 edm::*Cache templates and edm::*Producer classes
0006 for testing purposes only.
0007 
0008 ----------------------------------------------------------------------*/
0009 
0010 #include <atomic>
0011 #include <functional>
0012 #include <iostream>
0013 #include <map>
0014 #include <tuple>
0015 #include <unistd.h>
0016 #include <vector>
0017 #include <chrono>
0018 
0019 #include "FWCore/Framework/interface/CacheHandle.h"
0020 #include "FWCore/Framework/interface/stream/EDFilter.h"
0021 #include "FWCore/Framework/interface/maker/WorkerT.h"
0022 #include "FWCore/Framework/interface/HistoryAppender.h"
0023 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0024 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0025 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0026 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0027 #include "FWCore/Framework/interface/Event.h"
0028 #include "FWCore/Framework/interface/MakerMacros.h"
0029 #include "FWCore/Framework/interface/ProcessBlock.h"
0030 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0031 #include "FWCore/Utilities/interface/EDMException.h"
0032 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0033 
0034 namespace edmtest {
0035   namespace stream {
0036 
0037     // anonymous namespace here causes build warnings
0038     namespace cache {
0039       struct Cache {
0040         Cache() : value(0), run(0), lumi(0) {}
0041         //Using mutable since we want to update the value.
0042         mutable std::atomic<unsigned int> value;
0043         mutable std::atomic<unsigned int> run;
0044         mutable std::atomic<unsigned int> lumi;
0045       };
0046 
0047       struct SummaryCache {
0048         // Intentionally not thread safe, not atomic
0049         unsigned int value = 0;
0050       };
0051 
0052       struct TestGlobalCacheFil {
0053         CMS_THREAD_SAFE mutable edm::EDPutTokenT<unsigned int> token_;
0054         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenBegin_;
0055         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenEnd_;
0056         unsigned int trans_{0};
0057         mutable std::atomic<unsigned int> m_count{0};
0058       };
0059     }  // namespace cache
0060 
0061     using Cache = cache::Cache;
0062     using SummaryCache = cache::SummaryCache;
0063     using TestGlobalCacheFil = cache::TestGlobalCacheFil;
0064 
0065     class GlobalIntFilter : public edm::stream::EDFilter<edm::GlobalCache<Cache>> {
0066     public:
0067       static std::atomic<unsigned int> m_count;
0068       unsigned int trans_;
0069       static std::atomic<unsigned int> cvalue_;
0070 
0071       static std::unique_ptr<Cache> initializeGlobalCache(edm::ParameterSet const&) {
0072         ++m_count;
0073         return std::make_unique<Cache>();
0074       }
0075 
0076       GlobalIntFilter(edm::ParameterSet const& p, const Cache* iGlobal) {
0077         trans_ = p.getParameter<int>("transitions");
0078         cvalue_ = p.getParameter<int>("cachevalue");
0079         produces<unsigned int>();
0080       }
0081 
0082       static void globalBeginJob(Cache* iGlobal) {
0083         ++m_count;
0084         if (iGlobal->value != 0) {
0085           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
0086         }
0087       }
0088 
0089       bool filter(edm::Event&, edm::EventSetup const&) override {
0090         ++m_count;
0091         ++((globalCache())->value);
0092 
0093         return true;
0094       }
0095 
0096       static void globalEndJob(Cache* iGlobal) {
0097         ++m_count;
0098         if (iGlobal->value != cvalue_) {
0099           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be " << cvalue_;
0100         }
0101       }
0102 
0103       ~GlobalIntFilter() {
0104         if (m_count != trans_) {
0105           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0106         }
0107       }
0108     };
0109 
0110     class RunIntFilter : public edm::stream::EDFilter<edm::RunCache<Cache>, edm::stream::WatchRuns> {
0111     public:
0112       static std::atomic<unsigned int> m_count;
0113       unsigned int trans_;
0114       static std::atomic<unsigned int> cvalue_;
0115 
0116       RunIntFilter(edm::ParameterSet const& p) {
0117         trans_ = p.getParameter<int>("transitions");
0118         cvalue_ = p.getParameter<int>("cachevalue");
0119         m_count = 0;
0120         produces<unsigned int>();
0121       }
0122 
0123       bool filter(edm::Event&, edm::EventSetup const&) override {
0124         ++m_count;
0125         ++(runCache()->value);
0126         return true;
0127       }
0128 
0129       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0130         ++m_count;
0131         auto pCache = std::make_shared<Cache>();
0132         pCache->run = iRun.runAuxiliary().run();
0133         return pCache;
0134       }
0135 
0136       void beginRun(edm::Run const& iRun, edm::EventSetup const&) override {
0137         if (runCache()->run != iRun.runAuxiliary().run()) {
0138           throw cms::Exception("begin out of sequence") << "beginRun seen before globalBeginRun";
0139         }
0140       }
0141 
0142       void endRun(edm::Run const& iRun, edm::EventSetup const&) override {
0143         if (runCache()->run != iRun.runAuxiliary().run()) {
0144           throw cms::Exception("end out of sequence") << "globalEndRun seen before endRun";
0145         }
0146       }
0147 
0148       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0149         ++m_count;
0150         auto pCache = iContext->run();
0151         if (pCache->run != iRun.runAuxiliary().run()) {
0152           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0153         }
0154         pCache->run = 0;
0155         if (iContext->run()->value != cvalue_) {
0156           throw cms::Exception("cache value") << iContext->run()->value << " but it was supposed to be " << cvalue_;
0157         }
0158       }
0159 
0160       ~RunIntFilter() {
0161         if (m_count != trans_) {
0162           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0163         }
0164       }
0165     };
0166 
0167     class LumiIntFilter
0168         : public edm::stream::EDFilter<edm::LuminosityBlockCache<Cache>, edm::stream::WatchLuminosityBlocks> {
0169     public:
0170       static std::atomic<unsigned int> m_count;
0171       unsigned int trans_;
0172       static std::atomic<unsigned int> cvalue_;
0173 
0174       LumiIntFilter(edm::ParameterSet const& p) {
0175         trans_ = p.getParameter<int>("transitions");
0176         cvalue_ = p.getParameter<int>("cachevalue");
0177         m_count = 0;
0178         produces<unsigned int>();
0179       }
0180 
0181       bool filter(edm::Event&, edm::EventSetup const&) override {
0182         ++m_count;
0183         ++(luminosityBlockCache()->value);
0184 
0185         return true;
0186       }
0187 
0188       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0189                                                                edm::EventSetup const&,
0190                                                                RunContext const*) {
0191         ++m_count;
0192         auto pCache = std::make_shared<Cache>();
0193         pCache->run = iLB.luminosityBlockAuxiliary().run();
0194         pCache->lumi = iLB.luminosityBlockAuxiliary().luminosityBlock();
0195         return pCache;
0196       }
0197 
0198       void beginLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0199         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0200             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0201           throw cms::Exception("begin out of sequence")
0202               << "beginLuminosityBlock seen before globalBeginLuminosityBlock " << luminosityBlockCache()->run << " "
0203               << iLB.luminosityBlockAuxiliary().run();
0204         }
0205       }
0206 
0207       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0208                                            edm::EventSetup const&,
0209                                            LuminosityBlockContext const* iLBContext) {
0210         ++m_count;
0211         auto pCache = iLBContext->luminosityBlock();
0212         if (pCache->run != iLB.luminosityBlockAuxiliary().run() ||
0213             pCache->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0214           throw cms::Exception("end out of sequence")
0215               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0216               << iLB.luminosityBlock();
0217         }
0218         pCache->run = 0;
0219         pCache->lumi = 0;
0220         if (pCache->value != cvalue_) {
0221           throw cms::Exception("cache value") << "LumiIntFilter cache value " << iLBContext->luminosityBlock()->value
0222                                               << " but it was supposed to be " << cvalue_;
0223         }
0224       }
0225 
0226       void endLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0227         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0228             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0229           throw cms::Exception("end out of sequence") << "globalEndLuminosityBlock seen before endLuminosityBlock";
0230         }
0231       }
0232 
0233       ~LumiIntFilter() {
0234         if (m_count != trans_) {
0235           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0236         }
0237       }
0238     };
0239 
0240     class RunSummaryIntFilter
0241         : public edm::stream::EDFilter<edm::RunCache<Cache>, edm::RunSummaryCache<SummaryCache>, edm::stream::WatchRuns> {
0242     public:
0243       static std::atomic<unsigned int> m_count;
0244       unsigned int trans_;
0245       static std::atomic<unsigned int> cvalue_;
0246       static std::atomic<bool> globalBeginRunCalled_;
0247       unsigned int valueAccumulatedForStream_ = 0;
0248       bool endRunWasCalled_ = false;
0249 
0250       RunSummaryIntFilter(edm::ParameterSet const& p) {
0251         trans_ = p.getParameter<int>("transitions");
0252         cvalue_ = p.getParameter<int>("cachevalue");
0253         m_count = 0;
0254         produces<unsigned int>();
0255       }
0256 
0257       void beginRun(edm::Run const&, edm::EventSetup const&) override {
0258         valueAccumulatedForStream_ = 0;
0259         endRunWasCalled_ = false;
0260       }
0261 
0262       bool filter(edm::Event&, edm::EventSetup const&) override {
0263         ++m_count;
0264         ++(runCache()->value);
0265         ++valueAccumulatedForStream_;
0266         return true;
0267       }
0268 
0269       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0270         ++m_count;
0271         globalBeginRunCalled_ = true;
0272         auto pCache = std::make_shared<Cache>();
0273         ++(pCache->run);
0274         return pCache;
0275       }
0276 
0277       static std::shared_ptr<SummaryCache> globalBeginRunSummary(edm::Run const&,
0278                                                                  edm::EventSetup const&,
0279                                                                  GlobalCache const*) {
0280         ++m_count;
0281         if (!globalBeginRunCalled_) {
0282           throw cms::Exception("begin out of sequence") << "globalBeginRunSummary seen before globalBeginRun";
0283         }
0284         globalBeginRunCalled_ = false;
0285         return std::make_shared<SummaryCache>();
0286       }
0287 
0288       void endRunSummary(edm::Run const&, edm::EventSetup const&, SummaryCache* runSummaryCache) const override {
0289         runSummaryCache->value += valueAccumulatedForStream_;
0290         if (!endRunWasCalled_) {
0291           throw cms::Exception("end out of sequence") << "endRunSummary seen before endRun";
0292         }
0293       }
0294 
0295       static void globalEndRunSummary(edm::Run const&,
0296                                       edm::EventSetup const&,
0297                                       RunContext const*,
0298                                       SummaryCache* runSummaryCache) {
0299         ++m_count;
0300         if (runSummaryCache->value != cvalue_) {
0301           throw cms::Exception("unexpectedValue")
0302               << "run summary cache value = " << runSummaryCache->value << " but it was supposed to be " << cvalue_;
0303         }
0304       }
0305 
0306       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0307         ++m_count;
0308         auto pCache = iContext->run();
0309         if (pCache->value != cvalue_) {
0310           throw cms::Exception("unExpectedValue")
0311               << "run cache value " << pCache->value << " but it was supposed to be " << cvalue_;
0312         }
0313         if (pCache->run != 1) {
0314           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0315         }
0316       }
0317 
0318       void endRun(edm::Run const&, edm::EventSetup const&) override { endRunWasCalled_ = true; }
0319 
0320       ~RunSummaryIntFilter() {
0321         if (m_count != trans_) {
0322           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0323         }
0324       }
0325     };
0326 
0327     class LumiSummaryIntFilter : public edm::stream::EDFilter<edm::LuminosityBlockCache<Cache>,
0328                                                               edm::LuminosityBlockSummaryCache<SummaryCache>,
0329                                                               edm::stream::WatchLuminosityBlocks> {
0330     public:
0331       static std::atomic<unsigned int> m_count;
0332       unsigned int trans_;
0333       static std::atomic<unsigned int> cvalue_;
0334       static std::atomic<bool> globalBeginLumiCalled_;
0335       unsigned int valueAccumulatedForStream_ = 0;
0336       bool endLumiWasCalled_ = false;
0337 
0338       LumiSummaryIntFilter(edm::ParameterSet const& p) {
0339         trans_ = p.getParameter<int>("transitions");
0340         cvalue_ = p.getParameter<int>("cachevalue");
0341         m_count = 0;
0342         produces<unsigned int>();
0343       }
0344 
0345       bool filter(edm::Event&, edm::EventSetup const&) override {
0346         ++m_count;
0347         ++(luminosityBlockCache()->value);
0348         ++valueAccumulatedForStream_;
0349         return true;
0350       }
0351 
0352       void beginLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0353         valueAccumulatedForStream_ = 0;
0354         valueAccumulatedForStream_ = 0;
0355         endLumiWasCalled_ = false;
0356       }
0357 
0358       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0359                                                                edm::EventSetup const&,
0360                                                                RunContext const*) {
0361         ++m_count;
0362         globalBeginLumiCalled_ = true;
0363         auto pCache = std::make_shared<Cache>();
0364         ++(pCache->lumi);
0365         return pCache;
0366       }
0367 
0368       static std::shared_ptr<SummaryCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0369                                                                              edm::EventSetup const&,
0370                                                                              LuminosityBlockContext const*) {
0371         ++m_count;
0372         if (!globalBeginLumiCalled_) {
0373           throw cms::Exception("begin out of sequence")
0374               << "globalBeginLuminosityBlockSummary seen before globalBeginLuminosityBlock";
0375         }
0376         globalBeginLumiCalled_ = false;
0377         return std::make_shared<SummaryCache>();
0378       }
0379 
0380       void endLuminosityBlockSummary(edm::LuminosityBlock const&,
0381                                      edm::EventSetup const&,
0382                                      SummaryCache* lumiSummaryCache) const override {
0383         lumiSummaryCache->value += valueAccumulatedForStream_;
0384         if (!endLumiWasCalled_) {
0385           throw cms::Exception("end out of sequence") << "endLuminosityBlockSummary seen before endLuminosityBlock";
0386         }
0387       }
0388 
0389       static void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0390                                                   edm::EventSetup const&,
0391                                                   LuminosityBlockContext const* iLBContext,
0392                                                   SummaryCache* lumiSummaryCache) {
0393         ++m_count;
0394         if (lumiSummaryCache->value != cvalue_) {
0395           throw cms::Exception("unexpectedValue")
0396               << "lumi summary cache value = " << lumiSummaryCache->value << " but it was supposed to be " << cvalue_;
0397         }
0398         auto pCache = iLBContext->luminosityBlock();
0399         // Add one so globalEndLuminosityBlock can check this function was called first
0400         ++pCache->value;
0401       }
0402 
0403       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0404                                            edm::EventSetup const&,
0405                                            LuminosityBlockContext const* iLBContext) {
0406         ++m_count;
0407         auto pCache = iLBContext->luminosityBlock();
0408         if (pCache->value != cvalue_ + 1) {
0409           throw cms::Exception("unexpectedValue")
0410               << "lumi cache value " << pCache->value << " but it was supposed to be " << cvalue_ + 1;
0411         }
0412         if (pCache->lumi != 1) {
0413           throw cms::Exception("end out of sequence")
0414               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0415               << iLB.luminosityBlock();
0416         }
0417       }
0418 
0419       void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0420         endLumiWasCalled_ = true;
0421       }
0422 
0423       ~LumiSummaryIntFilter() {
0424         if (m_count != trans_) {
0425           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0426         }
0427       }
0428     };
0429 
0430     class ProcessBlockIntFilter
0431         : public edm::stream::EDFilter<edm::WatchProcessBlock, edm::GlobalCache<TestGlobalCacheFil>> {
0432     public:
0433       explicit ProcessBlockIntFilter(edm::ParameterSet const& pset, TestGlobalCacheFil const* testGlobalCache) {
0434         produces<unsigned int>();
0435 
0436         {
0437           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0438           if (not tag.label().empty()) {
0439             testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0440           }
0441         }
0442         {
0443           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0444           if (not tag.label().empty()) {
0445             testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0446           }
0447         }
0448       }
0449 
0450       static std::unique_ptr<TestGlobalCacheFil> initializeGlobalCache(edm::ParameterSet const& pset) {
0451         auto testGlobalCache = std::make_unique<TestGlobalCacheFil>();
0452         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0453         return testGlobalCache;
0454       }
0455 
0456       static void beginProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheFil* testGlobalCache) {
0457         if (testGlobalCache->m_count != 0) {
0458           throw cms::Exception("transitions") << "ProcessBlockIntFilter::begin transitions " << testGlobalCache->m_count
0459                                               << " but it was supposed to be " << 0;
0460         }
0461         ++testGlobalCache->m_count;
0462         const unsigned int valueToGet = 71;
0463         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0464           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0465             throw cms::Exception("BadValue")
0466                 << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0467           }
0468         }
0469       }
0470 
0471       bool filter(edm::Event&, edm::EventSetup const&) override {
0472         TestGlobalCacheFil const* testGlobalCache = globalCache();
0473         if (testGlobalCache->m_count < 1u) {
0474           throw cms::Exception("out of sequence") << "produce before beginProcessBlock " << testGlobalCache->m_count;
0475         }
0476         ++testGlobalCache->m_count;
0477         return true;
0478       }
0479 
0480       static void endProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheFil* testGlobalCache) {
0481         ++testGlobalCache->m_count;
0482         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0483           throw cms::Exception("transitions") << "ProcessBlockIntFilter::end transitions " << testGlobalCache->m_count
0484                                               << " but it was supposed to be " << testGlobalCache->trans_;
0485         }
0486         {
0487           const unsigned int valueToGet = 71;
0488           if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0489             if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0490               throw cms::Exception("BadValue")
0491                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0492             }
0493           }
0494         }
0495         {
0496           const unsigned int valueToGet = 81;
0497           if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0498             if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToGet) {
0499               throw cms::Exception("BadValue")
0500                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0501             }
0502           }
0503         }
0504       }
0505 
0506       static void globalEndJob(TestGlobalCacheFil* testGlobalCache) {
0507         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0508           throw cms::Exception("transitions") << "ProcessBlockIntFilter transitions " << testGlobalCache->m_count
0509                                               << " but it was supposed to be " << testGlobalCache->trans_;
0510         }
0511       }
0512 
0513       ~ProcessBlockIntFilter() {
0514         TestGlobalCacheFil const* testGlobalCache = globalCache();
0515         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0516           throw cms::Exception("transitions") << "ProcessBlockIntFilter transitions " << testGlobalCache->m_count
0517                                               << " but it was supposed to be " << testGlobalCache->trans_;
0518         }
0519       }
0520     };
0521 
0522     class TestBeginProcessBlockFilter
0523         : public edm::stream::EDFilter<edm::BeginProcessBlockProducer, edm::GlobalCache<TestGlobalCacheFil>> {
0524     public:
0525       explicit TestBeginProcessBlockFilter(edm::ParameterSet const& pset, TestGlobalCacheFil const* testGlobalCache) {
0526         testGlobalCache->token_ = produces<unsigned int, edm::Transition::BeginProcessBlock>("begin");
0527         produces<unsigned int>();
0528 
0529         auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0530         if (not tag.label().empty()) {
0531           testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0532         }
0533       }
0534 
0535       static std::unique_ptr<TestGlobalCacheFil> initializeGlobalCache(edm::ParameterSet const& pset) {
0536         auto testGlobalCache = std::make_unique<TestGlobalCacheFil>();
0537         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0538         return testGlobalCache;
0539       }
0540 
0541       static void beginProcessBlockProduce(edm::ProcessBlock& processBlock, TestGlobalCacheFil const* testGlobalCache) {
0542         if (testGlobalCache->m_count != 0) {
0543           throw cms::Exception("transitions") << "TestBeginProcessBlockFilter transitions " << testGlobalCache->m_count
0544                                               << " but it was supposed to be " << 0;
0545         }
0546         ++testGlobalCache->m_count;
0547 
0548         const unsigned int valueToPutAndGet = 71;
0549         processBlock.emplace(testGlobalCache->token_, valueToPutAndGet);
0550 
0551         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0552           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToPutAndGet) {
0553             throw cms::Exception("BadValue")
0554                 << "expected " << valueToPutAndGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0555           }
0556         }
0557       }
0558 
0559       bool filter(edm::Event&, edm::EventSetup const&) override {
0560         TestGlobalCacheFil const* testGlobalCache = globalCache();
0561         if (testGlobalCache->m_count < 1u) {
0562           throw cms::Exception("out of sequence")
0563               << "produce before beginProcessBlockProduce " << testGlobalCache->m_count;
0564         }
0565         ++testGlobalCache->m_count;
0566         return true;
0567       }
0568 
0569       static void globalEndJob(TestGlobalCacheFil* testGlobalCache) {
0570         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0571           throw cms::Exception("transitions") << "TestBeginProcessBlockFilter transitions " << testGlobalCache->m_count
0572                                               << " but it was supposed to be " << testGlobalCache->trans_;
0573         }
0574       }
0575 
0576       ~TestBeginProcessBlockFilter() {
0577         TestGlobalCacheFil const* testGlobalCache = globalCache();
0578         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0579           throw cms::Exception("transitions") << "TestBeginProcessBlockFilter transitions " << testGlobalCache->m_count
0580                                               << " but it was supposed to be " << testGlobalCache->trans_;
0581         }
0582       }
0583     };
0584 
0585     class TestEndProcessBlockFilter
0586         : public edm::stream::EDFilter<edm::EndProcessBlockProducer, edm::GlobalCache<TestGlobalCacheFil>> {
0587     public:
0588       explicit TestEndProcessBlockFilter(edm::ParameterSet const& pset, TestGlobalCacheFil const* testGlobalCache) {
0589         testGlobalCache->token_ = produces<unsigned int, edm::Transition::EndProcessBlock>("end");
0590         produces<unsigned int>();
0591 
0592         auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0593         if (not tag.label().empty()) {
0594           testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0595         }
0596       }
0597 
0598       static std::unique_ptr<TestGlobalCacheFil> initializeGlobalCache(edm::ParameterSet const& pset) {
0599         auto testGlobalCache = std::make_unique<TestGlobalCacheFil>();
0600         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0601         return testGlobalCache;
0602       }
0603 
0604       bool filter(edm::Event&, edm::EventSetup const&) override {
0605         TestGlobalCacheFil const* testGlobalCache = globalCache();
0606         ++testGlobalCache->m_count;
0607         return true;
0608       }
0609 
0610       static void endProcessBlockProduce(edm::ProcessBlock& processBlock, TestGlobalCacheFil const* testGlobalCache) {
0611         ++testGlobalCache->m_count;
0612         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0613           throw cms::Exception("transitions") << "TestEndProcessBlockFilter transitions " << testGlobalCache->m_count
0614                                               << " but it was supposed to be " << testGlobalCache->trans_;
0615         }
0616 
0617         const unsigned int valueToPutAndGet = 81;
0618         processBlock.emplace(testGlobalCache->token_, valueToPutAndGet);
0619         if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0620           if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToPutAndGet) {
0621             throw cms::Exception("BadValue")
0622                 << "expected " << valueToPutAndGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0623           }
0624         }
0625       }
0626 
0627       static void globalEndJob(TestGlobalCacheFil* testGlobalCache) {
0628         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0629           throw cms::Exception("transitions") << "TestEndProcessBlockFilter transitions " << testGlobalCache->m_count
0630                                               << " but it was supposed to be " << testGlobalCache->trans_;
0631         }
0632       }
0633 
0634       ~TestEndProcessBlockFilter() {
0635         TestGlobalCacheFil const* testGlobalCache = globalCache();
0636         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0637           throw cms::Exception("transitions") << "~TestEndProcessBlockFilter transitions " << testGlobalCache->m_count
0638                                               << " but it was supposed to be " << testGlobalCache->trans_;
0639         }
0640       }
0641     };
0642 
0643     class TestBeginRunFilter : public edm::stream::EDFilter<edm::RunCache<Cache>, edm::BeginRunProducer> {
0644     public:
0645       static std::atomic<unsigned int> m_count;
0646       unsigned int trans_;
0647       static std::atomic<unsigned int> cvalue_;
0648       static std::atomic<bool> gbr;
0649       static std::atomic<bool> ger;
0650 
0651       TestBeginRunFilter(edm::ParameterSet const& p) {
0652         trans_ = p.getParameter<int>("transitions");
0653         cvalue_ = p.getParameter<int>("cachevalue");
0654         m_count = 0;
0655         produces<unsigned int>();
0656         produces<unsigned int, edm::Transition::BeginRun>("a");
0657       }
0658 
0659       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0660         ++m_count;
0661         gbr = true;
0662         ger = false;
0663         auto pCache = std::make_shared<Cache>();
0664         ++(pCache->run);
0665         return pCache;
0666       }
0667 
0668       bool filter(edm::Event&, edm::EventSetup const&) override {
0669         ++m_count;
0670         return true;
0671       }
0672 
0673       static void globalBeginRunProduce(edm::Run& iRun, edm::EventSetup const&, RunContext const*) {
0674         ++m_count;
0675         if (!gbr) {
0676           throw cms::Exception("begin out of sequence") << "globalBeginRunProduce seen before globalBeginRun";
0677         }
0678       }
0679 
0680       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0681         ++m_count;
0682         auto pCache = iContext->run();
0683         if (pCache->run != 1) {
0684           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0685         }
0686         gbr = false;
0687         ger = true;
0688       }
0689 
0690       ~TestBeginRunFilter() {
0691         if (m_count != trans_) {
0692           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0693         }
0694       }
0695     };
0696 
0697     class TestEndRunFilter : public edm::stream::EDFilter<edm::RunCache<Cache>, edm::EndRunProducer> {
0698     public:
0699       static std::atomic<unsigned int> m_count;
0700       unsigned int trans_;
0701       static std::atomic<unsigned int> cvalue_;
0702       static std::atomic<bool> gbr;
0703       static std::atomic<bool> ger;
0704 
0705       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0706         ++m_count;
0707         gbr = true;
0708         ger = false;
0709         auto pCache = std::make_shared<Cache>();
0710         ++(pCache->run);
0711         return pCache;
0712       }
0713 
0714       TestEndRunFilter(edm::ParameterSet const& p) {
0715         trans_ = p.getParameter<int>("transitions");
0716         cvalue_ = p.getParameter<int>("cachevalue");
0717         m_count = 0;
0718         produces<unsigned int>();
0719         produces<unsigned int, edm::Transition::EndRun>("a");
0720       }
0721 
0722       bool filter(edm::Event&, edm::EventSetup const&) override {
0723         ++m_count;
0724 
0725         return true;
0726       }
0727 
0728       static void globalEndRunProduce(edm::Run& iRun, edm::EventSetup const&, RunContext const*) {
0729         ++m_count;
0730         if (ger) {
0731           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalEndRunProduce";
0732         }
0733       }
0734 
0735       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0736         ++m_count;
0737         auto pCache = iContext->run();
0738         if (pCache->run != 1) {
0739           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0740         }
0741         gbr = false;
0742         ger = true;
0743       }
0744 
0745       ~TestEndRunFilter() {
0746         if (m_count != trans_) {
0747           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0748         }
0749       }
0750     };
0751 
0752     class TestBeginLumiBlockFilter
0753         : public edm::stream::EDFilter<edm::LuminosityBlockCache<Cache>, edm::BeginLuminosityBlockProducer> {
0754     public:
0755       static std::atomic<unsigned int> m_count;
0756       unsigned int trans_;
0757       static std::atomic<unsigned int> cvalue_;
0758       static std::atomic<bool> gbl;
0759       static std::atomic<bool> gel;
0760 
0761       TestBeginLumiBlockFilter(edm::ParameterSet const& p) {
0762         trans_ = p.getParameter<int>("transitions");
0763         cvalue_ = p.getParameter<int>("cachevalue");
0764         m_count = 0;
0765         produces<unsigned int>();
0766         produces<unsigned int, edm::Transition::BeginLuminosityBlock>("a");
0767       }
0768 
0769       bool filter(edm::Event&, edm::EventSetup const&) override {
0770         ++m_count;
0771 
0772         return true;
0773       }
0774 
0775       static void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&,
0776                                                     edm::EventSetup const&,
0777                                                     LuminosityBlockContext const*) {
0778         ++m_count;
0779         if (!gbl) {
0780           throw cms::Exception("begin out of sequence")
0781               << "globalBeginLumiBlockProduce seen before globalBeginLumiBlock";
0782         }
0783       }
0784 
0785       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0786                                                                edm::EventSetup const&,
0787                                                                RunContext const*) {
0788         ++m_count;
0789         gbl = true;
0790         gel = false;
0791         auto pCache = std::make_shared<Cache>();
0792         ++(pCache->lumi);
0793         return pCache;
0794       }
0795 
0796       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0797                                            edm::EventSetup const&,
0798                                            LuminosityBlockContext const* iLBContext) {
0799         ++m_count;
0800         auto pCache = iLBContext->luminosityBlock();
0801         if (pCache->lumi != 1) {
0802           throw cms::Exception("end out of sequence")
0803               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0804               << iLB.luminosityBlock();
0805         }
0806         gel = true;
0807         gbl = false;
0808       }
0809 
0810       ~TestBeginLumiBlockFilter() {
0811         if (m_count != trans_) {
0812           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0813         }
0814       }
0815     };
0816 
0817     class TestEndLumiBlockFilter
0818         : public edm::stream::EDFilter<edm::LuminosityBlockCache<Cache>, edm::EndLuminosityBlockProducer> {
0819     public:
0820       static std::atomic<unsigned int> m_count;
0821       unsigned int trans_;
0822       static std::atomic<unsigned int> cvalue_;
0823       static std::atomic<bool> gbl;
0824       static std::atomic<bool> gel;
0825 
0826       TestEndLumiBlockFilter(edm::ParameterSet const& p) {
0827         trans_ = p.getParameter<int>("transitions");
0828         cvalue_ = p.getParameter<int>("cachevalue");
0829         m_count = 0;
0830         produces<unsigned int>();
0831         produces<unsigned int, edm::Transition::EndLuminosityBlock>("a");
0832       }
0833 
0834       bool filter(edm::Event&, edm::EventSetup const&) override {
0835         ++m_count;
0836 
0837         return true;
0838       }
0839 
0840       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0841                                                                edm::EventSetup const&,
0842                                                                RunContext const*) {
0843         ++m_count;
0844         gbl = true;
0845         gel = false;
0846         auto pCache = std::make_shared<Cache>();
0847         ++(pCache->lumi);
0848         return pCache;
0849       }
0850 
0851       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0852                                            edm::EventSetup const&,
0853                                            LuminosityBlockContext const* iLBContext) {
0854         ++m_count;
0855         auto pCache = iLBContext->luminosityBlock();
0856         if (pCache->lumi != 1) {
0857           throw cms::Exception("end out of sequence")
0858               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0859               << iLB.luminosityBlock();
0860         }
0861         gel = true;
0862         gbl = false;
0863       }
0864 
0865       static void globalEndLuminosityBlockProduce(edm::LuminosityBlock&,
0866                                                   edm::EventSetup const&,
0867                                                   LuminosityBlockContext const*) {
0868         ++m_count;
0869       }
0870 
0871       ~TestEndLumiBlockFilter() {
0872         if (m_count != trans_) {
0873           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0874         }
0875       }
0876     };
0877 
0878     class TestInputProcessBlockCache {
0879     public:
0880       long long int value_ = 0;
0881     };
0882 
0883     class TestInputProcessBlockCache1 {
0884     public:
0885       long long int value_ = 0;
0886     };
0887 
0888     class InputProcessBlockIntFilter
0889         : public edm::stream::EDFilter<
0890               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>> {
0891     public:
0892       explicit InputProcessBlockIntFilter(edm::ParameterSet const& pset) {
0893         {
0894           expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0895           sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0896           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0897           if (not tag.label().empty()) {
0898             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0899           }
0900         }
0901         {
0902           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0903           if (not tag.label().empty()) {
0904             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0905           }
0906         }
0907         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0908             getTokenBegin_,
0909             [this](edm::ProcessBlock const& processBlock,
0910                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0911               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0912               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0913               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0914               return returnValue;
0915             });
0916       }
0917 
0918       static void accessInputProcessBlock(edm::ProcessBlock const&) {
0919         edm::LogAbsolute("InputProcessBlockIntFilter") << "InputProcessBlockIntFilter::accessInputProcessBlock";
0920       }
0921 
0922       bool filter(edm::Event& event, edm::EventSetup const&) override {
0923         auto cacheTuple = processBlockCaches(event);
0924         if (!expectedByRun_.empty()) {
0925           if (expectedByRun_.at(event.run() - 1) !=
0926               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0927             throw cms::Exception("UnexpectedValue")
0928                 << "InputProcessBlockIntFilter::filter cached value was "
0929                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0930                 << " but it was supposed to be " << expectedByRun_.at(event.run() - 1);
0931           }
0932         }
0933         // Force events to be processed concurrently
0934         if (sleepTime_ > 0) {
0935           std::this_thread::sleep_for(std::chrono::microseconds(sleepTime_));
0936         }
0937         return true;
0938       }
0939 
0940     private:
0941       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0942       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0943       std::vector<int> expectedByRun_;
0944       unsigned int sleepTime_{0};
0945     };
0946 
0947     struct InputProcessBlockGlobalCacheAn {
0948       // The tokens are duplicated in this test module to prove that they
0949       // work both as GlobalCache members and module data members.
0950       // We need them as GlobalCache members for accessInputProcessBlock.
0951       // In registerProcessBlockCacheFiller we use tokens that are member
0952       // variables of the class and because the lambda captures the "this"
0953       // pointer of the zeroth stream module instance. We always
0954       // use the zeroth EDConsumer. In the case of registerProcessBlockCacheFiller,
0955       // either set of tokens would work. Note that in the GlobalCache case
0956       // there is a slight weirdness that the zeroth consumer is used but
0957       // the token comes from the last consumer instance. It works because
0958       // all the stream module instances have EDConsumer base classes with
0959       // containers with the same contents in the same order (not 100% guaranteed,
0960       // but it would be difficult to implement a module where this isn't true).
0961       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBegin_;
0962       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEnd_;
0963       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0964       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEndM_;
0965       mutable std::atomic<unsigned int> transitions_{0};
0966       int sum_{0};
0967       unsigned int expectedTransitions_{0};
0968       std::vector<int> expectedByRun_;
0969       int expectedSum_{0};
0970       unsigned int sleepTime_{0};
0971     };
0972 
0973     // Same thing as previous class except with a GlobalCache added
0974     class InputProcessBlockIntFilterG
0975         : public edm::stream::EDFilter<
0976               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>,
0977               edm::GlobalCache<InputProcessBlockGlobalCacheAn>> {
0978     public:
0979       explicit InputProcessBlockIntFilterG(edm::ParameterSet const& pset,
0980                                            InputProcessBlockGlobalCacheAn const* testGlobalCache) {
0981         {
0982           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0983           if (not tag.label().empty()) {
0984             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0985             testGlobalCache->getTokenBegin_ = getTokenBegin_;
0986           }
0987         }
0988         {
0989           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0990           if (not tag.label().empty()) {
0991             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0992             testGlobalCache->getTokenEnd_ = getTokenEnd_;
0993           }
0994         }
0995         {
0996           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlockM");
0997           if (not tag.label().empty()) {
0998             getTokenBeginM_ = consumes<IntProduct, edm::InProcess>(tag);
0999             testGlobalCache->getTokenBeginM_ = getTokenBeginM_;
1000           }
1001         }
1002         {
1003           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlockM");
1004           if (not tag.label().empty()) {
1005             getTokenEndM_ = consumes<IntProduct, edm::InProcess>(tag);
1006             testGlobalCache->getTokenEndM_ = getTokenEndM_;
1007           }
1008         }
1009         registerProcessBlockCacheFiller<int>(
1010             getTokenBegin_, [this](edm::ProcessBlock const& processBlock, std::shared_ptr<int> const& previousCache) {
1011               auto returnValue = std::make_shared<int>(0);
1012               *returnValue += processBlock.get(getTokenBegin_).value;
1013               *returnValue += processBlock.get(getTokenEnd_).value;
1014               ++globalCache()->transitions_;
1015               return returnValue;
1016             });
1017         registerProcessBlockCacheFiller<1>(getTokenBegin_,
1018                                            [this](edm::ProcessBlock const& processBlock,
1019                                                   std::shared_ptr<TestInputProcessBlockCache> const& previousCache) {
1020                                              auto returnValue = std::make_shared<TestInputProcessBlockCache>();
1021                                              returnValue->value_ += processBlock.get(getTokenBegin_).value;
1022                                              returnValue->value_ += processBlock.get(getTokenEnd_).value;
1023                                              ++globalCache()->transitions_;
1024                                              return returnValue;
1025                                            });
1026         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
1027             getTokenBegin_,
1028             [this](edm::ProcessBlock const& processBlock,
1029                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
1030               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
1031               returnValue->value_ += processBlock.get(getTokenBegin_).value;
1032               returnValue->value_ += processBlock.get(getTokenEnd_).value;
1033               ++globalCache()->transitions_;
1034               return returnValue;
1035             });
1036       }
1037 
1038       static std::unique_ptr<InputProcessBlockGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
1039         auto testGlobalCache = std::make_unique<InputProcessBlockGlobalCacheAn>();
1040         testGlobalCache->expectedTransitions_ = pset.getParameter<int>("transitions");
1041         testGlobalCache->expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
1042         testGlobalCache->expectedSum_ = pset.getParameter<int>("expectedSum");
1043         testGlobalCache->sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
1044         return testGlobalCache;
1045       }
1046 
1047       static void accessInputProcessBlock(edm::ProcessBlock const& processBlock,
1048                                           InputProcessBlockGlobalCacheAn* testGlobalCache) {
1049         if (processBlock.processName() == "PROD1") {
1050           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBegin_).value;
1051           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEnd_).value;
1052         }
1053         if (processBlock.processName() == "MERGE") {
1054           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBeginM_).value;
1055           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEndM_).value;
1056         }
1057         ++testGlobalCache->transitions_;
1058       }
1059 
1060       bool filter(edm::Event& event, edm::EventSetup const&) override {
1061         auto cacheTuple = processBlockCaches(event);
1062         auto testGlobalCache = globalCache();
1063         if (!testGlobalCache->expectedByRun_.empty()) {
1064           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != *std::get<edm::CacheHandle<int>>(cacheTuple)) {
1065             throw cms::Exception("UnexpectedValue")
1066                 << "InputProcessBlockIntFilterG::filter cached value was "
1067                 << *std::get<edm::CacheHandle<int>>(cacheTuple) << " but it was supposed to be "
1068                 << testGlobalCache->expectedByRun_.at(event.run() - 1);
1069           }
1070           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != std::get<1>(cacheTuple)->value_) {
1071             throw cms::Exception("UnexpectedValue")
1072                 << "InputProcessBlockIntFilterG::filter second cached value was " << std::get<1>(cacheTuple)->value_
1073                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
1074           }
1075           if (testGlobalCache->expectedByRun_.at(event.run() - 1) !=
1076               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
1077             throw cms::Exception("UnexpectedValue")
1078                 << "InputProcessBlockIntFilterG::filter third cached value was "
1079                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
1080                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
1081           }
1082         }
1083         ++testGlobalCache->transitions_;
1084 
1085         // Force events to be processed concurrently
1086         if (testGlobalCache->sleepTime_ > 0) {
1087           std::this_thread::sleep_for(std::chrono::microseconds(testGlobalCache->sleepTime_));
1088         }
1089         return true;
1090       }
1091 
1092       static void globalEndJob(InputProcessBlockGlobalCacheAn* testGlobalCache) {
1093         if (testGlobalCache->transitions_ != testGlobalCache->expectedTransitions_) {
1094           throw cms::Exception("transitions")
1095               << "InputProcessBlockIntFilterG transitions " << testGlobalCache->transitions_
1096               << " but it was supposed to be " << testGlobalCache->expectedTransitions_;
1097         }
1098 
1099         if (testGlobalCache->sum_ != testGlobalCache->expectedSum_) {
1100           throw cms::Exception("UnexpectedValue") << "InputProcessBlockIntFilterG sum " << testGlobalCache->sum_
1101                                                   << " but it was supposed to be " << testGlobalCache->expectedSum_;
1102         }
1103       }
1104 
1105     private:
1106       edm::EDGetTokenT<IntProduct> getTokenBegin_;
1107       edm::EDGetTokenT<IntProduct> getTokenEnd_;
1108       edm::EDGetTokenT<IntProduct> getTokenBeginM_;
1109       edm::EDGetTokenT<IntProduct> getTokenEndM_;
1110     };
1111 
1112   }  // namespace stream
1113 }  // namespace edmtest
1114 std::atomic<unsigned int> edmtest::stream::GlobalIntFilter::m_count{0};
1115 std::atomic<unsigned int> edmtest::stream::RunIntFilter::m_count{0};
1116 std::atomic<unsigned int> edmtest::stream::LumiIntFilter::m_count{0};
1117 std::atomic<unsigned int> edmtest::stream::RunSummaryIntFilter::m_count{0};
1118 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntFilter::m_count{0};
1119 std::atomic<unsigned int> edmtest::stream::TestBeginRunFilter::m_count{0};
1120 std::atomic<unsigned int> edmtest::stream::TestEndRunFilter::m_count{0};
1121 std::atomic<unsigned int> edmtest::stream::TestBeginLumiBlockFilter::m_count{0};
1122 std::atomic<unsigned int> edmtest::stream::TestEndLumiBlockFilter::m_count{0};
1123 std::atomic<unsigned int> edmtest::stream::GlobalIntFilter::cvalue_{0};
1124 std::atomic<unsigned int> edmtest::stream::RunIntFilter::cvalue_{0};
1125 std::atomic<unsigned int> edmtest::stream::LumiIntFilter::cvalue_{0};
1126 std::atomic<unsigned int> edmtest::stream::RunSummaryIntFilter::cvalue_{0};
1127 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntFilter::cvalue_{0};
1128 std::atomic<unsigned int> edmtest::stream::TestBeginRunFilter::cvalue_{0};
1129 std::atomic<unsigned int> edmtest::stream::TestEndRunFilter::cvalue_{0};
1130 std::atomic<unsigned int> edmtest::stream::TestBeginLumiBlockFilter::cvalue_{0};
1131 std::atomic<unsigned int> edmtest::stream::TestEndLumiBlockFilter::cvalue_{0};
1132 std::atomic<bool> edmtest::stream::RunSummaryIntFilter::globalBeginRunCalled_{false};
1133 std::atomic<bool> edmtest::stream::LumiSummaryIntFilter::globalBeginLumiCalled_{false};
1134 std::atomic<bool> edmtest::stream::TestBeginRunFilter::gbr{false};
1135 std::atomic<bool> edmtest::stream::TestBeginRunFilter::ger{false};
1136 std::atomic<bool> edmtest::stream::TestEndRunFilter::gbr{false};
1137 std::atomic<bool> edmtest::stream::TestEndRunFilter::ger{false};
1138 std::atomic<bool> edmtest::stream::TestBeginLumiBlockFilter::gbl{false};
1139 std::atomic<bool> edmtest::stream::TestBeginLumiBlockFilter::gel{false};
1140 std::atomic<bool> edmtest::stream::TestEndLumiBlockFilter::gbl{false};
1141 std::atomic<bool> edmtest::stream::TestEndLumiBlockFilter::gel{false};
1142 DEFINE_FWK_MODULE(edmtest::stream::GlobalIntFilter);
1143 DEFINE_FWK_MODULE(edmtest::stream::RunIntFilter);
1144 DEFINE_FWK_MODULE(edmtest::stream::LumiIntFilter);
1145 DEFINE_FWK_MODULE(edmtest::stream::RunSummaryIntFilter);
1146 DEFINE_FWK_MODULE(edmtest::stream::LumiSummaryIntFilter);
1147 DEFINE_FWK_MODULE(edmtest::stream::ProcessBlockIntFilter);
1148 DEFINE_FWK_MODULE(edmtest::stream::TestBeginProcessBlockFilter);
1149 DEFINE_FWK_MODULE(edmtest::stream::TestEndProcessBlockFilter);
1150 DEFINE_FWK_MODULE(edmtest::stream::TestBeginRunFilter);
1151 DEFINE_FWK_MODULE(edmtest::stream::TestEndRunFilter);
1152 DEFINE_FWK_MODULE(edmtest::stream::TestBeginLumiBlockFilter);
1153 DEFINE_FWK_MODULE(edmtest::stream::TestEndLumiBlockFilter);
1154 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntFilter);
1155 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntFilterG);