Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-10-13 03:38:02

0001 
0002 /*----------------------------------------------------------------------
0003 
0004 Toy edm::stream::EDAnalyzer modules of
0005 edm::*Cache templates
0006 for testing purposes only.
0007 
0008 ----------------------------------------------------------------------*/
0009 #include <atomic>
0010 #include <functional>
0011 #include <iostream>
0012 #include <map>
0013 #include <tuple>
0014 #include <unistd.h>
0015 #include <vector>
0016 
0017 #include "FWCore/Framework/interface/CacheHandle.h"
0018 #include "FWCore/Framework/interface/stream/EDAnalyzer.h"
0019 #include "FWCore/Framework/interface/maker/WorkerT.h"
0020 #include "FWCore/Framework/interface/HistoryAppender.h"
0021 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0022 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0023 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/MakerMacros.h"
0026 #include "FWCore/Framework/interface/ProcessBlock.h"
0027 #include "FWCore/Framework/interface/TriggerNamesService.h"
0028 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0030 #include "FWCore/ServiceRegistry/interface/Service.h"
0031 #include "FWCore/Utilities/interface/EDMException.h"
0032 #include "DataFormats/Provenance/interface/BranchDescription.h"
0033 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0034 
0035 namespace edmtest {
0036   namespace stream {
0037 
0038     // anonymous namespace here causes build warnings
0039     namespace cache {
0040       struct Cache {
0041         Cache() : value(0), run(0), lumi(0) {}
0042         //Using mutable since we want to update the value.
0043         mutable std::atomic<unsigned int> value;
0044         mutable std::atomic<unsigned int> run;
0045         mutable std::atomic<unsigned int> lumi;
0046       };
0047       struct SummaryCache {
0048         // Intentionally not thread safe, not atomic
0049         unsigned int value = 0;
0050       };
0051       struct TestGlobalCacheAn {
0052         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenBegin_;
0053         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenEnd_;
0054         unsigned int trans_{0};
0055         mutable std::atomic<unsigned int> m_count{0};
0056       };
0057     }  // namespace cache
0058 
0059     using Cache = cache::Cache;
0060     using SummaryCache = cache::SummaryCache;
0061     using TestGlobalCacheAn = cache::TestGlobalCacheAn;
0062 
0063     class GlobalIntAnalyzer : public edm::stream::EDAnalyzer<edm::GlobalCache<Cache>> {
0064     public:
0065       static std::atomic<unsigned int> m_count;
0066       unsigned int trans_;
0067       static std::atomic<unsigned int> cvalue_;
0068 
0069       static std::unique_ptr<Cache> initializeGlobalCache(edm::ParameterSet const& p) {
0070         ++m_count;
0071         return std::make_unique<Cache>();
0072       }
0073 
0074       GlobalIntAnalyzer(edm::ParameterSet const& p, Cache const* iGlobal) {
0075         trans_ = p.getParameter<int>("transitions");
0076         cvalue_ = p.getParameter<int>("cachevalue");
0077         callWhenNewProductsRegistered([](edm::BranchDescription const& desc) {
0078           std::cout << "stream::GlobalIntAnalyzer " << desc.moduleLabel() << std::endl;
0079         });
0080       }
0081 
0082       static void globalBeginJob(Cache* iGlobal) {
0083         ++m_count;
0084         if (iGlobal->value != 0) {
0085           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
0086         }
0087       }
0088 
0089       void analyze(edm::Event const&, edm::EventSetup const&) {
0090         ++m_count;
0091         ++((globalCache())->value);
0092       }
0093 
0094       static void globalEndJob(Cache* iGlobal) {
0095         ++m_count;
0096         if (iGlobal->value != cvalue_) {
0097           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be " << cvalue_;
0098         }
0099       }
0100 
0101       ~GlobalIntAnalyzer() {
0102         if (m_count != trans_) {
0103           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0104         }
0105       }
0106     };
0107 
0108     class RunIntAnalyzer : public edm::stream::EDAnalyzer<edm::RunCache<Cache>> {
0109     public:
0110       static std::atomic<unsigned int> m_count;
0111       unsigned int trans_;
0112       static std::atomic<unsigned int> cvalue_;
0113 
0114       RunIntAnalyzer(edm::ParameterSet const& p) {
0115         trans_ = p.getParameter<int>("transitions");
0116         cvalue_ = p.getParameter<int>("cachevalue");
0117         m_count = 0;
0118       }
0119 
0120       void analyze(edm::Event const&, edm::EventSetup const&) override {
0121         if (moduleDescription().processName() != edm::Service<edm::service::TriggerNamesService>()->getProcessName()) {
0122           throw cms::Exception("LogicError") << "module description not properly initialized in stream analyzer";
0123         }
0124         ++m_count;
0125         ++(runCache()->value);
0126       }
0127 
0128       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0129         ++m_count;
0130         auto pCache = std::make_shared<Cache>();
0131         pCache->run = iRun.runAuxiliary().run();
0132         return pCache;
0133       }
0134 
0135       void beginRun(edm::Run const& iRun, edm::EventSetup const&) override {
0136         if (runCache()->run != iRun.runAuxiliary().run()) {
0137           throw cms::Exception("begin out of sequence") << "beginRun seen before globalBeginRun";
0138         }
0139       }
0140 
0141       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0142         ++m_count;
0143         auto pCache = iContext->run();
0144         if (pCache->run != iRun.runAuxiliary().run()) {
0145           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0146         }
0147         pCache->run = 0;
0148         if (iContext->run()->value != cvalue_) {
0149           throw cms::Exception("cache value") << iContext->run()->value << " but it was supposed to be " << cvalue_;
0150         }
0151       }
0152 
0153       void endRun(edm::Run const& iRun, edm::EventSetup const&) override {
0154         if (runCache()->run != iRun.runAuxiliary().run()) {
0155           throw cms::Exception("end out of sequence") << "globalEndRun seen before endRun";
0156         }
0157       }
0158 
0159       ~RunIntAnalyzer() {
0160         if (m_count != trans_) {
0161           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0162         }
0163       }
0164     };
0165 
0166     class LumiIntAnalyzer : public edm::stream::EDAnalyzer<edm::LuminosityBlockCache<Cache>> {
0167     public:
0168       static std::atomic<unsigned int> m_count;
0169       unsigned int trans_;
0170       static std::atomic<unsigned int> cvalue_;
0171 
0172       LumiIntAnalyzer(edm::ParameterSet const& p) {
0173         trans_ = p.getParameter<int>("transitions");
0174         cvalue_ = p.getParameter<int>("cachevalue");
0175         m_count = 0;
0176         // just to create a data dependence
0177         auto const& tag = p.getParameter<edm::InputTag>("moduleLabel");
0178         if (not tag.label().empty()) {
0179           consumes<unsigned int, edm::InLumi>(tag);
0180         }
0181       }
0182 
0183       void analyze(edm::Event const&, edm::EventSetup const&) override {
0184         ++m_count;
0185         ++(luminosityBlockCache()->value);
0186       }
0187 
0188       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0189                                                                edm::EventSetup const&,
0190                                                                RunContext const*) {
0191         ++m_count;
0192         auto pCache = std::make_shared<Cache>();
0193         pCache->run = iLB.luminosityBlockAuxiliary().run();
0194         pCache->lumi = iLB.luminosityBlockAuxiliary().luminosityBlock();
0195         return pCache;
0196       }
0197 
0198       void beginLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0199         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0200             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0201           throw cms::Exception("begin out of sequence")
0202               << "beginLuminosityBlock seen before globalBeginLuminosityBlock";
0203         }
0204       }
0205 
0206       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0207                                            edm::EventSetup const&,
0208                                            LuminosityBlockContext const* iLBContext) {
0209         ++m_count;
0210         auto pCache = iLBContext->luminosityBlock();
0211         if (pCache->run != iLB.luminosityBlockAuxiliary().run() ||
0212             pCache->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0213           throw cms::Exception("end out of sequence")
0214               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0215               << iLB.luminosityBlock();
0216         }
0217         pCache->run = 0;
0218         pCache->lumi = 0;
0219         if (iLBContext->luminosityBlock()->value != cvalue_) {
0220           throw cms::Exception("cache value")
0221               << iLBContext->luminosityBlock()->value << " but it was supposed to be " << cvalue_;
0222         }
0223       }
0224 
0225       void endLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0226         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0227             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0228           throw cms::Exception("end out of sequence") << "globalEndLuminosityBlock seen before endLuminosityBlock";
0229         }
0230       }
0231 
0232       ~LumiIntAnalyzer() {
0233         if (m_count != trans_) {
0234           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0235         }
0236       }
0237     };
0238 
0239     class RunSummaryIntAnalyzer
0240         : public edm::stream::EDAnalyzer<edm::RunCache<Cache>, edm::RunSummaryCache<SummaryCache>> {
0241     public:
0242       static std::atomic<unsigned int> m_count;
0243       unsigned int trans_;
0244       static std::atomic<unsigned int> cvalue_;
0245       static std::atomic<bool> globalBeginRunCalled_;
0246       unsigned int valueAccumulatedForStream_ = 0;
0247       bool endRunWasCalled_ = false;
0248 
0249       RunSummaryIntAnalyzer(edm::ParameterSet const& p) {
0250         trans_ = p.getParameter<int>("transitions");
0251         cvalue_ = p.getParameter<int>("cachevalue");
0252         m_count = 0;
0253       }
0254 
0255       void analyze(edm::Event const&, edm::EventSetup const&) override {
0256         ++m_count;
0257         ++(runCache()->value);
0258         ++valueAccumulatedForStream_;
0259       }
0260 
0261       void beginRun(edm::Run const&, edm::EventSetup const&) override {
0262         valueAccumulatedForStream_ = 0;
0263         endRunWasCalled_ = false;
0264       }
0265 
0266       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0267         ++m_count;
0268         globalBeginRunCalled_ = true;
0269         auto pCache = std::make_shared<Cache>();
0270         ++(pCache->run);
0271         return pCache;
0272       }
0273 
0274       static std::shared_ptr<SummaryCache> globalBeginRunSummary(edm::Run const&,
0275                                                                  edm::EventSetup const&,
0276                                                                  GlobalCache const*) {
0277         ++m_count;
0278         if (!globalBeginRunCalled_) {
0279           throw cms::Exception("begin out of sequence") << "globalBeginRunSummary seen before globalBeginRun";
0280         }
0281         return std::make_shared<SummaryCache>();
0282       }
0283 
0284       void endRunSummary(edm::Run const&, edm::EventSetup const&, SummaryCache* runSummaryCache) const override {
0285         runSummaryCache->value += valueAccumulatedForStream_;
0286         if (!endRunWasCalled_) {
0287           throw cms::Exception("end out of sequence") << "endRunSummary seen before endRun";
0288         }
0289       }
0290 
0291       static void globalEndRunSummary(edm::Run const&,
0292                                       edm::EventSetup const&,
0293                                       RunContext const*,
0294                                       SummaryCache* runSummaryCache) {
0295         ++m_count;
0296         if (runSummaryCache->value != cvalue_) {
0297           throw cms::Exception("unexpectedValue")
0298               << "run summary cache value = " << runSummaryCache->value << " but it was supposed to be " << cvalue_;
0299         }
0300       }
0301 
0302       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0303         ++m_count;
0304         auto pCache = iContext->run();
0305         if (pCache->value != cvalue_) {
0306           throw cms::Exception("unExpectedValue")
0307               << "run cache value " << pCache->value << " but it was supposed to be " << cvalue_;
0308         }
0309         if (pCache->run != 1) {
0310           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0311         }
0312       }
0313 
0314       void endRun(edm::Run const&, edm::EventSetup const&) override { endRunWasCalled_ = true; }
0315 
0316       ~RunSummaryIntAnalyzer() {
0317         if (m_count != trans_) {
0318           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0319         }
0320       }
0321     };
0322 
0323     class LumiSummaryIntAnalyzer : public edm::stream::EDAnalyzer<edm::LuminosityBlockCache<Cache>,
0324                                                                   edm::LuminosityBlockSummaryCache<SummaryCache>> {
0325     public:
0326       static std::atomic<unsigned int> m_count;
0327       unsigned int trans_;
0328       static std::atomic<unsigned int> cvalue_;
0329       static std::atomic<bool> globalBeginLumiCalled_;
0330       unsigned int valueAccumulatedForStream_ = 0;
0331       bool endLumiWasCalled_ = false;
0332 
0333       LumiSummaryIntAnalyzer(edm::ParameterSet const& p) {
0334         trans_ = p.getParameter<int>("transitions");
0335         cvalue_ = p.getParameter<int>("cachevalue");
0336         m_count = 0;
0337       }
0338 
0339       void analyze(edm::Event const&, edm::EventSetup const&) override {
0340         ++m_count;
0341         ++(luminosityBlockCache()->value);
0342         ++valueAccumulatedForStream_;
0343       }
0344 
0345       void beginLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0346         valueAccumulatedForStream_ = 0;
0347         endLumiWasCalled_ = false;
0348       }
0349 
0350       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0351                                                                edm::EventSetup const&,
0352                                                                RunContext const*) {
0353         ++m_count;
0354         globalBeginLumiCalled_ = true;
0355         auto pCache = std::make_shared<Cache>();
0356         ++(pCache->lumi);
0357         return pCache;
0358       }
0359 
0360       static std::shared_ptr<SummaryCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0361                                                                              edm::EventSetup const&,
0362                                                                              LuminosityBlockContext const*) {
0363         ++m_count;
0364         if (!globalBeginLumiCalled_) {
0365           throw cms::Exception("begin out of sequence")
0366               << "globalBeginLuminosityBlockSummary seen before globalBeginLuminosityBlock";
0367         }
0368         globalBeginLumiCalled_ = false;
0369         return std::make_shared<SummaryCache>();
0370       }
0371 
0372       void endLuminosityBlockSummary(edm::LuminosityBlock const&,
0373                                      edm::EventSetup const&,
0374                                      SummaryCache* lumiSummaryCache) const override {
0375         lumiSummaryCache->value += valueAccumulatedForStream_;
0376         if (!endLumiWasCalled_) {
0377           throw cms::Exception("end out of sequence") << "endLuminosityBlockSummary seen before endLuminosityBlock";
0378         }
0379       }
0380 
0381       static void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0382                                                   edm::EventSetup const&,
0383                                                   LuminosityBlockContext const* iLBContext,
0384                                                   SummaryCache* lumiSummaryCache) {
0385         ++m_count;
0386         if (lumiSummaryCache->value != cvalue_) {
0387           throw cms::Exception("unexpectedValue")
0388               << "lumi summary cache value = " << lumiSummaryCache->value << " but it was supposed to be " << cvalue_;
0389         }
0390         auto pCache = iLBContext->luminosityBlock();
0391         // Add one so globalEndLuminosityBlock can check this function was called first
0392         ++pCache->value;
0393       }
0394 
0395       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0396                                            edm::EventSetup const&,
0397                                            LuminosityBlockContext const* iLBContext) {
0398         ++m_count;
0399         auto pCache = iLBContext->luminosityBlock();
0400         if (pCache->value != cvalue_ + 1) {
0401           throw cms::Exception("unexpectedValue")
0402               << "lumi cache value " << pCache->value << " but it was supposed to be " << cvalue_ + 1;
0403         }
0404         if (pCache->lumi != 1) {
0405           throw cms::Exception("end out of sequence")
0406               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0407               << iLB.luminosityBlock();
0408         }
0409       }
0410 
0411       void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0412         endLumiWasCalled_ = true;
0413       }
0414 
0415       ~LumiSummaryIntAnalyzer() {
0416         if (m_count != trans_) {
0417           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0418         }
0419       }
0420     };
0421 
0422     class ProcessBlockIntAnalyzer
0423         : public edm::stream::EDAnalyzer<edm::WatchProcessBlock, edm::GlobalCache<TestGlobalCacheAn>> {
0424     public:
0425       explicit ProcessBlockIntAnalyzer(edm::ParameterSet const& pset, TestGlobalCacheAn const* testGlobalCache) {
0426         {
0427           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0428           if (not tag.label().empty()) {
0429             testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0430           }
0431         }
0432         {
0433           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0434           if (not tag.label().empty()) {
0435             testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0436           }
0437         }
0438       }
0439 
0440       static std::unique_ptr<TestGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
0441         auto testGlobalCache = std::make_unique<TestGlobalCacheAn>();
0442         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0443         return testGlobalCache;
0444       }
0445 
0446       static void beginProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheAn* testGlobalCache) {
0447         if (testGlobalCache->m_count != 0) {
0448           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer::begin transitions "
0449                                               << testGlobalCache->m_count << " but it was supposed to be " << 0;
0450         }
0451         ++testGlobalCache->m_count;
0452 
0453         const unsigned int valueToGet = 51;
0454         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0455           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0456             throw cms::Exception("BadValue")
0457                 << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0458           }
0459         }
0460       }
0461 
0462       void analyze(edm::Event const&, edm::EventSetup const&) override {
0463         TestGlobalCacheAn const* testGlobalCache = globalCache();
0464         if (testGlobalCache->m_count < 1u) {
0465           throw cms::Exception("out of sequence") << "produce before beginProcessBlock " << testGlobalCache->m_count;
0466         }
0467         ++testGlobalCache->m_count;
0468       }
0469 
0470       static void endProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheAn* testGlobalCache) {
0471         ++testGlobalCache->m_count;
0472         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0473           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer::end transitions " << testGlobalCache->m_count
0474                                               << " but it was supposed to be " << testGlobalCache->trans_;
0475         }
0476         {
0477           const unsigned int valueToGet = 51;
0478           if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0479             if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0480               throw cms::Exception("BadValue")
0481                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0482             }
0483           }
0484         }
0485         {
0486           const unsigned int valueToGet = 61;
0487           if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0488             if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToGet) {
0489               throw cms::Exception("BadValue")
0490                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0491             }
0492           }
0493         }
0494       }
0495 
0496       static void globalEndJob(TestGlobalCacheAn* testGlobalCache) {
0497         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0498           throw cms::Exception("transitions")
0499               << "TestBeginProcessBlockAnalyzer transitions " << testGlobalCache->m_count
0500               << " but it was supposed to be " << testGlobalCache->trans_;
0501         }
0502       }
0503 
0504       ~ProcessBlockIntAnalyzer() {
0505         TestGlobalCacheAn const* testGlobalCache = globalCache();
0506         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0507           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer transitions " << testGlobalCache->m_count
0508                                               << " but it was supposed to be " << testGlobalCache->trans_;
0509         }
0510       }
0511     };
0512 
0513     class TestInputProcessBlockCache {
0514     public:
0515       long long int value_ = 0;
0516     };
0517 
0518     class TestInputProcessBlockCache1 {
0519     public:
0520       long long int value_ = 0;
0521     };
0522 
0523     class InputProcessBlockIntAnalyzer
0524         : public edm::stream::EDAnalyzer<
0525               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>> {
0526     public:
0527       explicit InputProcessBlockIntAnalyzer(edm::ParameterSet const& pset) {
0528         {
0529           expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0530           sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0531           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0532           if (not tag.label().empty()) {
0533             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0534           }
0535         }
0536         {
0537           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0538           if (not tag.label().empty()) {
0539             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0540           }
0541         }
0542         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0543             getTokenBegin_,
0544             [this](edm::ProcessBlock const& processBlock,
0545                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0546               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0547               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0548               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0549               return returnValue;
0550             });
0551       }
0552 
0553       static void accessInputProcessBlock(edm::ProcessBlock const&) {
0554         edm::LogAbsolute("InputProcessBlockIntAnalyzer") << "InputProcessBlockIntAnalyzer::accessInputProcessBlock";
0555       }
0556 
0557       void analyze(edm::Event const& event, edm::EventSetup const&) override {
0558         auto cacheTuple = processBlockCaches(event);
0559         if (!expectedByRun_.empty()) {
0560           if (expectedByRun_.at(event.run() - 1) !=
0561               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0562             throw cms::Exception("UnexpectedValue")
0563                 << "InputProcessBlockIntAnalyzer::analyze cached value was "
0564                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0565                 << " but it was supposed to be " << expectedByRun_.at(event.run() - 1);
0566           }
0567         }
0568         // Force events to be processed concurrently
0569         if (sleepTime_ > 0) {
0570           usleep(sleepTime_);
0571         }
0572       }
0573 
0574     private:
0575       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0576       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0577       std::vector<int> expectedByRun_;
0578       unsigned int sleepTime_{0};
0579     };
0580 
0581     struct InputProcessBlockGlobalCacheAn {
0582       // The tokens are duplicated in this test module to prove that they
0583       // work both as GlobalCache members and module data members.
0584       // We need them as GlobalCache members for accessInputProcessBlock.
0585       // In registerProcessBlockCacheFiller we use tokens that are member
0586       // variables of the class and because the lambda captures the "this"
0587       // pointer of the zeroth stream module instance. We always
0588       // use the zeroth EDConsumer. In the case of registerProcessBlockCacheFiller,
0589       // either set of tokens would work. Note that in the GlobalCache case
0590       // there is a slight weirdness that the zeroth consumer is used but
0591       // the token comes from the last consumer instance. It works because
0592       // all the stream module instances have EDConsumer base classes with
0593       // containers with the same contents in the same order (not 100% guaranteed,
0594       // but it would be difficult to implement a module where this isn't true).
0595       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBegin_;
0596       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEnd_;
0597       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0598       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEndM_;
0599       mutable std::atomic<unsigned int> transitions_{0};
0600       int sum_{0};
0601       unsigned int expectedTransitions_{0};
0602       std::vector<int> expectedByRun_;
0603       int expectedSum_{0};
0604       unsigned int sleepTime_{0};
0605     };
0606 
0607     // Same thing as previous class except with a GlobalCache added
0608     class InputProcessBlockIntAnalyzerG
0609         : public edm::stream::EDAnalyzer<
0610               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>,
0611               edm::GlobalCache<InputProcessBlockGlobalCacheAn>> {
0612     public:
0613       explicit InputProcessBlockIntAnalyzerG(edm::ParameterSet const& pset,
0614                                              InputProcessBlockGlobalCacheAn const* testGlobalCache) {
0615         {
0616           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0617           if (not tag.label().empty()) {
0618             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0619             testGlobalCache->getTokenBegin_ = getTokenBegin_;
0620           }
0621         }
0622         {
0623           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0624           if (not tag.label().empty()) {
0625             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0626             testGlobalCache->getTokenEnd_ = getTokenEnd_;
0627           }
0628         }
0629         {
0630           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlockM");
0631           if (not tag.label().empty()) {
0632             getTokenBeginM_ = consumes<IntProduct, edm::InProcess>(tag);
0633             testGlobalCache->getTokenBeginM_ = getTokenBeginM_;
0634           }
0635         }
0636         {
0637           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlockM");
0638           if (not tag.label().empty()) {
0639             getTokenEndM_ = consumes<IntProduct, edm::InProcess>(tag);
0640             testGlobalCache->getTokenEndM_ = getTokenEndM_;
0641           }
0642         }
0643         registerProcessBlockCacheFiller<int>(
0644             getTokenBegin_, [this](edm::ProcessBlock const& processBlock, std::shared_ptr<int> const& previousCache) {
0645               auto returnValue = std::make_shared<int>(0);
0646               *returnValue += processBlock.get(getTokenBegin_).value;
0647               *returnValue += processBlock.get(getTokenEnd_).value;
0648               ++globalCache()->transitions_;
0649               return returnValue;
0650             });
0651         registerProcessBlockCacheFiller<1>(getTokenBegin_,
0652                                            [this](edm::ProcessBlock const& processBlock,
0653                                                   std::shared_ptr<TestInputProcessBlockCache> const& previousCache) {
0654                                              auto returnValue = std::make_shared<TestInputProcessBlockCache>();
0655                                              returnValue->value_ += processBlock.get(getTokenBegin_).value;
0656                                              returnValue->value_ += processBlock.get(getTokenEnd_).value;
0657                                              ++globalCache()->transitions_;
0658                                              return returnValue;
0659                                            });
0660         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0661             getTokenBegin_,
0662             [this](edm::ProcessBlock const& processBlock,
0663                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0664               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0665               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0666               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0667               ++globalCache()->transitions_;
0668               return returnValue;
0669             });
0670       }
0671 
0672       static std::unique_ptr<InputProcessBlockGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
0673         auto testGlobalCache = std::make_unique<InputProcessBlockGlobalCacheAn>();
0674         testGlobalCache->expectedTransitions_ = pset.getParameter<int>("transitions");
0675         testGlobalCache->expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0676         testGlobalCache->expectedSum_ = pset.getParameter<int>("expectedSum");
0677         testGlobalCache->sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0678         return testGlobalCache;
0679       }
0680 
0681       static void accessInputProcessBlock(edm::ProcessBlock const& processBlock,
0682                                           InputProcessBlockGlobalCacheAn* testGlobalCache) {
0683         if (processBlock.processName() == "PROD1") {
0684           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBegin_).value;
0685           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEnd_).value;
0686         }
0687         if (processBlock.processName() == "MERGE") {
0688           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBeginM_).value;
0689           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEndM_).value;
0690         }
0691         ++testGlobalCache->transitions_;
0692       }
0693 
0694       void analyze(edm::Event const& event, edm::EventSetup const&) override {
0695         auto cacheTuple = processBlockCaches(event);
0696         auto testGlobalCache = globalCache();
0697         if (!testGlobalCache->expectedByRun_.empty()) {
0698           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != *std::get<edm::CacheHandle<int>>(cacheTuple)) {
0699             throw cms::Exception("UnexpectedValue")
0700                 << "InputProcessBlockIntAnalyzerG::analyze cached value was "
0701                 << *std::get<edm::CacheHandle<int>>(cacheTuple) << " but it was supposed to be "
0702                 << testGlobalCache->expectedByRun_.at(event.run() - 1);
0703           }
0704           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != std::get<1>(cacheTuple)->value_) {
0705             throw cms::Exception("UnexpectedValue")
0706                 << "InputProcessBlockIntAnalyzerG::analyze second cached value was " << std::get<1>(cacheTuple)->value_
0707                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
0708           }
0709           if (testGlobalCache->expectedByRun_.at(event.run() - 1) !=
0710               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0711             throw cms::Exception("UnexpectedValue")
0712                 << "InputProcessBlockIntAnalyzerG::analyze third cached value was "
0713                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0714                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
0715           }
0716         }
0717         ++testGlobalCache->transitions_;
0718 
0719         // Force events to be processed concurrently
0720         if (testGlobalCache->sleepTime_ > 0) {
0721           usleep(testGlobalCache->sleepTime_);
0722         }
0723       }
0724 
0725       static void globalEndJob(InputProcessBlockGlobalCacheAn* testGlobalCache) {
0726         if (testGlobalCache->transitions_ != testGlobalCache->expectedTransitions_) {
0727           throw cms::Exception("transitions")
0728               << "InputProcessBlockIntAnalyzerG transitions " << testGlobalCache->transitions_
0729               << " but it was supposed to be " << testGlobalCache->expectedTransitions_;
0730         }
0731 
0732         if (testGlobalCache->sum_ != testGlobalCache->expectedSum_) {
0733           throw cms::Exception("UnexpectedValue") << "InputProcessBlockIntAnalyzerG sum " << testGlobalCache->sum_
0734                                                   << " but it was supposed to be " << testGlobalCache->expectedSum_;
0735         }
0736       }
0737 
0738     private:
0739       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0740       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0741       edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0742       edm::EDGetTokenT<IntProduct> getTokenEndM_;
0743     };
0744 
0745     // The next two test that modules without the
0746     // static accessInputProcessBlock function will build.
0747     // And also that modules with no functor registered run.
0748 
0749     class InputProcessBlockIntAnalyzerNS
0750         : public edm::stream::EDAnalyzer<edm::InputProcessBlockCache<int, TestInputProcessBlockCache>> {
0751     public:
0752       explicit InputProcessBlockIntAnalyzerNS(edm::ParameterSet const& pset) {}
0753       void analyze(edm::Event const&, edm::EventSetup const&) override {}
0754     };
0755 
0756     // Same thing as previous class except with a GlobalCache added
0757     class InputProcessBlockIntAnalyzerGNS
0758         : public edm::stream::EDAnalyzer<edm::InputProcessBlockCache<int, TestInputProcessBlockCache>,
0759                                          edm::GlobalCache<TestGlobalCacheAn>> {
0760     public:
0761       explicit InputProcessBlockIntAnalyzerGNS(edm::ParameterSet const& pset,
0762                                                TestGlobalCacheAn const* testGlobalCache) {}
0763       static std::unique_ptr<TestGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const&) {
0764         return std::make_unique<TestGlobalCacheAn>();
0765       }
0766       void analyze(edm::Event const&, edm::EventSetup const&) override {}
0767       static void globalEndJob(TestGlobalCacheAn* testGlobalCache) {}
0768     };
0769 
0770   }  // namespace stream
0771 }  // namespace edmtest
0772 std::atomic<unsigned int> edmtest::stream::GlobalIntAnalyzer::m_count{0};
0773 std::atomic<unsigned int> edmtest::stream::RunIntAnalyzer::m_count{0};
0774 std::atomic<unsigned int> edmtest::stream::LumiIntAnalyzer::m_count{0};
0775 std::atomic<unsigned int> edmtest::stream::RunSummaryIntAnalyzer::m_count{0};
0776 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::m_count{0};
0777 std::atomic<unsigned int> edmtest::stream::GlobalIntAnalyzer::cvalue_{0};
0778 std::atomic<unsigned int> edmtest::stream::RunIntAnalyzer::cvalue_{0};
0779 std::atomic<unsigned int> edmtest::stream::LumiIntAnalyzer::cvalue_{0};
0780 std::atomic<unsigned int> edmtest::stream::RunSummaryIntAnalyzer::cvalue_{0};
0781 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::cvalue_{0};
0782 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::globalBeginRunCalled_{false};
0783 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::globalBeginLumiCalled_{false};
0784 DEFINE_FWK_MODULE(edmtest::stream::GlobalIntAnalyzer);
0785 DEFINE_FWK_MODULE(edmtest::stream::RunIntAnalyzer);
0786 DEFINE_FWK_MODULE(edmtest::stream::LumiIntAnalyzer);
0787 DEFINE_FWK_MODULE(edmtest::stream::RunSummaryIntAnalyzer);
0788 DEFINE_FWK_MODULE(edmtest::stream::LumiSummaryIntAnalyzer);
0789 DEFINE_FWK_MODULE(edmtest::stream::ProcessBlockIntAnalyzer);
0790 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzer);
0791 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerG);
0792 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerNS);
0793 DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerGNS);