Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-08-13 23:27:15

0001 
0002 /*----------------------------------------------------------------------
0003 
0004 Toy edm::stream::EDAnalyzer modules of
0005 edm::*Cache templates
0006 for testing purposes only.
0007 
0008 ----------------------------------------------------------------------*/
0009 #include <atomic>
0010 #include <functional>
0011 #include <iostream>
0012 #include <map>
0013 #include <tuple>
0014 #include <unistd.h>
0015 #include <vector>
0016 
0017 #include "FWCore/Framework/interface/CacheHandle.h"
0018 #include "FWCore/Framework/interface/stream/EDAnalyzer.h"
0019 #include "FWCore/Framework/interface/maker/WorkerT.h"
0020 #include "FWCore/Framework/interface/HistoryAppender.h"
0021 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0022 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0023 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/MakerMacros.h"
0026 #include "FWCore/Framework/interface/ProcessBlock.h"
0027 #include "FWCore/Framework/interface/TriggerNamesService.h"
0028 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0030 #include "FWCore/ServiceRegistry/interface/Service.h"
0031 #include "FWCore/Utilities/interface/EDMException.h"
0032 #include "DataFormats/Provenance/interface/BranchDescription.h"
0033 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0034 
0035 namespace edmtest {
0036   namespace stream {
0037 
0038     // anonymous namespace here causes build warnings
0039     namespace cache {
// Counters stored in the caches of the toy analyzers in this file.
struct Cache {
  // User-provided constructor; every counter starts at zero through its
  // default member initializer.
  Cache() {}
  // The counters are mutable because they have to be updated even when the
  // cache is reached through a pointer-to-const.
  mutable std::atomic<unsigned int> value{0u};
  mutable std::atomic<unsigned int> run{0u};
  mutable std::atomic<unsigned int> lumi{0u};
};
0047       struct TestGlobalCacheAn {
0048         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenBegin_;
0049         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenEnd_;
0050         unsigned int trans_{0};
0051         mutable std::atomic<unsigned int> m_count{0};
0052       };
0053     }  // namespace cache
0054 
0055     using Cache = cache::Cache;
0056     using TestGlobalCacheAn = cache::TestGlobalCacheAn;
0057 
0058     class GlobalIntAnalyzer : public edm::stream::EDAnalyzer<edm::GlobalCache<Cache>> {
0059     public:
0060       static std::atomic<unsigned int> m_count;
0061       unsigned int trans_;
0062       static std::atomic<unsigned int> cvalue_;
0063 
0064       static std::unique_ptr<Cache> initializeGlobalCache(edm::ParameterSet const& p) {
0065         ++m_count;
0066         return std::make_unique<Cache>();
0067       }
0068 
0069       GlobalIntAnalyzer(edm::ParameterSet const& p, Cache const* iGlobal) {
0070         trans_ = p.getParameter<int>("transitions");
0071         cvalue_ = p.getParameter<int>("cachevalue");
0072         callWhenNewProductsRegistered([](edm::BranchDescription const& desc) {
0073           std::cout << "stream::GlobalIntAnalyzer " << desc.moduleLabel() << std::endl;
0074         });
0075       }
0076 
0077       static void globalBeginJob(Cache* iGlobal) {
0078         ++m_count;
0079         if (iGlobal->value != 0) {
0080           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
0081         }
0082       }
0083 
0084       void analyze(edm::Event const&, edm::EventSetup const&) {
0085         ++m_count;
0086         ++((globalCache())->value);
0087       }
0088 
0089       static void globalEndJob(Cache* iGlobal) {
0090         ++m_count;
0091         if (iGlobal->value != cvalue_) {
0092           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be " << cvalue_;
0093         }
0094       }
0095 
0096       ~GlobalIntAnalyzer() {
0097         if (m_count != trans_) {
0098           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0099         }
0100       }
0101     };
0102 
0103     class RunIntAnalyzer : public edm::stream::EDAnalyzer<edm::RunCache<Cache>> {
0104     public:
0105       static std::atomic<unsigned int> m_count;
0106       unsigned int trans_;
0107       static std::atomic<unsigned int> cvalue_;
0108       static std::atomic<bool> gbr;
0109       static std::atomic<bool> ger;
0110       bool br;
0111       bool er;
0112 
0113       RunIntAnalyzer(edm::ParameterSet const& p) {
0114         trans_ = p.getParameter<int>("transitions");
0115         cvalue_ = p.getParameter<int>("cachevalue");
0116         m_count = 0;
0117       }
0118 
0119       void analyze(edm::Event const&, edm::EventSetup const&) override {
0120         if (moduleDescription().processName() != edm::Service<edm::service::TriggerNamesService>()->getProcessName()) {
0121           throw cms::Exception("LogicError") << "module description not properly initialized in stream analyzer";
0122         }
0123         ++m_count;
0124         ++(runCache()->value);
0125       }
0126 
0127       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0128         ++m_count;
0129         gbr = true;
0130         ger = false;
0131         auto pCache = std::make_shared<Cache>();
0132         ++(pCache->run);
0133         return pCache;
0134       }
0135 
0136       void beginRun(edm::Run const&, edm::EventSetup const&) override {
0137         br = true;
0138         er = true;
0139         if (!gbr) {
0140           throw cms::Exception("begin out of sequence") << "beginRun seen before globalBeginRun";
0141         }
0142       }
0143 
0144       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0145         ++m_count;
0146         auto pCache = iContext->run();
0147         if (pCache->run != 1) {
0148           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0149         }
0150         ger = true;
0151         gbr = false;
0152         if (iContext->run()->value != cvalue_) {
0153           throw cms::Exception("cache value") << iContext->run()->value << " but it was supposed to be " << cvalue_;
0154         }
0155       }
0156 
0157       void endRun(edm::Run const&, edm::EventSetup const&) override {
0158         er = true;
0159         br = false;
0160         if (ger) {
0161           throw cms::Exception("end out of sequence") << "globalEndRun seen before endRun";
0162         }
0163       }
0164 
0165       ~RunIntAnalyzer() {
0166         if (m_count != trans_) {
0167           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0168         }
0169       }
0170     };
0171 
0172     class LumiIntAnalyzer : public edm::stream::EDAnalyzer<edm::LuminosityBlockCache<Cache>> {
0173     public:
0174       static std::atomic<unsigned int> m_count;
0175       unsigned int trans_;
0176       static std::atomic<unsigned int> cvalue_;
0177       static std::atomic<bool> gbl;
0178       static std::atomic<bool> gel;
0179       static std::atomic<bool> bl;
0180       static std::atomic<bool> el;
0181 
0182       LumiIntAnalyzer(edm::ParameterSet const& p) {
0183         trans_ = p.getParameter<int>("transitions");
0184         cvalue_ = p.getParameter<int>("cachevalue");
0185         m_count = 0;
0186         // just to create a data dependence
0187         auto const& tag = p.getParameter<edm::InputTag>("moduleLabel");
0188         if (not tag.label().empty()) {
0189           consumes<unsigned int, edm::InLumi>(tag);
0190         }
0191       }
0192 
0193       void analyze(edm::Event const&, edm::EventSetup const&) override {
0194         ++m_count;
0195         ++(luminosityBlockCache()->value);
0196       }
0197 
0198       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0199                                                                edm::EventSetup const&,
0200                                                                RunContext const*) {
0201         ++m_count;
0202         gbl = true;
0203         gel = false;
0204         auto pCache = std::make_shared<Cache>();
0205         ++(pCache->lumi);
0206         return pCache;
0207       }
0208 
0209       void beginLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0210         bl = true;
0211         el = false;
0212         if (!gbl) {
0213           throw cms::Exception("begin out of sequence")
0214               << "beginLuminosityBlock seen before globalBeginLuminosityBlock";
0215         }
0216       }
0217 
0218       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0219                                            edm::EventSetup const&,
0220                                            LuminosityBlockContext const* iLBContext) {
0221         ++m_count;
0222         auto pCache = iLBContext->luminosityBlock();
0223         if (pCache->lumi != 1) {
0224           throw cms::Exception("end out of sequence")
0225               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0226               << iLB.luminosityBlock();
0227         }
0228         gel = true;
0229         gbl = false;
0230         if (iLBContext->luminosityBlock()->value != cvalue_) {
0231           throw cms::Exception("cache value")
0232               << iLBContext->luminosityBlock()->value << " but it was supposed to be " << cvalue_;
0233         }
0234       }
0235 
0236       static void endLuminosityBlock(edm::Run const&, edm::EventSetup const&, LuminosityBlockContext const*) {
0237         el = true;
0238         bl = false;
0239         if (gel) {
0240           throw cms::Exception("end out of sequence") << "globalEndLuminosityBlock seen before endLuminosityBlock";
0241         }
0242       }
0243 
0244       ~LumiIntAnalyzer() {
0245         if (m_count != trans_) {
0246           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0247         }
0248       }
0249     };
0250 
0251     class RunSummaryIntAnalyzer : public edm::stream::EDAnalyzer<edm::RunCache<Cache>, edm::RunSummaryCache<Cache>> {
0252     public:
0253       static std::atomic<unsigned int> m_count;
0254       unsigned int trans_;
0255       static std::atomic<unsigned int> cvalue_;
0256       static std::atomic<bool> gbr;
0257       static std::atomic<bool> ger;
0258       static std::atomic<bool> gbrs;
0259       static std::atomic<bool> gers;
0260       static std::atomic<bool> brs;
0261       static std::atomic<bool> ers;
0262       static std::atomic<bool> br;
0263       static std::atomic<bool> er;
0264 
0265       RunSummaryIntAnalyzer(edm::ParameterSet const& p) {
0266         trans_ = p.getParameter<int>("transitions");
0267         cvalue_ = p.getParameter<int>("cachevalue");
0268         m_count = 0;
0269       }
0270 
0271       void analyze(edm::Event const&, edm::EventSetup const&) override {
0272         ++m_count;
0273         ++(runCache()->value);
0274       }
0275 
0276       void beginRun(edm::Run const&, edm::EventSetup const&) override {
0277         br = true;
0278         er = false;
0279       }
0280 
0281       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0282         ++m_count;
0283         gbr = true;
0284         ger = false;
0285         auto pCache = std::make_shared<Cache>();
0286         ++(pCache->run);
0287         return pCache;
0288       }
0289 
0290       static std::shared_ptr<Cache> globalBeginRunSummary(edm::Run const&, edm::EventSetup const&, GlobalCache const*) {
0291         ++m_count;
0292         gbrs = true;
0293         gers = false;
0294         brs = true;
0295         ers = false;
0296         if (!gbr) {
0297           throw cms::Exception("begin out of sequence") << "globalBeginRunSummary seen before globalBeginRun";
0298         }
0299         return std::make_shared<Cache>();
0300       }
0301 
0302       void endRunSummary(edm::Run const&, edm::EventSetup const&, Cache* gCache) const override {
0303         brs = false;
0304         ers = true;
0305         gCache->value += runCache()->value;
0306         runCache()->value = 0;
0307         if (!er) {
0308           throw cms::Exception("end out of sequence") << "endRunSummary seen before endRun";
0309         }
0310       }
0311 
0312       static void globalEndRunSummary(edm::Run const&, edm::EventSetup const&, RunContext const*, Cache* gCache) {
0313         ++m_count;
0314         gbrs = false;
0315         gers = true;
0316         if (!ers) {
0317           throw cms::Exception("end out of sequence") << "globalEndRunSummary seen before endRunSummary";
0318         }
0319         if (gCache->value != cvalue_) {
0320           throw cms::Exception("cache value") << gCache->value << " but it was supposed to be " << cvalue_;
0321         }
0322       }
0323 
0324       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0325         ++m_count;
0326         gbr = false;
0327         ger = true;
0328         auto pCache = iContext->run();
0329         if (pCache->run != 1) {
0330           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0331         }
0332       }
0333 
0334       void endRun(edm::Run const&, edm::EventSetup const&) override {
0335         er = true;
0336         br = false;
0337       }
0338 
0339       ~RunSummaryIntAnalyzer() {
0340         if (m_count != trans_) {
0341           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0342         }
0343       }
0344     };
0345 
0346     class LumiSummaryIntAnalyzer
0347         : public edm::stream::EDAnalyzer<edm::LuminosityBlockCache<Cache>, edm::LuminosityBlockSummaryCache<Cache>> {
0348     public:
0349       static std::atomic<unsigned int> m_count;
0350       static std::atomic<unsigned int> m_lumiSumCalls;
0351       unsigned int trans_;
0352       static std::atomic<unsigned int> cvalue_;
0353       static std::atomic<bool> gbl;
0354       static std::atomic<bool> gel;
0355       static std::atomic<bool> gbls;
0356       static std::atomic<bool> gels;
0357       static std::atomic<bool> bls;
0358       static std::atomic<bool> els;
0359       static std::atomic<bool> bl;
0360       static std::atomic<bool> el;
0361 
0362       LumiSummaryIntAnalyzer(edm::ParameterSet const& p) {
0363         trans_ = p.getParameter<int>("transitions");
0364         cvalue_ = p.getParameter<int>("cachevalue");
0365         m_count = 0;
0366       }
0367 
0368       void analyze(edm::Event const&, edm::EventSetup const&) override {
0369         ++m_count;
0370         ++(luminosityBlockCache()->value);
0371       }
0372 
0373       void beginLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0374         bl = true;
0375         el = false;
0376       }
0377 
0378       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0379                                                                edm::EventSetup const&,
0380                                                                RunContext const*) {
0381         ++m_count;
0382         gbl = true;
0383         gel = false;
0384         auto pCache = std::make_shared<Cache>();
0385         ++(pCache->lumi);
0386         return pCache;
0387       }
0388 
0389       static std::shared_ptr<Cache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0390                                                                       edm::EventSetup const&,
0391                                                                       LuminosityBlockContext const*) {
0392         ++m_count;
0393         gbls = true;
0394         gels = false;
0395         bls = true;
0396         els = false;
0397         if (!gbl) {
0398           throw cms::Exception("begin out of sequence")
0399               << "globalBeginLuminosityBlockSummary seen before globalBeginLuminosityBlock";
0400         }
0401         return std::make_shared<Cache>();
0402       }
0403 
0404       void endLuminosityBlockSummary(edm::LuminosityBlock const&,
0405                                      edm::EventSetup const&,
0406                                      Cache* gCache) const override {
0407         ++m_lumiSumCalls;
0408         bls = false;
0409         els = true;
0410         //This routine could be called at the same time as another stream is calling analyze so must do the change atomically
0411         auto v = luminosityBlockCache()->value.exchange(0);
0412         gCache->value += v;
0413         if (el) {
0414           throw cms::Exception("end out of sequence") << "endLuminosityBlock seen before endLuminosityBlockSummary";
0415         }
0416       }
0417 
0418       static void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0419                                                   edm::EventSetup const&,
0420                                                   LuminosityBlockContext const*,
0421                                                   Cache* gCache) {
0422         ++m_count;
0423         auto nLumis = m_lumiSumCalls.load();
0424         gbls = false;
0425         gels = true;
0426         if (!els) {
0427           throw cms::Exception("end out of sequence")
0428               << "globalEndLuminosityBlockSummary seen before endLuminosityBlockSummary";
0429         }
0430         if (gCache->value != cvalue_) {
0431           throw cms::Exception("cache value")
0432               << gCache->value << " but it was supposed to be " << cvalue_ << " endLumiBlockSummary called " << nLumis;
0433         }
0434       }
0435 
0436       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0437                                            edm::EventSetup const&,
0438                                            LuminosityBlockContext const* iLBContext) {
0439         ++m_count;
0440         auto pCache = iLBContext->luminosityBlock();
0441         if (pCache->lumi != 1) {
0442           throw cms::Exception("end out of sequence")
0443               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0444               << iLB.luminosityBlock();
0445         }
0446         gel = true;
0447         gbl = false;
0448         if (!gels) {
0449           throw cms::Exception("end out of sequence")
0450               << "globalEndLuminosityBlockSummary seen before globalEndLuminosityBlock";
0451         }
0452       }
0453 
0454       static void endLuminosityBlock(edm::Run const&, edm::EventSetup const&, LuminosityBlockContext const*) {
0455         el = true;
0456         bl = false;
0457       }
0458 
0459       ~LumiSummaryIntAnalyzer() {
0460         if (m_count != trans_) {
0461           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0462         }
0463       }
0464     };
0465 
0466     class ProcessBlockIntAnalyzer
0467         : public edm::stream::EDAnalyzer<edm::WatchProcessBlock, edm::GlobalCache<TestGlobalCacheAn>> {
0468     public:
0469       explicit ProcessBlockIntAnalyzer(edm::ParameterSet const& pset, TestGlobalCacheAn const* testGlobalCache) {
0470         {
0471           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0472           if (not tag.label().empty()) {
0473             testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0474           }
0475         }
0476         {
0477           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0478           if (not tag.label().empty()) {
0479             testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0480           }
0481         }
0482       }
0483 
0484       static std::unique_ptr<TestGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
0485         auto testGlobalCache = std::make_unique<TestGlobalCacheAn>();
0486         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0487         return testGlobalCache;
0488       }
0489 
0490       static void beginProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheAn* testGlobalCache) {
0491         if (testGlobalCache->m_count != 0) {
0492           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer::begin transitions "
0493                                               << testGlobalCache->m_count << " but it was supposed to be " << 0;
0494         }
0495         ++testGlobalCache->m_count;
0496 
0497         const unsigned int valueToGet = 51;
0498         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0499           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0500             throw cms::Exception("BadValue")
0501                 << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0502           }
0503         }
0504       }
0505 
0506       void analyze(edm::Event const&, edm::EventSetup const&) override {
0507         TestGlobalCacheAn const* testGlobalCache = globalCache();
0508         if (testGlobalCache->m_count < 1u) {
0509           throw cms::Exception("out of sequence") << "produce before beginProcessBlock " << testGlobalCache->m_count;
0510         }
0511         ++testGlobalCache->m_count;
0512       }
0513 
0514       static void endProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheAn* testGlobalCache) {
0515         ++testGlobalCache->m_count;
0516         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0517           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer::end transitions " << testGlobalCache->m_count
0518                                               << " but it was supposed to be " << testGlobalCache->trans_;
0519         }
0520         {
0521           const unsigned int valueToGet = 51;
0522           if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0523             if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0524               throw cms::Exception("BadValue")
0525                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0526             }
0527           }
0528         }
0529         {
0530           const unsigned int valueToGet = 61;
0531           if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0532             if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToGet) {
0533               throw cms::Exception("BadValue")
0534                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0535             }
0536           }
0537         }
0538       }
0539 
0540       static void globalEndJob(TestGlobalCacheAn* testGlobalCache) {
0541         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0542           throw cms::Exception("transitions")
0543               << "TestBeginProcessBlockAnalyzer transitions " << testGlobalCache->m_count
0544               << " but it was supposed to be " << testGlobalCache->trans_;
0545         }
0546       }
0547 
0548       ~ProcessBlockIntAnalyzer() {
0549         TestGlobalCacheAn const* testGlobalCache = globalCache();
0550         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0551           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer transitions " << testGlobalCache->m_count
0552                                               << " but it was supposed to be " << testGlobalCache->trans_;
0553         }
0554       }
0555     };
0556 
// Minimal cache type: a single integer that starts at zero.
class TestInputProcessBlockCache {
public:
  long long value_{0};
};
0561 
// Second minimal cache type, distinct from TestInputProcessBlockCache only
// by name: a single integer that starts at zero.
class TestInputProcessBlockCache1 {
public:
  long long value_{0};
};
0566 
0567     class InputProcessBlockIntAnalyzer
0568         : public edm::stream::EDAnalyzer<
0569               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>> {
0570     public:
0571       explicit InputProcessBlockIntAnalyzer(edm::ParameterSet const& pset) {
0572         {
0573           expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0574           sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0575           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0576           if (not tag.label().empty()) {
0577             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0578           }
0579         }
0580         {
0581           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0582           if (not tag.label().empty()) {
0583             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0584           }
0585         }
0586         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0587             getTokenBegin_,
0588             [this](edm::ProcessBlock const& processBlock,
0589                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0590               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0591               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0592               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0593               return returnValue;
0594             });
0595       }
0596 
0597       static void accessInputProcessBlock(edm::ProcessBlock const&) {
0598         edm::LogAbsolute("InputProcessBlockIntAnalyzer") << "InputProcessBlockIntAnalyzer::accessInputProcessBlock";
0599       }
0600 
0601       void analyze(edm::Event const& event, edm::EventSetup const&) override {
0602         auto cacheTuple = processBlockCaches(event);
0603         if (!expectedByRun_.empty()) {
0604           if (expectedByRun_.at(event.run() - 1) !=
0605               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0606             throw cms::Exception("UnexpectedValue")
0607                 << "InputProcessBlockIntAnalyzer::analyze cached value was "
0608                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0609                 << " but it was supposed to be " << expectedByRun_.at(event.run() - 1);
0610           }
0611         }
0612         // Force events to be processed concurrently
0613         if (sleepTime_ > 0) {
0614           usleep(sleepTime_);
0615         }
0616       }
0617 
0618     private:
0619       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0620       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0621       std::vector<int> expectedByRun_;
0622       unsigned int sleepTime_{0};
0623     };
0624 
0625     struct InputProcessBlockGlobalCacheAn {
0626       // The tokens are duplicated in this test module to prove that they
0627       // work both as GlobalCache members and module data members.
0628       // We need them as GlobalCache members for accessInputProcessBlock.
0629       // In registerProcessBlockCacheFiller we use tokens that are member
0630       // variables of the class and because the lambda captures the "this"
0631       // pointer of the zeroth stream module instance. We always
0632       // use the zeroth EDConsumer. In the case of registerProcessBlockCacheFiller,
0633       // either set of tokens would work. Note that in the GlobalCache case
0634       // there is a slight weirdness that the zeroth consumer is used but
0635       // the token comes from the last consumer instance. It works because
0636       // all the stream module instances have EDConsumer base classes with
0637       // containers with the same contents in the same order (not 100% guaranteed,
0638       // but it would be difficult to implement a module where this isn't true).
0639       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBegin_;
0640       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEnd_;
0641       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0642       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEndM_;
0643       mutable std::atomic<unsigned int> transitions_{0};
0644       int sum_{0};
0645       unsigned int expectedTransitions_{0};
0646       std::vector<int> expectedByRun_;
0647       int expectedSum_{0};
0648       unsigned int sleepTime_{0};
0649     };
0650 
0651     // Same thing as previous class except with a GlobalCache added
0652     class InputProcessBlockIntAnalyzerG
0653         : public edm::stream::EDAnalyzer<
0654               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>,
0655               edm::GlobalCache<InputProcessBlockGlobalCacheAn>> {
0656     public:
0657       explicit InputProcessBlockIntAnalyzerG(edm::ParameterSet const& pset,
0658                                              InputProcessBlockGlobalCacheAn const* testGlobalCache) {
0659         {
0660           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0661           if (not tag.label().empty()) {
0662             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0663             testGlobalCache->getTokenBegin_ = getTokenBegin_;
0664           }
0665         }
0666         {
0667           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0668           if (not tag.label().empty()) {
0669             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0670             testGlobalCache->getTokenEnd_ = getTokenEnd_;
0671           }
0672         }
0673         {
0674           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlockM");
0675           if (not tag.label().empty()) {
0676             getTokenBeginM_ = consumes<IntProduct, edm::InProcess>(tag);
0677             testGlobalCache->getTokenBeginM_ = getTokenBeginM_;
0678           }
0679         }
0680         {
0681           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlockM");
0682           if (not tag.label().empty()) {
0683             getTokenEndM_ = consumes<IntProduct, edm::InProcess>(tag);
0684             testGlobalCache->getTokenEndM_ = getTokenEndM_;
0685           }
0686         }
0687         registerProcessBlockCacheFiller<int>(
0688             getTokenBegin_, [this](edm::ProcessBlock const& processBlock, std::shared_ptr<int> const& previousCache) {
0689               auto returnValue = std::make_shared<int>(0);
0690               *returnValue += processBlock.get(getTokenBegin_).value;
0691               *returnValue += processBlock.get(getTokenEnd_).value;
0692               ++globalCache()->transitions_;
0693               return returnValue;
0694             });
0695         registerProcessBlockCacheFiller<1>(getTokenBegin_,
0696                                            [this](edm::ProcessBlock const& processBlock,
0697                                                   std::shared_ptr<TestInputProcessBlockCache> const& previousCache) {
0698                                              auto returnValue = std::make_shared<TestInputProcessBlockCache>();
0699                                              returnValue->value_ += processBlock.get(getTokenBegin_).value;
0700                                              returnValue->value_ += processBlock.get(getTokenEnd_).value;
0701                                              ++globalCache()->transitions_;
0702                                              return returnValue;
0703                                            });
0704         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0705             getTokenBegin_,
0706             [this](edm::ProcessBlock const& processBlock,
0707                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0708               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0709               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0710               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0711               ++globalCache()->transitions_;
0712               return returnValue;
0713             });
0714       }
0715 
0716       static std::unique_ptr<InputProcessBlockGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
             // Creates the global cache object and fills its expectation fields from the
             // module configuration parameters "transitions", "expectedByRun", "expectedSum"
             // and "sleepTime". The new cache is returned by unique_ptr.
0717         auto testGlobalCache = std::make_unique<InputProcessBlockGlobalCacheAn>();
0718         testGlobalCache->expectedTransitions_ = pset.getParameter<int>("transitions");
0719         testGlobalCache->expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0720         testGlobalCache->expectedSum_ = pset.getParameter<int>("expectedSum");
             // sleepTime_ is later passed unchanged to usleep() in analyze().
0721         testGlobalCache->sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0722         return testGlobalCache;
0723       }
0724 
0725       static void accessInputProcessBlock(edm::ProcessBlock const& processBlock,
0726                                           InputProcessBlockGlobalCacheAn* testGlobalCache) {
             // Adds the begin/end IntProduct values found in processBlock to the running sum in
             // the global cache and counts the call as one transition.
             // Only blocks whose process name is "PROD1" or "MERGE" contribute to the sum; any
             // other process name just increments the transition counter.
             // NOTE(review): presumably invoked by the framework once per input process block -
             // confirm against the stream module interface documentation.
0727         if (processBlock.processName() == "PROD1") {
0728           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBegin_).value;
0729           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEnd_).value;
0730         }
0731         if (processBlock.processName() == "MERGE") {
               // The MERGE process uses the separate "M" tokens stored in the global cache.
0732           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBeginM_).value;
0733           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEndM_).value;
0734         }
0735         ++testGlobalCache->transitions_;
0736       }
0737 
0738       void analyze(edm::Event const& event, edm::EventSetup const&) override {
             // Checks, for the current event, that each of the three input process block caches
             // holds the value configured for the event's run, then counts one transition.
             // Throws cms::Exception("UnexpectedValue") on the first mismatch.
             // The checks are skipped entirely when expectedByRun_ is empty.
0739         auto cacheTuple = processBlockCaches(event);
0740         auto testGlobalCache = globalCache();
0741         if (!testGlobalCache->expectedByRun_.empty()) {
               // expectedByRun_ is indexed by run number minus one (entry 0 belongs to run 1).
               // First cache: looked up in the tuple by handle type (int cache).
0742           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != *std::get<edm::CacheHandle<int>>(cacheTuple)) {
0743             throw cms::Exception("UnexpectedValue")
0744                 << "InputProcessBlockIntAnalyzerG::analyze cached value was "
0745                 << *std::get<edm::CacheHandle<int>>(cacheTuple) << " but it was supposed to be "
0746                 << testGlobalCache->expectedByRun_.at(event.run() - 1);
0747           }
               // Second cache: looked up in the tuple by position (index 1).
0748           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != std::get<1>(cacheTuple)->value_) {
0749             throw cms::Exception("UnexpectedValue")
0750                 << "InputProcessBlockIntAnalyzerG::analyze second cached value was " << std::get<1>(cacheTuple)->value_
0751                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
0752           }
               // Third cache: looked up by handle type (TestInputProcessBlockCache1).
0753           if (testGlobalCache->expectedByRun_.at(event.run() - 1) !=
0754               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0755             throw cms::Exception("UnexpectedValue")
0756                 << "InputProcessBlockIntAnalyzerG::analyze third cached value was "
0757                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0758                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
0759           }
0760         }
             // Every analyzed event counts as one transition; globalEndJob compares the total
             // against the configured expectation.
0761         ++testGlobalCache->transitions_;
0762 
0763         // Force events to be processed concurrently
0764         if (testGlobalCache->sleepTime_ > 0) {
0765           usleep(testGlobalCache->sleepTime_);
0766         }
0767       }
0768 
0769       static void globalEndJob(InputProcessBlockGlobalCacheAn* testGlobalCache) {
             // Final consistency check on the global cache: the accumulated transition count and
             // the accumulated sum must both equal their configured expectations.
             // Throws cms::Exception("transitions") or cms::Exception("UnexpectedValue") otherwise.
0770         if (testGlobalCache->transitions_ != testGlobalCache->expectedTransitions_) {
0771           throw cms::Exception("transitions")
0772               << "InputProcessBlockIntAnalyzerG transitions " << testGlobalCache->transitions_
0773               << " but it was supposed to be " << testGlobalCache->expectedTransitions_;
0774         }
0775 
             // sum_ is accumulated in accessInputProcessBlock from the PROD1 and MERGE blocks.
0776         if (testGlobalCache->sum_ != testGlobalCache->expectedSum_) {
0777           throw cms::Exception("UnexpectedValue") << "InputProcessBlockIntAnalyzerG sum " << testGlobalCache->sum_
0778                                                   << " but it was supposed to be " << testGlobalCache->expectedSum_;
0779         }
0780       }
0781 
0782     private:
           // Tokens used to read IntProduct values from process blocks.
           // getTokenBegin_ and getTokenEnd_ are read by the cache-filler lambdas registered in
           // the constructor.
0783       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0784       edm::EDGetTokenT<IntProduct> getTokenEnd_;
           // The "M" tokens are created in the constructor from the "consumesBeginProcessBlockM"
           // and "consumesEndProcessBlockM" input tags, and only when the tag label is non-empty.
           // The constructor also copies them into the global cache, where the static
           // accessInputProcessBlock function reads them for the "MERGE" process.
0785       edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0786       edm::EDGetTokenT<IntProduct> getTokenEndM_;
0787     };
0788 
0789     // The next two classes test that modules without the
0790     // static accessInputProcessBlock function will build,
0791     // and also that modules with no functor registered run.
0792 
0793     class InputProcessBlockIntAnalyzerNS
0794         : public edm::stream::EDAnalyzer<edm::InputProcessBlockCache<int, TestInputProcessBlockCache>> {
           // Minimal stream analyzer that declares an InputProcessBlockCache with two cache types
           // (int and TestInputProcessBlockCache) but defines no static accessInputProcessBlock
           // function and registers no cache filler.
0795     public:
           // The parameter set is accepted but never read.
0796       explicit InputProcessBlockIntAnalyzerNS(edm::ParameterSet const& pset) {}
           // Intentionally empty: no per-event work is done.
0797       void analyze(edm::Event const&, edm::EventSetup const&) override {}
0798     };
0799 
0800     // Same thing as previous class except with a GlobalCache added
0801     class InputProcessBlockIntAnalyzerGNS
0802         : public edm::stream::EDAnalyzer<edm::InputProcessBlockCache<int, TestInputProcessBlockCache>,
0803                                          edm::GlobalCache<TestGlobalCacheAn>> {
           // Minimal stream analyzer combining an InputProcessBlockCache (int and
           // TestInputProcessBlockCache) with a GlobalCache of type TestGlobalCacheAn. It defines
           // no static accessInputProcessBlock function and registers no cache filler.
0804     public:
           // Neither the parameter set nor the global cache pointer is used.
0805       explicit InputProcessBlockIntAnalyzerGNS(edm::ParameterSet const& pset,
0806                                                TestGlobalCacheAn const* testGlobalCache) {}
           // Returns a default-constructed global cache; the configuration is ignored.
0807       static std::unique_ptr<TestGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const&) {
0808         return std::make_unique<TestGlobalCacheAn>();
0809       }
           // Intentionally empty: no per-event work is done.
0810       void analyze(edm::Event const&, edm::EventSetup const&) override {}
           // Intentionally empty: nothing is verified at end of job.
0811       static void globalEndJob(TestGlobalCacheAn* testGlobalCache) {}
0812     };
0813 
0814   }  // namespace stream
0815 }  // namespace edmtest
0816 std::atomic<unsigned int> edmtest::stream::GlobalIntAnalyzer::m_count{0};
     // The line above and the lines below are the out-of-class definitions of the static
     // std::atomic members of the edmtest::stream analyzer classes. Every unsigned counter is
     // initialized to 0 and every bool flag to false.
     // m_count: one counter per analyzer class.
0817 std::atomic<unsigned int> edmtest::stream::RunIntAnalyzer::m_count{0};
0818 std::atomic<unsigned int> edmtest::stream::LumiIntAnalyzer::m_count{0};
0819 std::atomic<unsigned int> edmtest::stream::RunSummaryIntAnalyzer::m_count{0};
0820 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::m_count{0};
0821 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::m_lumiSumCalls{0};
     // cvalue_: one value per analyzer class.
0822 std::atomic<unsigned int> edmtest::stream::GlobalIntAnalyzer::cvalue_{0};
0823 std::atomic<unsigned int> edmtest::stream::RunIntAnalyzer::cvalue_{0};
0824 std::atomic<unsigned int> edmtest::stream::LumiIntAnalyzer::cvalue_{0};
0825 std::atomic<unsigned int> edmtest::stream::RunSummaryIntAnalyzer::cvalue_{0};
0826 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::cvalue_{0};
     // Boolean flags of RunIntAnalyzer and LumiIntAnalyzer.
     // NOTE(review): names look like abbreviations of transitions (e.g. gbr = global begin run,
     // gel = global end lumi) - confirm against the class definitions earlier in the file.
0827 std::atomic<bool> edmtest::stream::RunIntAnalyzer::gbr{false};
0828 std::atomic<bool> edmtest::stream::RunIntAnalyzer::ger{false};
0829 std::atomic<bool> edmtest::stream::LumiIntAnalyzer::gbl{false};
0830 std::atomic<bool> edmtest::stream::LumiIntAnalyzer::gel{false};
0831 std::atomic<bool> edmtest::stream::LumiIntAnalyzer::bl{false};
0832 std::atomic<bool> edmtest::stream::LumiIntAnalyzer::el{false};
     // Boolean flags of RunSummaryIntAnalyzer.
0833 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::gbr{false};
0834 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::ger{false};
0835 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::gbrs{false};
0836 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::gers{false};
0837 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::brs{false};
0838 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::ers{false};
0839 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::br{false};
0840 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::er{false};
     // Boolean flags of LumiSummaryIntAnalyzer.
0841 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::gbl{false};
0842 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::gel{false};
0843 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::gbls{false};
0844 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::gels{false};
0845 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::bls{false};
0846 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::els{false};
0847 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::bl{false};
0848 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::el{false};
0849 DEFINE_FWK_MODULE(edmtest::stream::GlobalIntAnalyzer);
     // DEFINE_FWK_MODULE is applied once to each analyzer class, from the line above through
     // the end of the file.
     // NOTE(review): presumably the macro comes from FWCore/Framework/interface/MakerMacros.h
     // and registers the class as a framework module plugin - confirm against that header.
0850 DEFINE_FWK_MODULE(edmtest::stream::RunIntAnalyzer);
0851 DEFINE_FWK_MODULE(edmtest::stream::LumiIntAnalyzer);
0852 DEFINE_FWK_MODULE(edmtest::stream::RunSummaryIntAnalyzer);
0853 DEFINE_FWK_MODULE(edmtest::stream::LumiSummaryIntAnalyzer);
0854 DEFINE_FWK_MODULE(edmtest::stream::ProcessBlockIntAnalyzer);
0855 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzer);
0856 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerG);
0857 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerNS);
0858 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerGNS);