Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:26:59

0001 
0002 /*----------------------------------------------------------------------
0003 
0004 Toy edm::stream::EDAnalyzer modules of
0005 edm::*Cache templates
0006 for testing purposes only.
0007 
0008 ----------------------------------------------------------------------*/
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <thread>
#include <tuple>
#include <vector>

#include "FWCore/Framework/interface/CacheHandle.h"
#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
#include "FWCore/Framework/interface/maker/WorkerT.h"
#include "FWCore/Framework/interface/HistoryAppender.h"
#include "FWCore/ServiceRegistry/interface/ParentContext.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/Utilities/interface/GlobalIdentifier.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/ProcessBlock.h"
#include "FWCore/Framework/interface/TriggerNamesService.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "DataFormats/Provenance/interface/BranchDescription.h"
#include "DataFormats/TestObjects/interface/ToyProducts.h"
0035 
0036 namespace edmtest {
0037   namespace stream {
0038 
0039     // anonymous namespace here causes build warnings
0040     namespace cache {
0041       struct Cache {
0042         Cache() : value(0), run(0), lumi(0) {}
0043         //Using mutable since we want to update the value.
0044         mutable std::atomic<unsigned int> value;
0045         mutable std::atomic<unsigned int> run;
0046         mutable std::atomic<unsigned int> lumi;
0047       };
0048       struct SummaryCache {
0049         // Intentionally not thread safe, not atomic
0050         unsigned int value = 0;
0051       };
0052       struct TestGlobalCacheAn {
0053         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenBegin_;
0054         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenEnd_;
0055         unsigned int trans_{0};
0056         mutable std::atomic<unsigned int> m_count{0};
0057       };
0058     }  // namespace cache
0059 
    // Make the cache payload types nameable inside edmtest::stream without the
    // cache:: qualifier; the analyzer classes below use these short names.
    using Cache = cache::Cache;
    using SummaryCache = cache::SummaryCache;
    using TestGlobalCacheAn = cache::TestGlobalCacheAn;
0063 
0064     class GlobalIntAnalyzer : public edm::stream::EDAnalyzer<edm::GlobalCache<Cache>> {
0065     public:
0066       static std::atomic<unsigned int> m_count;
0067       unsigned int trans_;
0068       static std::atomic<unsigned int> cvalue_;
0069 
0070       static std::unique_ptr<Cache> initializeGlobalCache(edm::ParameterSet const& p) {
0071         ++m_count;
0072         return std::make_unique<Cache>();
0073       }
0074 
0075       GlobalIntAnalyzer(edm::ParameterSet const& p, Cache const* iGlobal) {
0076         trans_ = p.getParameter<int>("transitions");
0077         cvalue_ = p.getParameter<int>("cachevalue");
0078         callWhenNewProductsRegistered([](edm::BranchDescription const& desc) {
0079           std::cout << "stream::GlobalIntAnalyzer " << desc.moduleLabel() << std::endl;
0080         });
0081       }
0082 
0083       static void globalBeginJob(Cache* iGlobal) {
0084         ++m_count;
0085         if (iGlobal->value != 0) {
0086           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
0087         }
0088       }
0089 
0090       void analyze(edm::Event const&, edm::EventSetup const&) {
0091         ++m_count;
0092         ++((globalCache())->value);
0093       }
0094 
0095       static void globalEndJob(Cache* iGlobal) {
0096         ++m_count;
0097         if (iGlobal->value != cvalue_) {
0098           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be " << cvalue_;
0099         }
0100       }
0101 
0102       ~GlobalIntAnalyzer() {
0103         if (m_count != trans_) {
0104           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0105         }
0106       }
0107     };
0108 
0109     class RunIntAnalyzer : public edm::stream::EDAnalyzer<edm::RunCache<Cache>, edm::stream::WatchRuns> {
0110     public:
0111       static std::atomic<unsigned int> m_count;
0112       unsigned int trans_;
0113       static std::atomic<unsigned int> cvalue_;
0114 
0115       RunIntAnalyzer(edm::ParameterSet const& p) {
0116         trans_ = p.getParameter<int>("transitions");
0117         cvalue_ = p.getParameter<int>("cachevalue");
0118         m_count = 0;
0119       }
0120 
0121       void analyze(edm::Event const&, edm::EventSetup const&) override {
0122         if (moduleDescription().processName() != edm::Service<edm::service::TriggerNamesService>()->getProcessName()) {
0123           throw cms::Exception("LogicError") << "module description not properly initialized in stream analyzer";
0124         }
0125         ++m_count;
0126         ++(runCache()->value);
0127       }
0128 
0129       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0130         ++m_count;
0131         auto pCache = std::make_shared<Cache>();
0132         pCache->run = iRun.runAuxiliary().run();
0133         return pCache;
0134       }
0135 
0136       void beginRun(edm::Run const& iRun, edm::EventSetup const&) override {
0137         if (runCache()->run != iRun.runAuxiliary().run()) {
0138           throw cms::Exception("begin out of sequence") << "beginRun seen before globalBeginRun";
0139         }
0140       }
0141 
0142       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0143         ++m_count;
0144         auto pCache = iContext->run();
0145         if (pCache->run != iRun.runAuxiliary().run()) {
0146           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0147         }
0148         pCache->run = 0;
0149         if (iContext->run()->value != cvalue_) {
0150           throw cms::Exception("cache value") << iContext->run()->value << " but it was supposed to be " << cvalue_;
0151         }
0152       }
0153 
0154       void endRun(edm::Run const& iRun, edm::EventSetup const&) override {
0155         if (runCache()->run != iRun.runAuxiliary().run()) {
0156           throw cms::Exception("end out of sequence") << "globalEndRun seen before endRun";
0157         }
0158       }
0159 
0160       ~RunIntAnalyzer() {
0161         if (m_count != trans_) {
0162           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0163         }
0164       }
0165     };
0166 
0167     class LumiIntAnalyzer
0168         : public edm::stream::EDAnalyzer<edm::LuminosityBlockCache<Cache>, edm::stream::WatchLuminosityBlocks> {
0169     public:
0170       static std::atomic<unsigned int> m_count;
0171       unsigned int trans_;
0172       static std::atomic<unsigned int> cvalue_;
0173 
0174       LumiIntAnalyzer(edm::ParameterSet const& p) {
0175         trans_ = p.getParameter<int>("transitions");
0176         cvalue_ = p.getParameter<int>("cachevalue");
0177         m_count = 0;
0178         // just to create a data dependence
0179         auto const& tag = p.getParameter<edm::InputTag>("moduleLabel");
0180         if (not tag.label().empty()) {
0181           consumes<unsigned int, edm::InLumi>(tag);
0182         }
0183       }
0184 
0185       void analyze(edm::Event const&, edm::EventSetup const&) override {
0186         ++m_count;
0187         ++(luminosityBlockCache()->value);
0188       }
0189 
0190       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0191                                                                edm::EventSetup const&,
0192                                                                RunContext const*) {
0193         ++m_count;
0194         auto pCache = std::make_shared<Cache>();
0195         pCache->run = iLB.luminosityBlockAuxiliary().run();
0196         pCache->lumi = iLB.luminosityBlockAuxiliary().luminosityBlock();
0197         return pCache;
0198       }
0199 
0200       void beginLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0201         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0202             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0203           throw cms::Exception("begin out of sequence")
0204               << "beginLuminosityBlock seen before globalBeginLuminosityBlock";
0205         }
0206       }
0207 
0208       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0209                                            edm::EventSetup const&,
0210                                            LuminosityBlockContext const* iLBContext) {
0211         ++m_count;
0212         auto pCache = iLBContext->luminosityBlock();
0213         if (pCache->run != iLB.luminosityBlockAuxiliary().run() ||
0214             pCache->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0215           throw cms::Exception("end out of sequence")
0216               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0217               << iLB.luminosityBlock();
0218         }
0219         pCache->run = 0;
0220         pCache->lumi = 0;
0221         if (iLBContext->luminosityBlock()->value != cvalue_) {
0222           throw cms::Exception("cache value")
0223               << iLBContext->luminosityBlock()->value << " but it was supposed to be " << cvalue_;
0224         }
0225       }
0226 
0227       void endLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0228         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0229             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0230           throw cms::Exception("end out of sequence") << "globalEndLuminosityBlock seen before endLuminosityBlock";
0231         }
0232       }
0233 
0234       ~LumiIntAnalyzer() {
0235         if (m_count != trans_) {
0236           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0237         }
0238       }
0239     };
0240 
0241     class RunSummaryIntAnalyzer : public edm::stream::EDAnalyzer<edm::RunCache<Cache>,
0242                                                                  edm::RunSummaryCache<SummaryCache>,
0243                                                                  edm::stream::WatchRuns> {
0244     public:
0245       static std::atomic<unsigned int> m_count;
0246       unsigned int trans_;
0247       static std::atomic<unsigned int> cvalue_;
0248       static std::atomic<bool> globalBeginRunCalled_;
0249       unsigned int valueAccumulatedForStream_ = 0;
0250       bool endRunWasCalled_ = false;
0251 
0252       RunSummaryIntAnalyzer(edm::ParameterSet const& p) {
0253         trans_ = p.getParameter<int>("transitions");
0254         cvalue_ = p.getParameter<int>("cachevalue");
0255         m_count = 0;
0256       }
0257 
0258       void analyze(edm::Event const&, edm::EventSetup const&) override {
0259         ++m_count;
0260         ++(runCache()->value);
0261         ++valueAccumulatedForStream_;
0262       }
0263 
0264       void beginRun(edm::Run const&, edm::EventSetup const&) override {
0265         valueAccumulatedForStream_ = 0;
0266         endRunWasCalled_ = false;
0267       }
0268 
0269       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0270         ++m_count;
0271         globalBeginRunCalled_ = true;
0272         auto pCache = std::make_shared<Cache>();
0273         ++(pCache->run);
0274         return pCache;
0275       }
0276 
0277       static std::shared_ptr<SummaryCache> globalBeginRunSummary(edm::Run const&,
0278                                                                  edm::EventSetup const&,
0279                                                                  GlobalCache const*) {
0280         ++m_count;
0281         if (!globalBeginRunCalled_) {
0282           throw cms::Exception("begin out of sequence") << "globalBeginRunSummary seen before globalBeginRun";
0283         }
0284         return std::make_shared<SummaryCache>();
0285       }
0286 
0287       void endRunSummary(edm::Run const&, edm::EventSetup const&, SummaryCache* runSummaryCache) const override {
0288         runSummaryCache->value += valueAccumulatedForStream_;
0289         if (!endRunWasCalled_) {
0290           throw cms::Exception("end out of sequence") << "endRunSummary seen before endRun";
0291         }
0292       }
0293 
0294       static void globalEndRunSummary(edm::Run const&,
0295                                       edm::EventSetup const&,
0296                                       RunContext const*,
0297                                       SummaryCache* runSummaryCache) {
0298         ++m_count;
0299         if (runSummaryCache->value != cvalue_) {
0300           throw cms::Exception("unexpectedValue")
0301               << "run summary cache value = " << runSummaryCache->value << " but it was supposed to be " << cvalue_;
0302         }
0303       }
0304 
0305       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0306         ++m_count;
0307         auto pCache = iContext->run();
0308         if (pCache->value != cvalue_) {
0309           throw cms::Exception("unExpectedValue")
0310               << "run cache value " << pCache->value << " but it was supposed to be " << cvalue_;
0311         }
0312         if (pCache->run != 1) {
0313           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0314         }
0315       }
0316 
0317       void endRun(edm::Run const&, edm::EventSetup const&) override { endRunWasCalled_ = true; }
0318 
0319       ~RunSummaryIntAnalyzer() {
0320         if (m_count != trans_) {
0321           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0322         }
0323       }
0324     };
0325 
0326     class LumiSummaryIntAnalyzer : public edm::stream::EDAnalyzer<edm::LuminosityBlockCache<Cache>,
0327                                                                   edm::LuminosityBlockSummaryCache<SummaryCache>,
0328                                                                   edm::stream::WatchLuminosityBlocks> {
0329     public:
0330       static std::atomic<unsigned int> m_count;
0331       unsigned int trans_;
0332       static std::atomic<unsigned int> cvalue_;
0333       static std::atomic<bool> globalBeginLumiCalled_;
0334       unsigned int valueAccumulatedForStream_ = 0;
0335       bool endLumiWasCalled_ = false;
0336 
0337       LumiSummaryIntAnalyzer(edm::ParameterSet const& p) {
0338         trans_ = p.getParameter<int>("transitions");
0339         cvalue_ = p.getParameter<int>("cachevalue");
0340         m_count = 0;
0341       }
0342 
0343       void analyze(edm::Event const&, edm::EventSetup const&) override {
0344         ++m_count;
0345         ++(luminosityBlockCache()->value);
0346         ++valueAccumulatedForStream_;
0347       }
0348 
0349       void beginLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0350         valueAccumulatedForStream_ = 0;
0351         endLumiWasCalled_ = false;
0352       }
0353 
0354       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0355                                                                edm::EventSetup const&,
0356                                                                RunContext const*) {
0357         ++m_count;
0358         globalBeginLumiCalled_ = true;
0359         auto pCache = std::make_shared<Cache>();
0360         ++(pCache->lumi);
0361         return pCache;
0362       }
0363 
0364       static std::shared_ptr<SummaryCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0365                                                                              edm::EventSetup const&,
0366                                                                              LuminosityBlockContext const*) {
0367         ++m_count;
0368         if (!globalBeginLumiCalled_) {
0369           throw cms::Exception("begin out of sequence")
0370               << "globalBeginLuminosityBlockSummary seen before globalBeginLuminosityBlock";
0371         }
0372         globalBeginLumiCalled_ = false;
0373         return std::make_shared<SummaryCache>();
0374       }
0375 
0376       void endLuminosityBlockSummary(edm::LuminosityBlock const&,
0377                                      edm::EventSetup const&,
0378                                      SummaryCache* lumiSummaryCache) const override {
0379         lumiSummaryCache->value += valueAccumulatedForStream_;
0380         if (!endLumiWasCalled_) {
0381           throw cms::Exception("end out of sequence") << "endLuminosityBlockSummary seen before endLuminosityBlock";
0382         }
0383       }
0384 
0385       static void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0386                                                   edm::EventSetup const&,
0387                                                   LuminosityBlockContext const* iLBContext,
0388                                                   SummaryCache* lumiSummaryCache) {
0389         ++m_count;
0390         if (lumiSummaryCache->value != cvalue_) {
0391           throw cms::Exception("unexpectedValue")
0392               << "lumi summary cache value = " << lumiSummaryCache->value << " but it was supposed to be " << cvalue_;
0393         }
0394         auto pCache = iLBContext->luminosityBlock();
0395         // Add one so globalEndLuminosityBlock can check this function was called first
0396         ++pCache->value;
0397       }
0398 
0399       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0400                                            edm::EventSetup const&,
0401                                            LuminosityBlockContext const* iLBContext) {
0402         ++m_count;
0403         auto pCache = iLBContext->luminosityBlock();
0404         if (pCache->value != cvalue_ + 1) {
0405           throw cms::Exception("unexpectedValue")
0406               << "lumi cache value " << pCache->value << " but it was supposed to be " << cvalue_ + 1;
0407         }
0408         if (pCache->lumi != 1) {
0409           throw cms::Exception("end out of sequence")
0410               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0411               << iLB.luminosityBlock();
0412         }
0413       }
0414 
0415       void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0416         endLumiWasCalled_ = true;
0417       }
0418 
0419       ~LumiSummaryIntAnalyzer() {
0420         if (m_count != trans_) {
0421           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0422         }
0423       }
0424     };
0425 
0426     class ProcessBlockIntAnalyzer
0427         : public edm::stream::EDAnalyzer<edm::WatchProcessBlock, edm::GlobalCache<TestGlobalCacheAn>> {
0428     public:
0429       explicit ProcessBlockIntAnalyzer(edm::ParameterSet const& pset, TestGlobalCacheAn const* testGlobalCache) {
0430         {
0431           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0432           if (not tag.label().empty()) {
0433             testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0434           }
0435         }
0436         {
0437           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0438           if (not tag.label().empty()) {
0439             testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0440           }
0441         }
0442       }
0443 
0444       static std::unique_ptr<TestGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
0445         auto testGlobalCache = std::make_unique<TestGlobalCacheAn>();
0446         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0447         return testGlobalCache;
0448       }
0449 
0450       static void beginProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheAn* testGlobalCache) {
0451         if (testGlobalCache->m_count != 0) {
0452           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer::begin transitions "
0453                                               << testGlobalCache->m_count << " but it was supposed to be " << 0;
0454         }
0455         ++testGlobalCache->m_count;
0456 
0457         const unsigned int valueToGet = 51;
0458         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0459           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0460             throw cms::Exception("BadValue")
0461                 << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0462           }
0463         }
0464       }
0465 
0466       void analyze(edm::Event const&, edm::EventSetup const&) override {
0467         TestGlobalCacheAn const* testGlobalCache = globalCache();
0468         if (testGlobalCache->m_count < 1u) {
0469           throw cms::Exception("out of sequence") << "produce before beginProcessBlock " << testGlobalCache->m_count;
0470         }
0471         ++testGlobalCache->m_count;
0472       }
0473 
0474       static void endProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheAn* testGlobalCache) {
0475         ++testGlobalCache->m_count;
0476         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0477           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer::end transitions " << testGlobalCache->m_count
0478                                               << " but it was supposed to be " << testGlobalCache->trans_;
0479         }
0480         {
0481           const unsigned int valueToGet = 51;
0482           if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0483             if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0484               throw cms::Exception("BadValue")
0485                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0486             }
0487           }
0488         }
0489         {
0490           const unsigned int valueToGet = 61;
0491           if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0492             if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToGet) {
0493               throw cms::Exception("BadValue")
0494                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0495             }
0496           }
0497         }
0498       }
0499 
0500       static void globalEndJob(TestGlobalCacheAn* testGlobalCache) {
0501         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0502           throw cms::Exception("transitions")
0503               << "TestBeginProcessBlockAnalyzer transitions " << testGlobalCache->m_count
0504               << " but it was supposed to be " << testGlobalCache->trans_;
0505         }
0506       }
0507 
0508       ~ProcessBlockIntAnalyzer() {
0509         TestGlobalCacheAn const* testGlobalCache = globalCache();
0510         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0511           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer transitions " << testGlobalCache->m_count
0512                                               << " but it was supposed to be " << testGlobalCache->trans_;
0513         }
0514       }
0515     };
0516 
0517     class TestInputProcessBlockCache {
0518     public:
0519       long long int value_ = 0;
0520     };
0521 
0522     class TestInputProcessBlockCache1 {
0523     public:
0524       long long int value_ = 0;
0525     };
0526 
0527     class InputProcessBlockIntAnalyzer
0528         : public edm::stream::EDAnalyzer<
0529               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>> {
0530     public:
0531       explicit InputProcessBlockIntAnalyzer(edm::ParameterSet const& pset) {
0532         {
0533           expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0534           sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0535           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0536           if (not tag.label().empty()) {
0537             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0538           }
0539         }
0540         {
0541           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0542           if (not tag.label().empty()) {
0543             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0544           }
0545         }
0546         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0547             getTokenBegin_,
0548             [this](edm::ProcessBlock const& processBlock,
0549                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0550               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0551               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0552               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0553               return returnValue;
0554             });
0555       }
0556 
0557       static void accessInputProcessBlock(edm::ProcessBlock const&) {
0558         edm::LogAbsolute("InputProcessBlockIntAnalyzer") << "InputProcessBlockIntAnalyzer::accessInputProcessBlock";
0559       }
0560 
0561       void analyze(edm::Event const& event, edm::EventSetup const&) override {
0562         auto cacheTuple = processBlockCaches(event);
0563         if (!expectedByRun_.empty()) {
0564           if (expectedByRun_.at(event.run() - 1) !=
0565               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0566             throw cms::Exception("UnexpectedValue")
0567                 << "InputProcessBlockIntAnalyzer::analyze cached value was "
0568                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0569                 << " but it was supposed to be " << expectedByRun_.at(event.run() - 1);
0570           }
0571         }
0572         // Force events to be processed concurrently
0573         if (sleepTime_ > 0) {
0574           std::this_thread::sleep_for(std::chrono::microseconds(sleepTime_));
0575         }
0576       }
0577 
0578     private:
0579       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0580       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0581       std::vector<int> expectedByRun_;
0582       unsigned int sleepTime_{0};
0583     };
0584 
0585     struct InputProcessBlockGlobalCacheAn {
0586       // The tokens are duplicated in this test module to prove that they
0587       // work both as GlobalCache members and module data members.
0588       // We need them as GlobalCache members for accessInputProcessBlock.
0589       // In registerProcessBlockCacheFiller we use tokens that are member
0590       // variables of the class and because the lambda captures the "this"
0591       // pointer of the zeroth stream module instance. We always
0592       // use the zeroth EDConsumer. In the case of registerProcessBlockCacheFiller,
0593       // either set of tokens would work. Note that in the GlobalCache case
0594       // there is a slight weirdness that the zeroth consumer is used but
0595       // the token comes from the last consumer instance. It works because
0596       // all the stream module instances have EDConsumer base classes with
0597       // containers with the same contents in the same order (not 100% guaranteed,
0598       // but it would be difficult to implement a module where this isn't true).
0599       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBegin_;
0600       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEnd_;
0601       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0602       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEndM_;
0603       mutable std::atomic<unsigned int> transitions_{0};
0604       int sum_{0};
0605       unsigned int expectedTransitions_{0};
0606       std::vector<int> expectedByRun_;
0607       int expectedSum_{0};
0608       unsigned int sleepTime_{0};
0609     };
0610 
0611     // Same thing as previous class except with a GlobalCache added
0612     class InputProcessBlockIntAnalyzerG
0613         : public edm::stream::EDAnalyzer<
0614               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>,
0615               edm::GlobalCache<InputProcessBlockGlobalCacheAn>> {
0616     public:
0617       explicit InputProcessBlockIntAnalyzerG(edm::ParameterSet const& pset,
0618                                              InputProcessBlockGlobalCacheAn const* testGlobalCache) {
0619         {
0620           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0621           if (not tag.label().empty()) {
0622             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0623             testGlobalCache->getTokenBegin_ = getTokenBegin_;
0624           }
0625         }
0626         {
0627           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0628           if (not tag.label().empty()) {
0629             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0630             testGlobalCache->getTokenEnd_ = getTokenEnd_;
0631           }
0632         }
0633         {
0634           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlockM");
0635           if (not tag.label().empty()) {
0636             getTokenBeginM_ = consumes<IntProduct, edm::InProcess>(tag);
0637             testGlobalCache->getTokenBeginM_ = getTokenBeginM_;
0638           }
0639         }
0640         {
0641           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlockM");
0642           if (not tag.label().empty()) {
0643             getTokenEndM_ = consumes<IntProduct, edm::InProcess>(tag);
0644             testGlobalCache->getTokenEndM_ = getTokenEndM_;
0645           }
0646         }
0647         registerProcessBlockCacheFiller<int>(
0648             getTokenBegin_, [this](edm::ProcessBlock const& processBlock, std::shared_ptr<int> const& previousCache) {
0649               auto returnValue = std::make_shared<int>(0);
0650               *returnValue += processBlock.get(getTokenBegin_).value;
0651               *returnValue += processBlock.get(getTokenEnd_).value;
0652               ++globalCache()->transitions_;
0653               return returnValue;
0654             });
0655         registerProcessBlockCacheFiller<1>(getTokenBegin_,
0656                                            [this](edm::ProcessBlock const& processBlock,
0657                                                   std::shared_ptr<TestInputProcessBlockCache> const& previousCache) {
0658                                              auto returnValue = std::make_shared<TestInputProcessBlockCache>();
0659                                              returnValue->value_ += processBlock.get(getTokenBegin_).value;
0660                                              returnValue->value_ += processBlock.get(getTokenEnd_).value;
0661                                              ++globalCache()->transitions_;
0662                                              return returnValue;
0663                                            });
0664         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0665             getTokenBegin_,
0666             [this](edm::ProcessBlock const& processBlock,
0667                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0668               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0669               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0670               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0671               ++globalCache()->transitions_;
0672               return returnValue;
0673             });
0674       }
0675 
0676       static std::unique_ptr<InputProcessBlockGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
             // Builds the global cache and fills in the test expectations read from
             // the configuration: "transitions" (int), "expectedByRun" (vector<int>),
             // "expectedSum" (int) and "sleepTime" (unsigned int). The caller receives
             // ownership of the newly created cache.
0677         auto testGlobalCache = std::make_unique<InputProcessBlockGlobalCacheAn>();
0678         testGlobalCache->expectedTransitions_ = pset.getParameter<int>("transitions");
0679         testGlobalCache->expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0680         testGlobalCache->expectedSum_ = pset.getParameter<int>("expectedSum");
0681         testGlobalCache->sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0682         return testGlobalCache;
0683       }
0684 
0685       static void accessInputProcessBlock(edm::ProcessBlock const& processBlock,
0686                                           InputProcessBlockGlobalCacheAn* testGlobalCache) {
             // Adds the IntProduct values read from processBlock to testGlobalCache->sum_
             // and counts the call in testGlobalCache->transitions_. Only blocks whose
             // process name is "PROD1" or "MERGE" contribute to the sum; any other
             // process name still increments transitions_.
             // The tokens used for the reads are the copies held by the global cache.
0687         if (processBlock.processName() == "PROD1") {
0688           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBegin_).value;
0689           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEnd_).value;
0690         }
0691         if (processBlock.processName() == "MERGE") {
               // The "M" tokens are the ones used for the MERGE process.
0692           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBeginM_).value;
0693           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEndM_).value;
0694         }
0695         ++testGlobalCache->transitions_;
0696       }
0697 
0698       void analyze(edm::Event const& event, edm::EventSetup const&) override {
             // Per-event check of the input ProcessBlock caches. When expectedByRun_ is
             // non-empty, each of the three caches must hold the value stored at index
             // (run number - 1); a mismatch throws cms::Exception("UnexpectedValue").
             // The call is then counted in the global cache's transitions_ and, if
             // sleepTime_ is non-zero, the calling thread sleeps for that many microseconds.
0699         auto cacheTuple = processBlockCaches(event);
0700         auto testGlobalCache = globalCache();
             // An empty expectedByRun_ switches the value checks off entirely.
0701         if (!testGlobalCache->expectedByRun_.empty()) {
               // First cache: selected from the tuple by handle type (int payload).
               // Assuming expectedByRun_ is a std::vector, the at() lookups are
               // bounds-checked, so a run number outside the configured list raises an
               // exception rather than reading out of range.
0702           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != *std::get<edm::CacheHandle<int>>(cacheTuple)) {
0703             throw cms::Exception("UnexpectedValue")
0704                 << "InputProcessBlockIntAnalyzerG::analyze cached value was "
0705                 << *std::get<edm::CacheHandle<int>>(cacheTuple) << " but it was supposed to be "
0706                 << testGlobalCache->expectedByRun_.at(event.run() - 1);
0707           }
               // Second cache: selected by tuple position.
0708           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != std::get<1>(cacheTuple)->value_) {
0709             throw cms::Exception("UnexpectedValue")
0710                 << "InputProcessBlockIntAnalyzerG::analyze second cached value was " << std::get<1>(cacheTuple)->value_
0711                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
0712           }
               // Third cache: selected by handle type (TestInputProcessBlockCache1 payload).
0713           if (testGlobalCache->expectedByRun_.at(event.run() - 1) !=
0714               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0715             throw cms::Exception("UnexpectedValue")
0716                 << "InputProcessBlockIntAnalyzerG::analyze third cached value was "
0717                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0718                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
0719           }
0720         }
             // Every analyze call counts as one transition, whether or not values were checked.
0721         ++testGlobalCache->transitions_;
0722 
0723         // Force events to be processed concurrently
             // NOTE(review): std::this_thread::sleep_for is declared in <thread>, which is
             // not among the headers included at the top of this file; it presumably
             // arrives transitively - consider including it directly.
0724         if (testGlobalCache->sleepTime_ > 0) {
0725           std::this_thread::sleep_for(std::chrono::microseconds(testGlobalCache->sleepTime_));
0726         }
0727       }
0728 
0729       static void globalEndJob(InputProcessBlockGlobalCacheAn* testGlobalCache) {
             // Final consistency check on the global cache: the accumulated transitions_
             // and sum_ must equal the configured expectedTransitions_ and expectedSum_.
             // Throws cms::Exception("transitions") or cms::Exception("UnexpectedValue")
             // respectively; the transition count is checked first.
0730         if (testGlobalCache->transitions_ != testGlobalCache->expectedTransitions_) {
0731           throw cms::Exception("transitions")
0732               << "InputProcessBlockIntAnalyzerG transitions " << testGlobalCache->transitions_
0733               << " but it was supposed to be " << testGlobalCache->expectedTransitions_;
0734         }
0735 
0736         if (testGlobalCache->sum_ != testGlobalCache->expectedSum_) {
0737           throw cms::Exception("UnexpectedValue") << "InputProcessBlockIntAnalyzerG sum " << testGlobalCache->sum_
0738                                                   << " but it was supposed to be " << testGlobalCache->expectedSum_;
0739         }
0740       }
0741 
0742     private:
           // Tokens for reading IntProduct objects from ProcessBlocks. The constructor
           // fills getTokenEnd_, getTokenBeginM_ and getTokenEndM_ via
           // consumes<IntProduct, edm::InProcess>() when the corresponding input tag
           // ("consumesEndProcessBlock", "consumesBeginProcessBlockM",
           // "consumesEndProcessBlockM") has a non-empty label, and copies each one
           // into the global cache. getTokenBegin_ is the token handed to every
           // registerProcessBlockCacheFiller call.
0743       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0744       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0745       edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0746       edm::EDGetTokenT<IntProduct> getTokenEndM_;
0747     };
0748 
0749     // The next two classes test that modules without the
0750     // static accessInputProcessBlock function will build,
0751     // and also that modules with no functor registered will run.
0752 
0753     class InputProcessBlockIntAnalyzerNS
0754         : public edm::stream::EDAnalyzer<edm::InputProcessBlockCache<int, TestInputProcessBlockCache>> {
           // Stream analyzer that declares an InputProcessBlockCache (int and
           // TestInputProcessBlockCache entries) but defines no static
           // accessInputProcessBlock function and registers no cache filler.
           // Both the constructor and analyze() have empty bodies.
0755     public:
0756       explicit InputProcessBlockIntAnalyzerNS(edm::ParameterSet const& pset) {}
0757       void analyze(edm::Event const&, edm::EventSetup const&) override {}
0758     };
0759 
0760     // Same thing as previous class except with a GlobalCache added
0761     class InputProcessBlockIntAnalyzerGNS
0762         : public edm::stream::EDAnalyzer<edm::InputProcessBlockCache<int, TestInputProcessBlockCache>,
0763                                          edm::GlobalCache<TestGlobalCacheAn>> {
           // Stream analyzer combining an InputProcessBlockCache with a
           // GlobalCache<TestGlobalCacheAn>. It defines no static accessInputProcessBlock
           // function and registers no cache filler; the constructor, analyze() and
           // globalEndJob() all have empty bodies.
0764     public:
0765       explicit InputProcessBlockIntAnalyzerGNS(edm::ParameterSet const& pset,
0766                                                TestGlobalCacheAn const* testGlobalCache) {}
           // Supplies a default-constructed global cache; the configuration is not read.
0767       static std::unique_ptr<TestGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const&) {
0768         return std::make_unique<TestGlobalCacheAn>();
0769       }
0770       void analyze(edm::Event const&, edm::EventSetup const&) override {}
0771       static void globalEndJob(TestGlobalCacheAn* testGlobalCache) {}
0772     };
0773 
0774   }  // namespace stream
0775 }  // namespace edmtest
0776 std::atomic<unsigned int> edmtest::stream::GlobalIntAnalyzer::m_count{0};  // static m_count members: all start at 0
0777 std::atomic<unsigned int> edmtest::stream::RunIntAnalyzer::m_count{0};
0778 std::atomic<unsigned int> edmtest::stream::LumiIntAnalyzer::m_count{0};
0779 std::atomic<unsigned int> edmtest::stream::RunSummaryIntAnalyzer::m_count{0};
0780 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::m_count{0};
     // Static cvalue_ members of the same five classes, likewise defined here with an initial value of 0.
0781 std::atomic<unsigned int> edmtest::stream::GlobalIntAnalyzer::cvalue_{0};
0782 std::atomic<unsigned int> edmtest::stream::RunIntAnalyzer::cvalue_{0};
0783 std::atomic<unsigned int> edmtest::stream::LumiIntAnalyzer::cvalue_{0};
0784 std::atomic<unsigned int> edmtest::stream::RunSummaryIntAnalyzer::cvalue_{0};
0785 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::cvalue_{0};
     // Static boolean flags of the two summary analyzers; both start out false.
     // NOTE(review): the names suggest they record that the global begin transition
     // was called - confirm against the class bodies earlier in the file.
0786 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::globalBeginRunCalled_{false};
0787 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::globalBeginLumiCalled_{false};
0788 DEFINE_FWK_MODULE(edmtest::stream::GlobalIntAnalyzer);  // one DEFINE_FWK_MODULE invocation per analyzer class
     // NOTE(review): DEFINE_FWK_MODULE presumably comes from MakerMacros.h and makes each
     // class available to the framework as a module - confirm against that header.
0789 DEFINE_FWK_MODULE(edmtest::stream::RunIntAnalyzer);
0790 DEFINE_FWK_MODULE(edmtest::stream::LumiIntAnalyzer);
0791 DEFINE_FWK_MODULE(edmtest::stream::RunSummaryIntAnalyzer);
0792 DEFINE_FWK_MODULE(edmtest::stream::LumiSummaryIntAnalyzer);
0793 DEFINE_FWK_MODULE(edmtest::stream::ProcessBlockIntAnalyzer);
0794 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzer);
0795 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerG);
0796 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerNS);
0797 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerGNS);