Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:20

0001 
0002 /*----------------------------------------------------------------------
0003 
0004 Toy edm::stream::EDFilter modules of
0005 edm::*Cache templates and edm::*Producer classes
0006 for testing purposes only.
0007 
0008 ----------------------------------------------------------------------*/
0009 
0010 #include <atomic>
0011 #include <functional>
0012 #include <iostream>
0013 #include <map>
0014 #include <tuple>
0015 #include <unistd.h>
0016 #include <vector>
0017 
0018 #include "FWCore/Framework/interface/CacheHandle.h"
0019 #include "FWCore/Framework/interface/stream/EDFilter.h"
0020 #include "FWCore/Framework/interface/maker/WorkerT.h"
0021 #include "FWCore/Framework/interface/HistoryAppender.h"
0022 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0023 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0024 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0025 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0026 #include "FWCore/Framework/interface/Event.h"
0027 #include "FWCore/Framework/interface/MakerMacros.h"
0028 #include "FWCore/Framework/interface/ProcessBlock.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0030 #include "FWCore/Utilities/interface/EDMException.h"
0031 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0032 
0033 namespace edmtest {
0034   namespace stream {
0035 
0036     // anonymous namespace here causes build warnings
0037     namespace cache {
0038       struct Cache {
0039         Cache() : value(0), run(0), lumi(0) {}
0040         //Using mutable since we want to update the value.
0041         mutable std::atomic<unsigned int> value;
0042         mutable std::atomic<unsigned int> run;
0043         mutable std::atomic<unsigned int> lumi;
0044       };
0045 
0046       struct SummaryCache {
0047         // Intentionally not thread safe, not atomic
0048         unsigned int value = 0;
0049       };
0050 
0051       struct TestGlobalCacheFil {
0052         CMS_THREAD_SAFE mutable edm::EDPutTokenT<unsigned int> token_;
0053         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenBegin_;
0054         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenEnd_;
0055         unsigned int trans_{0};
0056         mutable std::atomic<unsigned int> m_count{0};
0057       };
0058     }  // namespace cache
0059 
0060     using Cache = cache::Cache;
0061     using SummaryCache = cache::SummaryCache;
0062     using TestGlobalCacheFil = cache::TestGlobalCacheFil;
0063 
0064     class GlobalIntFilter : public edm::stream::EDFilter<edm::GlobalCache<Cache>> {
0065     public:
0066       static std::atomic<unsigned int> m_count;
0067       unsigned int trans_;
0068       static std::atomic<unsigned int> cvalue_;
0069 
0070       static std::unique_ptr<Cache> initializeGlobalCache(edm::ParameterSet const&) {
0071         ++m_count;
0072         return std::make_unique<Cache>();
0073       }
0074 
0075       GlobalIntFilter(edm::ParameterSet const& p, const Cache* iGlobal) {
0076         trans_ = p.getParameter<int>("transitions");
0077         cvalue_ = p.getParameter<int>("cachevalue");
0078         produces<unsigned int>();
0079       }
0080 
0081       static void globalBeginJob(Cache* iGlobal) {
0082         ++m_count;
0083         if (iGlobal->value != 0) {
0084           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
0085         }
0086       }
0087 
0088       bool filter(edm::Event&, edm::EventSetup const&) override {
0089         ++m_count;
0090         ++((globalCache())->value);
0091 
0092         return true;
0093       }
0094 
0095       static void globalEndJob(Cache* iGlobal) {
0096         ++m_count;
0097         if (iGlobal->value != cvalue_) {
0098           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be " << cvalue_;
0099         }
0100       }
0101 
0102       ~GlobalIntFilter() {
0103         if (m_count != trans_) {
0104           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0105         }
0106       }
0107     };
0108 
0109     class RunIntFilter : public edm::stream::EDFilter<edm::RunCache<Cache>, edm::stream::WatchRuns> {
0110     public:
0111       static std::atomic<unsigned int> m_count;
0112       unsigned int trans_;
0113       static std::atomic<unsigned int> cvalue_;
0114 
0115       RunIntFilter(edm::ParameterSet const& p) {
0116         trans_ = p.getParameter<int>("transitions");
0117         cvalue_ = p.getParameter<int>("cachevalue");
0118         m_count = 0;
0119         produces<unsigned int>();
0120       }
0121 
0122       bool filter(edm::Event&, edm::EventSetup const&) override {
0123         ++m_count;
0124         ++(runCache()->value);
0125         return true;
0126       }
0127 
0128       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0129         ++m_count;
0130         auto pCache = std::make_shared<Cache>();
0131         pCache->run = iRun.runAuxiliary().run();
0132         return pCache;
0133       }
0134 
0135       void beginRun(edm::Run const& iRun, edm::EventSetup const&) override {
0136         if (runCache()->run != iRun.runAuxiliary().run()) {
0137           throw cms::Exception("begin out of sequence") << "beginRun seen before globalBeginRun";
0138         }
0139       }
0140 
0141       void endRun(edm::Run const& iRun, edm::EventSetup const&) override {
0142         if (runCache()->run != iRun.runAuxiliary().run()) {
0143           throw cms::Exception("end out of sequence") << "globalEndRun seen before endRun";
0144         }
0145       }
0146 
0147       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0148         ++m_count;
0149         auto pCache = iContext->run();
0150         if (pCache->run != iRun.runAuxiliary().run()) {
0151           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0152         }
0153         pCache->run = 0;
0154         if (iContext->run()->value != cvalue_) {
0155           throw cms::Exception("cache value") << iContext->run()->value << " but it was supposed to be " << cvalue_;
0156         }
0157       }
0158 
0159       ~RunIntFilter() {
0160         if (m_count != trans_) {
0161           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0162         }
0163       }
0164     };
0165 
0166     class LumiIntFilter
0167         : public edm::stream::EDFilter<edm::LuminosityBlockCache<Cache>, edm::stream::WatchLuminosityBlocks> {
0168     public:
0169       static std::atomic<unsigned int> m_count;
0170       unsigned int trans_;
0171       static std::atomic<unsigned int> cvalue_;
0172 
0173       LumiIntFilter(edm::ParameterSet const& p) {
0174         trans_ = p.getParameter<int>("transitions");
0175         cvalue_ = p.getParameter<int>("cachevalue");
0176         m_count = 0;
0177         produces<unsigned int>();
0178       }
0179 
0180       bool filter(edm::Event&, edm::EventSetup const&) override {
0181         ++m_count;
0182         ++(luminosityBlockCache()->value);
0183 
0184         return true;
0185       }
0186 
0187       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0188                                                                edm::EventSetup const&,
0189                                                                RunContext const*) {
0190         ++m_count;
0191         auto pCache = std::make_shared<Cache>();
0192         pCache->run = iLB.luminosityBlockAuxiliary().run();
0193         pCache->lumi = iLB.luminosityBlockAuxiliary().luminosityBlock();
0194         return pCache;
0195       }
0196 
0197       void beginLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0198         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0199             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0200           throw cms::Exception("begin out of sequence")
0201               << "beginLuminosityBlock seen before globalBeginLuminosityBlock " << luminosityBlockCache()->run << " "
0202               << iLB.luminosityBlockAuxiliary().run();
0203         }
0204       }
0205 
0206       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0207                                            edm::EventSetup const&,
0208                                            LuminosityBlockContext const* iLBContext) {
0209         ++m_count;
0210         auto pCache = iLBContext->luminosityBlock();
0211         if (pCache->run != iLB.luminosityBlockAuxiliary().run() ||
0212             pCache->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0213           throw cms::Exception("end out of sequence")
0214               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0215               << iLB.luminosityBlock();
0216         }
0217         pCache->run = 0;
0218         pCache->lumi = 0;
0219         if (pCache->value != cvalue_) {
0220           throw cms::Exception("cache value") << "LumiIntFilter cache value " << iLBContext->luminosityBlock()->value
0221                                               << " but it was supposed to be " << cvalue_;
0222         }
0223       }
0224 
0225       void endLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0226         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0227             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0228           throw cms::Exception("end out of sequence") << "globalEndLuminosityBlock seen before endLuminosityBlock";
0229         }
0230       }
0231 
0232       ~LumiIntFilter() {
0233         if (m_count != trans_) {
0234           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0235         }
0236       }
0237     };
0238 
0239     class RunSummaryIntFilter
0240         : public edm::stream::EDFilter<edm::RunCache<Cache>, edm::RunSummaryCache<SummaryCache>, edm::stream::WatchRuns> {
0241     public:
0242       static std::atomic<unsigned int> m_count;
0243       unsigned int trans_;
0244       static std::atomic<unsigned int> cvalue_;
0245       static std::atomic<bool> globalBeginRunCalled_;
0246       unsigned int valueAccumulatedForStream_ = 0;
0247       bool endRunWasCalled_ = false;
0248 
0249       RunSummaryIntFilter(edm::ParameterSet const& p) {
0250         trans_ = p.getParameter<int>("transitions");
0251         cvalue_ = p.getParameter<int>("cachevalue");
0252         m_count = 0;
0253         produces<unsigned int>();
0254       }
0255 
0256       void beginRun(edm::Run const&, edm::EventSetup const&) override {
0257         valueAccumulatedForStream_ = 0;
0258         endRunWasCalled_ = false;
0259       }
0260 
0261       bool filter(edm::Event&, edm::EventSetup const&) override {
0262         ++m_count;
0263         ++(runCache()->value);
0264         ++valueAccumulatedForStream_;
0265         return true;
0266       }
0267 
0268       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0269         ++m_count;
0270         globalBeginRunCalled_ = true;
0271         auto pCache = std::make_shared<Cache>();
0272         ++(pCache->run);
0273         return pCache;
0274       }
0275 
0276       static std::shared_ptr<SummaryCache> globalBeginRunSummary(edm::Run const&,
0277                                                                  edm::EventSetup const&,
0278                                                                  GlobalCache const*) {
0279         ++m_count;
0280         if (!globalBeginRunCalled_) {
0281           throw cms::Exception("begin out of sequence") << "globalBeginRunSummary seen before globalBeginRun";
0282         }
0283         globalBeginRunCalled_ = false;
0284         return std::make_shared<SummaryCache>();
0285       }
0286 
0287       void endRunSummary(edm::Run const&, edm::EventSetup const&, SummaryCache* runSummaryCache) const override {
0288         runSummaryCache->value += valueAccumulatedForStream_;
0289         if (!endRunWasCalled_) {
0290           throw cms::Exception("end out of sequence") << "endRunSummary seen before endRun";
0291         }
0292       }
0293 
0294       static void globalEndRunSummary(edm::Run const&,
0295                                       edm::EventSetup const&,
0296                                       RunContext const*,
0297                                       SummaryCache* runSummaryCache) {
0298         ++m_count;
0299         if (runSummaryCache->value != cvalue_) {
0300           throw cms::Exception("unexpectedValue")
0301               << "run summary cache value = " << runSummaryCache->value << " but it was supposed to be " << cvalue_;
0302         }
0303       }
0304 
0305       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0306         ++m_count;
0307         auto pCache = iContext->run();
0308         if (pCache->value != cvalue_) {
0309           throw cms::Exception("unExpectedValue")
0310               << "run cache value " << pCache->value << " but it was supposed to be " << cvalue_;
0311         }
0312         if (pCache->run != 1) {
0313           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0314         }
0315       }
0316 
0317       void endRun(edm::Run const&, edm::EventSetup const&) override { endRunWasCalled_ = true; }
0318 
0319       ~RunSummaryIntFilter() {
0320         if (m_count != trans_) {
0321           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0322         }
0323       }
0324     };
0325 
0326     class LumiSummaryIntFilter : public edm::stream::EDFilter<edm::LuminosityBlockCache<Cache>,
0327                                                               edm::LuminosityBlockSummaryCache<SummaryCache>,
0328                                                               edm::stream::WatchLuminosityBlocks> {
0329     public:
0330       static std::atomic<unsigned int> m_count;
0331       unsigned int trans_;
0332       static std::atomic<unsigned int> cvalue_;
0333       static std::atomic<bool> globalBeginLumiCalled_;
0334       unsigned int valueAccumulatedForStream_ = 0;
0335       bool endLumiWasCalled_ = false;
0336 
0337       LumiSummaryIntFilter(edm::ParameterSet const& p) {
0338         trans_ = p.getParameter<int>("transitions");
0339         cvalue_ = p.getParameter<int>("cachevalue");
0340         m_count = 0;
0341         produces<unsigned int>();
0342       }
0343 
0344       bool filter(edm::Event&, edm::EventSetup const&) override {
0345         ++m_count;
0346         ++(luminosityBlockCache()->value);
0347         ++valueAccumulatedForStream_;
0348         return true;
0349       }
0350 
0351       void beginLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0352         valueAccumulatedForStream_ = 0;
0353         valueAccumulatedForStream_ = 0;
0354         endLumiWasCalled_ = false;
0355       }
0356 
0357       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0358                                                                edm::EventSetup const&,
0359                                                                RunContext const*) {
0360         ++m_count;
0361         globalBeginLumiCalled_ = true;
0362         auto pCache = std::make_shared<Cache>();
0363         ++(pCache->lumi);
0364         return pCache;
0365       }
0366 
0367       static std::shared_ptr<SummaryCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0368                                                                              edm::EventSetup const&,
0369                                                                              LuminosityBlockContext const*) {
0370         ++m_count;
0371         if (!globalBeginLumiCalled_) {
0372           throw cms::Exception("begin out of sequence")
0373               << "globalBeginLuminosityBlockSummary seen before globalBeginLuminosityBlock";
0374         }
0375         globalBeginLumiCalled_ = false;
0376         return std::make_shared<SummaryCache>();
0377       }
0378 
0379       void endLuminosityBlockSummary(edm::LuminosityBlock const&,
0380                                      edm::EventSetup const&,
0381                                      SummaryCache* lumiSummaryCache) const override {
0382         lumiSummaryCache->value += valueAccumulatedForStream_;
0383         if (!endLumiWasCalled_) {
0384           throw cms::Exception("end out of sequence") << "endLuminosityBlockSummary seen before endLuminosityBlock";
0385         }
0386       }
0387 
0388       static void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0389                                                   edm::EventSetup const&,
0390                                                   LuminosityBlockContext const* iLBContext,
0391                                                   SummaryCache* lumiSummaryCache) {
0392         ++m_count;
0393         if (lumiSummaryCache->value != cvalue_) {
0394           throw cms::Exception("unexpectedValue")
0395               << "lumi summary cache value = " << lumiSummaryCache->value << " but it was supposed to be " << cvalue_;
0396         }
0397         auto pCache = iLBContext->luminosityBlock();
0398         // Add one so globalEndLuminosityBlock can check this function was called first
0399         ++pCache->value;
0400       }
0401 
0402       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0403                                            edm::EventSetup const&,
0404                                            LuminosityBlockContext const* iLBContext) {
0405         ++m_count;
0406         auto pCache = iLBContext->luminosityBlock();
0407         if (pCache->value != cvalue_ + 1) {
0408           throw cms::Exception("unexpectedValue")
0409               << "lumi cache value " << pCache->value << " but it was supposed to be " << cvalue_ + 1;
0410         }
0411         if (pCache->lumi != 1) {
0412           throw cms::Exception("end out of sequence")
0413               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0414               << iLB.luminosityBlock();
0415         }
0416       }
0417 
0418       void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0419         endLumiWasCalled_ = true;
0420       }
0421 
0422       ~LumiSummaryIntFilter() {
0423         if (m_count != trans_) {
0424           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0425         }
0426       }
0427     };
0428 
0429     class ProcessBlockIntFilter
0430         : public edm::stream::EDFilter<edm::WatchProcessBlock, edm::GlobalCache<TestGlobalCacheFil>> {
0431     public:
0432       explicit ProcessBlockIntFilter(edm::ParameterSet const& pset, TestGlobalCacheFil const* testGlobalCache) {
0433         produces<unsigned int>();
0434 
0435         {
0436           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0437           if (not tag.label().empty()) {
0438             testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0439           }
0440         }
0441         {
0442           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0443           if (not tag.label().empty()) {
0444             testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0445           }
0446         }
0447       }
0448 
0449       static std::unique_ptr<TestGlobalCacheFil> initializeGlobalCache(edm::ParameterSet const& pset) {
0450         auto testGlobalCache = std::make_unique<TestGlobalCacheFil>();
0451         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0452         return testGlobalCache;
0453       }
0454 
0455       static void beginProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheFil* testGlobalCache) {
0456         if (testGlobalCache->m_count != 0) {
0457           throw cms::Exception("transitions") << "ProcessBlockIntFilter::begin transitions " << testGlobalCache->m_count
0458                                               << " but it was supposed to be " << 0;
0459         }
0460         ++testGlobalCache->m_count;
0461         const unsigned int valueToGet = 71;
0462         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0463           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0464             throw cms::Exception("BadValue")
0465                 << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0466           }
0467         }
0468       }
0469 
0470       bool filter(edm::Event&, edm::EventSetup const&) override {
0471         TestGlobalCacheFil const* testGlobalCache = globalCache();
0472         if (testGlobalCache->m_count < 1u) {
0473           throw cms::Exception("out of sequence") << "produce before beginProcessBlock " << testGlobalCache->m_count;
0474         }
0475         ++testGlobalCache->m_count;
0476         return true;
0477       }
0478 
0479       static void endProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheFil* testGlobalCache) {
0480         ++testGlobalCache->m_count;
0481         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0482           throw cms::Exception("transitions") << "ProcessBlockIntFilter::end transitions " << testGlobalCache->m_count
0483                                               << " but it was supposed to be " << testGlobalCache->trans_;
0484         }
0485         {
0486           const unsigned int valueToGet = 71;
0487           if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0488             if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0489               throw cms::Exception("BadValue")
0490                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0491             }
0492           }
0493         }
0494         {
0495           const unsigned int valueToGet = 81;
0496           if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0497             if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToGet) {
0498               throw cms::Exception("BadValue")
0499                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0500             }
0501           }
0502         }
0503       }
0504 
0505       static void globalEndJob(TestGlobalCacheFil* testGlobalCache) {
0506         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0507           throw cms::Exception("transitions") << "ProcessBlockIntFilter transitions " << testGlobalCache->m_count
0508                                               << " but it was supposed to be " << testGlobalCache->trans_;
0509         }
0510       }
0511 
0512       ~ProcessBlockIntFilter() {
0513         TestGlobalCacheFil const* testGlobalCache = globalCache();
0514         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0515           throw cms::Exception("transitions") << "ProcessBlockIntFilter transitions " << testGlobalCache->m_count
0516                                               << " but it was supposed to be " << testGlobalCache->trans_;
0517         }
0518       }
0519     };
0520 
0521     class TestBeginProcessBlockFilter
0522         : public edm::stream::EDFilter<edm::BeginProcessBlockProducer, edm::GlobalCache<TestGlobalCacheFil>> {
0523     public:
0524       explicit TestBeginProcessBlockFilter(edm::ParameterSet const& pset, TestGlobalCacheFil const* testGlobalCache) {
0525         testGlobalCache->token_ = produces<unsigned int, edm::Transition::BeginProcessBlock>("begin");
0526         produces<unsigned int>();
0527 
0528         auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0529         if (not tag.label().empty()) {
0530           testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0531         }
0532       }
0533 
0534       static std::unique_ptr<TestGlobalCacheFil> initializeGlobalCache(edm::ParameterSet const& pset) {
0535         auto testGlobalCache = std::make_unique<TestGlobalCacheFil>();
0536         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0537         return testGlobalCache;
0538       }
0539 
0540       static void beginProcessBlockProduce(edm::ProcessBlock& processBlock, TestGlobalCacheFil const* testGlobalCache) {
0541         if (testGlobalCache->m_count != 0) {
0542           throw cms::Exception("transitions") << "TestBeginProcessBlockFilter transitions " << testGlobalCache->m_count
0543                                               << " but it was supposed to be " << 0;
0544         }
0545         ++testGlobalCache->m_count;
0546 
0547         const unsigned int valueToPutAndGet = 71;
0548         processBlock.emplace(testGlobalCache->token_, valueToPutAndGet);
0549 
0550         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0551           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToPutAndGet) {
0552             throw cms::Exception("BadValue")
0553                 << "expected " << valueToPutAndGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0554           }
0555         }
0556       }
0557 
0558       bool filter(edm::Event&, edm::EventSetup const&) override {
0559         TestGlobalCacheFil const* testGlobalCache = globalCache();
0560         if (testGlobalCache->m_count < 1u) {
0561           throw cms::Exception("out of sequence")
0562               << "produce before beginProcessBlockProduce " << testGlobalCache->m_count;
0563         }
0564         ++testGlobalCache->m_count;
0565         return true;
0566       }
0567 
0568       static void globalEndJob(TestGlobalCacheFil* testGlobalCache) {
0569         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0570           throw cms::Exception("transitions") << "TestBeginProcessBlockFilter transitions " << testGlobalCache->m_count
0571                                               << " but it was supposed to be " << testGlobalCache->trans_;
0572         }
0573       }
0574 
0575       ~TestBeginProcessBlockFilter() {
0576         TestGlobalCacheFil const* testGlobalCache = globalCache();
0577         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0578           throw cms::Exception("transitions") << "TestBeginProcessBlockFilter transitions " << testGlobalCache->m_count
0579                                               << " but it was supposed to be " << testGlobalCache->trans_;
0580         }
0581       }
0582     };
0583 
0584     class TestEndProcessBlockFilter
0585         : public edm::stream::EDFilter<edm::EndProcessBlockProducer, edm::GlobalCache<TestGlobalCacheFil>> {
0586     public:
0587       explicit TestEndProcessBlockFilter(edm::ParameterSet const& pset, TestGlobalCacheFil const* testGlobalCache) {
0588         testGlobalCache->token_ = produces<unsigned int, edm::Transition::EndProcessBlock>("end");
0589         produces<unsigned int>();
0590 
0591         auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0592         if (not tag.label().empty()) {
0593           testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0594         }
0595       }
0596 
0597       static std::unique_ptr<TestGlobalCacheFil> initializeGlobalCache(edm::ParameterSet const& pset) {
0598         auto testGlobalCache = std::make_unique<TestGlobalCacheFil>();
0599         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0600         return testGlobalCache;
0601       }
0602 
0603       bool filter(edm::Event&, edm::EventSetup const&) override {
0604         TestGlobalCacheFil const* testGlobalCache = globalCache();
0605         ++testGlobalCache->m_count;
0606         return true;
0607       }
0608 
0609       static void endProcessBlockProduce(edm::ProcessBlock& processBlock, TestGlobalCacheFil const* testGlobalCache) {
0610         ++testGlobalCache->m_count;
0611         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0612           throw cms::Exception("transitions") << "TestEndProcessBlockFilter transitions " << testGlobalCache->m_count
0613                                               << " but it was supposed to be " << testGlobalCache->trans_;
0614         }
0615 
0616         const unsigned int valueToPutAndGet = 81;
0617         processBlock.emplace(testGlobalCache->token_, valueToPutAndGet);
0618         if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0619           if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToPutAndGet) {
0620             throw cms::Exception("BadValue")
0621                 << "expected " << valueToPutAndGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0622           }
0623         }
0624       }
0625 
0626       static void globalEndJob(TestGlobalCacheFil* testGlobalCache) {
0627         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0628           throw cms::Exception("transitions") << "TestEndProcessBlockFilter transitions " << testGlobalCache->m_count
0629                                               << " but it was supposed to be " << testGlobalCache->trans_;
0630         }
0631       }
0632 
0633       ~TestEndProcessBlockFilter() {
0634         TestGlobalCacheFil const* testGlobalCache = globalCache();
0635         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0636           throw cms::Exception("transitions") << "~TestEndProcessBlockFilter transitions " << testGlobalCache->m_count
0637                                               << " but it was supposed to be " << testGlobalCache->trans_;
0638         }
0639       }
0640     };
0641 
0642     class TestBeginRunFilter : public edm::stream::EDFilter<edm::RunCache<Cache>, edm::BeginRunProducer> {
0643     public:
0644       static std::atomic<unsigned int> m_count;
0645       unsigned int trans_;
0646       static std::atomic<unsigned int> cvalue_;
0647       static std::atomic<bool> gbr;
0648       static std::atomic<bool> ger;
0649 
0650       TestBeginRunFilter(edm::ParameterSet const& p) {
0651         trans_ = p.getParameter<int>("transitions");
0652         cvalue_ = p.getParameter<int>("cachevalue");
0653         m_count = 0;
0654         produces<unsigned int>();
0655         produces<unsigned int, edm::Transition::BeginRun>("a");
0656       }
0657 
0658       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0659         ++m_count;
0660         gbr = true;
0661         ger = false;
0662         auto pCache = std::make_shared<Cache>();
0663         ++(pCache->run);
0664         return pCache;
0665       }
0666 
0667       bool filter(edm::Event&, edm::EventSetup const&) override {
0668         ++m_count;
0669         return true;
0670       }
0671 
0672       static void globalBeginRunProduce(edm::Run& iRun, edm::EventSetup const&, RunContext const*) {
0673         ++m_count;
0674         if (!gbr) {
0675           throw cms::Exception("begin out of sequence") << "globalBeginRunProduce seen before globalBeginRun";
0676         }
0677       }
0678 
0679       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0680         ++m_count;
0681         auto pCache = iContext->run();
0682         if (pCache->run != 1) {
0683           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0684         }
0685         gbr = false;
0686         ger = true;
0687       }
0688 
0689       ~TestBeginRunFilter() {
0690         if (m_count != trans_) {
0691           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0692         }
0693       }
0694     };
0695 
0696     class TestEndRunFilter : public edm::stream::EDFilter<edm::RunCache<Cache>, edm::EndRunProducer> {
0697     public:
0698       static std::atomic<unsigned int> m_count;
0699       unsigned int trans_;
0700       static std::atomic<unsigned int> cvalue_;
0701       static std::atomic<bool> gbr;
0702       static std::atomic<bool> ger;
0703 
0704       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0705         ++m_count;
0706         gbr = true;
0707         ger = false;
0708         auto pCache = std::make_shared<Cache>();
0709         ++(pCache->run);
0710         return pCache;
0711       }
0712 
0713       TestEndRunFilter(edm::ParameterSet const& p) {
0714         trans_ = p.getParameter<int>("transitions");
0715         cvalue_ = p.getParameter<int>("cachevalue");
0716         m_count = 0;
0717         produces<unsigned int>();
0718         produces<unsigned int, edm::Transition::EndRun>("a");
0719       }
0720 
0721       bool filter(edm::Event&, edm::EventSetup const&) override {
0722         ++m_count;
0723 
0724         return true;
0725       }
0726 
0727       static void globalEndRunProduce(edm::Run& iRun, edm::EventSetup const&, RunContext const*) {
0728         ++m_count;
0729         if (ger) {
0730           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalEndRunProduce";
0731         }
0732       }
0733 
0734       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0735         ++m_count;
0736         auto pCache = iContext->run();
0737         if (pCache->run != 1) {
0738           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0739         }
0740         gbr = false;
0741         ger = true;
0742       }
0743 
0744       ~TestEndRunFilter() {
0745         if (m_count != trans_) {
0746           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0747         }
0748       }
0749     };
0750 
0751     class TestBeginLumiBlockFilter
0752         : public edm::stream::EDFilter<edm::LuminosityBlockCache<Cache>, edm::BeginLuminosityBlockProducer> {
0753     public:
0754       static std::atomic<unsigned int> m_count;
0755       unsigned int trans_;
0756       static std::atomic<unsigned int> cvalue_;
0757       static std::atomic<bool> gbl;
0758       static std::atomic<bool> gel;
0759 
0760       TestBeginLumiBlockFilter(edm::ParameterSet const& p) {
0761         trans_ = p.getParameter<int>("transitions");
0762         cvalue_ = p.getParameter<int>("cachevalue");
0763         m_count = 0;
0764         produces<unsigned int>();
0765         produces<unsigned int, edm::Transition::BeginLuminosityBlock>("a");
0766       }
0767 
0768       bool filter(edm::Event&, edm::EventSetup const&) override {
0769         ++m_count;
0770 
0771         return true;
0772       }
0773 
0774       static void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&,
0775                                                     edm::EventSetup const&,
0776                                                     LuminosityBlockContext const*) {
0777         ++m_count;
0778         if (!gbl) {
0779           throw cms::Exception("begin out of sequence")
0780               << "globalBeginLumiBlockProduce seen before globalBeginLumiBlock";
0781         }
0782       }
0783 
0784       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0785                                                                edm::EventSetup const&,
0786                                                                RunContext const*) {
0787         ++m_count;
0788         gbl = true;
0789         gel = false;
0790         auto pCache = std::make_shared<Cache>();
0791         ++(pCache->lumi);
0792         return pCache;
0793       }
0794 
0795       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0796                                            edm::EventSetup const&,
0797                                            LuminosityBlockContext const* iLBContext) {
0798         ++m_count;
0799         auto pCache = iLBContext->luminosityBlock();
0800         if (pCache->lumi != 1) {
0801           throw cms::Exception("end out of sequence")
0802               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0803               << iLB.luminosityBlock();
0804         }
0805         gel = true;
0806         gbl = false;
0807       }
0808 
0809       ~TestBeginLumiBlockFilter() {
0810         if (m_count != trans_) {
0811           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0812         }
0813       }
0814     };
0815 
0816     class TestEndLumiBlockFilter
0817         : public edm::stream::EDFilter<edm::LuminosityBlockCache<Cache>, edm::EndLuminosityBlockProducer> {
0818     public:
0819       static std::atomic<unsigned int> m_count;
0820       unsigned int trans_;
0821       static std::atomic<unsigned int> cvalue_;
0822       static std::atomic<bool> gbl;
0823       static std::atomic<bool> gel;
0824 
0825       TestEndLumiBlockFilter(edm::ParameterSet const& p) {
0826         trans_ = p.getParameter<int>("transitions");
0827         cvalue_ = p.getParameter<int>("cachevalue");
0828         m_count = 0;
0829         produces<unsigned int>();
0830         produces<unsigned int, edm::Transition::EndLuminosityBlock>("a");
0831       }
0832 
0833       bool filter(edm::Event&, edm::EventSetup const&) override {
0834         ++m_count;
0835 
0836         return true;
0837       }
0838 
0839       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0840                                                                edm::EventSetup const&,
0841                                                                RunContext const*) {
0842         ++m_count;
0843         gbl = true;
0844         gel = false;
0845         auto pCache = std::make_shared<Cache>();
0846         ++(pCache->lumi);
0847         return pCache;
0848       }
0849 
0850       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0851                                            edm::EventSetup const&,
0852                                            LuminosityBlockContext const* iLBContext) {
0853         ++m_count;
0854         auto pCache = iLBContext->luminosityBlock();
0855         if (pCache->lumi != 1) {
0856           throw cms::Exception("end out of sequence")
0857               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0858               << iLB.luminosityBlock();
0859         }
0860         gel = true;
0861         gbl = false;
0862       }
0863 
0864       static void globalEndLuminosityBlockProduce(edm::LuminosityBlock&,
0865                                                   edm::EventSetup const&,
0866                                                   LuminosityBlockContext const*) {
0867         ++m_count;
0868       }
0869 
0870       ~TestEndLumiBlockFilter() {
0871         if (m_count != trans_) {
0872           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0873         }
0874       }
0875     };
0876 
// Minimal cache payload holding a single integer that starts at zero.
class TestInputProcessBlockCache {
public:
  long long value_{0};
};
0881 
// Second minimal cache payload; a distinct type with the same single
// zero-initialised integer.
class TestInputProcessBlockCache1 {
public:
  long long value_{0};
};
0886 
0887     class InputProcessBlockIntFilter
0888         : public edm::stream::EDFilter<
0889               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>> {
0890     public:
0891       explicit InputProcessBlockIntFilter(edm::ParameterSet const& pset) {
0892         {
0893           expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0894           sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0895           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0896           if (not tag.label().empty()) {
0897             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0898           }
0899         }
0900         {
0901           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0902           if (not tag.label().empty()) {
0903             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0904           }
0905         }
0906         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0907             getTokenBegin_,
0908             [this](edm::ProcessBlock const& processBlock,
0909                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0910               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0911               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0912               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0913               return returnValue;
0914             });
0915       }
0916 
0917       static void accessInputProcessBlock(edm::ProcessBlock const&) {
0918         edm::LogAbsolute("InputProcessBlockIntFilter") << "InputProcessBlockIntFilter::accessInputProcessBlock";
0919       }
0920 
0921       bool filter(edm::Event& event, edm::EventSetup const&) override {
0922         auto cacheTuple = processBlockCaches(event);
0923         if (!expectedByRun_.empty()) {
0924           if (expectedByRun_.at(event.run() - 1) !=
0925               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0926             throw cms::Exception("UnexpectedValue")
0927                 << "InputProcessBlockIntFilter::filter cached value was "
0928                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0929                 << " but it was supposed to be " << expectedByRun_.at(event.run() - 1);
0930           }
0931         }
0932         // Force events to be processed concurrently
0933         if (sleepTime_ > 0) {
0934           usleep(sleepTime_);
0935         }
0936         return true;
0937       }
0938 
0939     private:
0940       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0941       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0942       std::vector<int> expectedByRun_;
0943       unsigned int sleepTime_{0};
0944     };
0945 
0946     struct InputProcessBlockGlobalCacheAn {
0947       // The tokens are duplicated in this test module to prove that they
0948       // work both as GlobalCache members and module data members.
0949       // We need them as GlobalCache members for accessInputProcessBlock.
0950       // In registerProcessBlockCacheFiller we use tokens that are member
0951       // variables of the class and because the lambda captures the "this"
0952       // pointer of the zeroth stream module instance. We always
0953       // use the zeroth EDConsumer. In the case of registerProcessBlockCacheFiller,
0954       // either set of tokens would work. Note that in the GlobalCache case
0955       // there is a slight weirdness that the zeroth consumer is used but
0956       // the token comes from the last consumer instance. It works because
0957       // all the stream module instances have EDConsumer base classes with
0958       // containers with the same contents in the same order (not 100% guaranteed,
0959       // but it would be difficult to implement a module where this isn't true).
0960       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBegin_;
0961       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEnd_;
0962       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0963       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEndM_;
0964       mutable std::atomic<unsigned int> transitions_{0};
0965       int sum_{0};
0966       unsigned int expectedTransitions_{0};
0967       std::vector<int> expectedByRun_;
0968       int expectedSum_{0};
0969       unsigned int sleepTime_{0};
0970     };
0971 
0972     // Same thing as previous class except with a GlobalCache added
0973     class InputProcessBlockIntFilterG
0974         : public edm::stream::EDFilter<
0975               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>,
0976               edm::GlobalCache<InputProcessBlockGlobalCacheAn>> {
0977     public:
0978       explicit InputProcessBlockIntFilterG(edm::ParameterSet const& pset,
0979                                            InputProcessBlockGlobalCacheAn const* testGlobalCache) {
0980         {
0981           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0982           if (not tag.label().empty()) {
0983             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0984             testGlobalCache->getTokenBegin_ = getTokenBegin_;
0985           }
0986         }
0987         {
0988           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0989           if (not tag.label().empty()) {
0990             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0991             testGlobalCache->getTokenEnd_ = getTokenEnd_;
0992           }
0993         }
0994         {
0995           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlockM");
0996           if (not tag.label().empty()) {
0997             getTokenBeginM_ = consumes<IntProduct, edm::InProcess>(tag);
0998             testGlobalCache->getTokenBeginM_ = getTokenBeginM_;
0999           }
1000         }
1001         {
1002           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlockM");
1003           if (not tag.label().empty()) {
1004             getTokenEndM_ = consumes<IntProduct, edm::InProcess>(tag);
1005             testGlobalCache->getTokenEndM_ = getTokenEndM_;
1006           }
1007         }
1008         registerProcessBlockCacheFiller<int>(
1009             getTokenBegin_, [this](edm::ProcessBlock const& processBlock, std::shared_ptr<int> const& previousCache) {
1010               auto returnValue = std::make_shared<int>(0);
1011               *returnValue += processBlock.get(getTokenBegin_).value;
1012               *returnValue += processBlock.get(getTokenEnd_).value;
1013               ++globalCache()->transitions_;
1014               return returnValue;
1015             });
1016         registerProcessBlockCacheFiller<1>(getTokenBegin_,
1017                                            [this](edm::ProcessBlock const& processBlock,
1018                                                   std::shared_ptr<TestInputProcessBlockCache> const& previousCache) {
1019                                              auto returnValue = std::make_shared<TestInputProcessBlockCache>();
1020                                              returnValue->value_ += processBlock.get(getTokenBegin_).value;
1021                                              returnValue->value_ += processBlock.get(getTokenEnd_).value;
1022                                              ++globalCache()->transitions_;
1023                                              return returnValue;
1024                                            });
1025         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
1026             getTokenBegin_,
1027             [this](edm::ProcessBlock const& processBlock,
1028                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
1029               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
1030               returnValue->value_ += processBlock.get(getTokenBegin_).value;
1031               returnValue->value_ += processBlock.get(getTokenEnd_).value;
1032               ++globalCache()->transitions_;
1033               return returnValue;
1034             });
1035       }
1036 
1037       static std::unique_ptr<InputProcessBlockGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
1038         auto testGlobalCache = std::make_unique<InputProcessBlockGlobalCacheAn>();
1039         testGlobalCache->expectedTransitions_ = pset.getParameter<int>("transitions");
1040         testGlobalCache->expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
1041         testGlobalCache->expectedSum_ = pset.getParameter<int>("expectedSum");
1042         testGlobalCache->sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
1043         return testGlobalCache;
1044       }
1045 
1046       static void accessInputProcessBlock(edm::ProcessBlock const& processBlock,
1047                                           InputProcessBlockGlobalCacheAn* testGlobalCache) {
1048         if (processBlock.processName() == "PROD1") {
1049           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBegin_).value;
1050           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEnd_).value;
1051         }
1052         if (processBlock.processName() == "MERGE") {
1053           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBeginM_).value;
1054           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEndM_).value;
1055         }
1056         ++testGlobalCache->transitions_;
1057       }
1058 
1059       bool filter(edm::Event& event, edm::EventSetup const&) override {
1060         auto cacheTuple = processBlockCaches(event);
1061         auto testGlobalCache = globalCache();
1062         if (!testGlobalCache->expectedByRun_.empty()) {
1063           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != *std::get<edm::CacheHandle<int>>(cacheTuple)) {
1064             throw cms::Exception("UnexpectedValue")
1065                 << "InputProcessBlockIntFilterG::filter cached value was "
1066                 << *std::get<edm::CacheHandle<int>>(cacheTuple) << " but it was supposed to be "
1067                 << testGlobalCache->expectedByRun_.at(event.run() - 1);
1068           }
1069           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != std::get<1>(cacheTuple)->value_) {
1070             throw cms::Exception("UnexpectedValue")
1071                 << "InputProcessBlockIntFilterG::filter second cached value was " << std::get<1>(cacheTuple)->value_
1072                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
1073           }
1074           if (testGlobalCache->expectedByRun_.at(event.run() - 1) !=
1075               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
1076             throw cms::Exception("UnexpectedValue")
1077                 << "InputProcessBlockIntFilterG::filter third cached value was "
1078                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
1079                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
1080           }
1081         }
1082         ++testGlobalCache->transitions_;
1083 
1084         // Force events to be processed concurrently
1085         if (testGlobalCache->sleepTime_ > 0) {
1086           usleep(testGlobalCache->sleepTime_);
1087         }
1088         return true;
1089       }
1090 
1091       static void globalEndJob(InputProcessBlockGlobalCacheAn* testGlobalCache) {
1092         if (testGlobalCache->transitions_ != testGlobalCache->expectedTransitions_) {
1093           throw cms::Exception("transitions")
1094               << "InputProcessBlockIntFilterG transitions " << testGlobalCache->transitions_
1095               << " but it was supposed to be " << testGlobalCache->expectedTransitions_;
1096         }
1097 
1098         if (testGlobalCache->sum_ != testGlobalCache->expectedSum_) {
1099           throw cms::Exception("UnexpectedValue") << "InputProcessBlockIntFilterG sum " << testGlobalCache->sum_
1100                                                   << " but it was supposed to be " << testGlobalCache->expectedSum_;
1101         }
1102       }
1103 
1104     private:
1105       edm::EDGetTokenT<IntProduct> getTokenBegin_;
1106       edm::EDGetTokenT<IntProduct> getTokenEnd_;
1107       edm::EDGetTokenT<IntProduct> getTokenBeginM_;
1108       edm::EDGetTokenT<IntProduct> getTokenEndM_;
1109     };
1110 
1111   }  // namespace stream
1112 }  // namespace edmtest
1113 std::atomic<unsigned int> edmtest::stream::GlobalIntFilter::m_count{0};
1114 std::atomic<unsigned int> edmtest::stream::RunIntFilter::m_count{0};
1115 std::atomic<unsigned int> edmtest::stream::LumiIntFilter::m_count{0};
1116 std::atomic<unsigned int> edmtest::stream::RunSummaryIntFilter::m_count{0};
1117 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntFilter::m_count{0};
1118 std::atomic<unsigned int> edmtest::stream::TestBeginRunFilter::m_count{0};
1119 std::atomic<unsigned int> edmtest::stream::TestEndRunFilter::m_count{0};
1120 std::atomic<unsigned int> edmtest::stream::TestBeginLumiBlockFilter::m_count{0};
1121 std::atomic<unsigned int> edmtest::stream::TestEndLumiBlockFilter::m_count{0};
1122 std::atomic<unsigned int> edmtest::stream::GlobalIntFilter::cvalue_{0};
1123 std::atomic<unsigned int> edmtest::stream::RunIntFilter::cvalue_{0};
1124 std::atomic<unsigned int> edmtest::stream::LumiIntFilter::cvalue_{0};
1125 std::atomic<unsigned int> edmtest::stream::RunSummaryIntFilter::cvalue_{0};
1126 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntFilter::cvalue_{0};
1127 std::atomic<unsigned int> edmtest::stream::TestBeginRunFilter::cvalue_{0};
1128 std::atomic<unsigned int> edmtest::stream::TestEndRunFilter::cvalue_{0};
1129 std::atomic<unsigned int> edmtest::stream::TestBeginLumiBlockFilter::cvalue_{0};
1130 std::atomic<unsigned int> edmtest::stream::TestEndLumiBlockFilter::cvalue_{0};
1131 std::atomic<bool> edmtest::stream::RunSummaryIntFilter::globalBeginRunCalled_{false};
1132 std::atomic<bool> edmtest::stream::LumiSummaryIntFilter::globalBeginLumiCalled_{false};
1133 std::atomic<bool> edmtest::stream::TestBeginRunFilter::gbr{false};
1134 std::atomic<bool> edmtest::stream::TestBeginRunFilter::ger{false};
1135 std::atomic<bool> edmtest::stream::TestEndRunFilter::gbr{false};
1136 std::atomic<bool> edmtest::stream::TestEndRunFilter::ger{false};
1137 std::atomic<bool> edmtest::stream::TestBeginLumiBlockFilter::gbl{false};
1138 std::atomic<bool> edmtest::stream::TestBeginLumiBlockFilter::gel{false};
1139 std::atomic<bool> edmtest::stream::TestEndLumiBlockFilter::gbl{false};
1140 std::atomic<bool> edmtest::stream::TestEndLumiBlockFilter::gel{false};
1141 DEFINE_FWK_MODULE(edmtest::stream::GlobalIntFilter);
1142 DEFINE_FWK_MODULE(edmtest::stream::RunIntFilter);
1143 DEFINE_FWK_MODULE(edmtest::stream::LumiIntFilter);
1144 DEFINE_FWK_MODULE(edmtest::stream::RunSummaryIntFilter);
1145 DEFINE_FWK_MODULE(edmtest::stream::LumiSummaryIntFilter);
1146 DEFINE_FWK_MODULE(edmtest::stream::ProcessBlockIntFilter);
1147 DEFINE_FWK_MODULE(edmtest::stream::TestBeginProcessBlockFilter);
1148 DEFINE_FWK_MODULE(edmtest::stream::TestEndProcessBlockFilter);
1149 DEFINE_FWK_MODULE(edmtest::stream::TestBeginRunFilter);
1150 DEFINE_FWK_MODULE(edmtest::stream::TestEndRunFilter);
1151 DEFINE_FWK_MODULE(edmtest::stream::TestBeginLumiBlockFilter);
1152 DEFINE_FWK_MODULE(edmtest::stream::TestEndLumiBlockFilter);
1153 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntFilter);
1154 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntFilterG);