Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:20

0001 
0002 /*----------------------------------------------------------------------
0003 
0004 Toy edm::stream::EDAnalyzer modules of
0005 edm::*Cache templates
0006 for testing purposes only.
0007 
0008 ----------------------------------------------------------------------*/
0009 #include <atomic>
0010 #include <functional>
0011 #include <iostream>
0012 #include <map>
0013 #include <tuple>
0014 #include <unistd.h>
0015 #include <vector>
0016 
0017 #include "FWCore/Framework/interface/CacheHandle.h"
0018 #include "FWCore/Framework/interface/stream/EDAnalyzer.h"
0019 #include "FWCore/Framework/interface/maker/WorkerT.h"
0020 #include "FWCore/Framework/interface/HistoryAppender.h"
0021 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0022 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0023 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/MakerMacros.h"
0026 #include "FWCore/Framework/interface/ProcessBlock.h"
0027 #include "FWCore/Framework/interface/TriggerNamesService.h"
0028 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0030 #include "FWCore/ServiceRegistry/interface/Service.h"
0031 #include "FWCore/Utilities/interface/EDMException.h"
0032 #include "DataFormats/Provenance/interface/BranchDescription.h"
0033 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0034 
0035 namespace edmtest {
0036   namespace stream {
0037 
0038     // anonymous namespace here causes build warnings
0039     namespace cache {
0040       struct Cache {
0041         Cache() : value(0), run(0), lumi(0) {}
0042         //Using mutable since we want to update the value.
0043         mutable std::atomic<unsigned int> value;
0044         mutable std::atomic<unsigned int> run;
0045         mutable std::atomic<unsigned int> lumi;
0046       };
0047       struct SummaryCache {
0048         // Intentionally not thread safe, not atomic
0049         unsigned int value = 0;
0050       };
0051       struct TestGlobalCacheAn {
0052         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenBegin_;
0053         CMS_THREAD_SAFE mutable edm::EDGetTokenT<unsigned int> getTokenEnd_;
0054         unsigned int trans_{0};
0055         mutable std::atomic<unsigned int> m_count{0};
0056       };
0057     }  // namespace cache
0058 
0059     using Cache = cache::Cache;
0060     using SummaryCache = cache::SummaryCache;
0061     using TestGlobalCacheAn = cache::TestGlobalCacheAn;
0062 
0063     class GlobalIntAnalyzer : public edm::stream::EDAnalyzer<edm::GlobalCache<Cache>> {
0064     public:
0065       static std::atomic<unsigned int> m_count;
0066       unsigned int trans_;
0067       static std::atomic<unsigned int> cvalue_;
0068 
0069       static std::unique_ptr<Cache> initializeGlobalCache(edm::ParameterSet const& p) {
0070         ++m_count;
0071         return std::make_unique<Cache>();
0072       }
0073 
0074       GlobalIntAnalyzer(edm::ParameterSet const& p, Cache const* iGlobal) {
0075         trans_ = p.getParameter<int>("transitions");
0076         cvalue_ = p.getParameter<int>("cachevalue");
0077         callWhenNewProductsRegistered([](edm::BranchDescription const& desc) {
0078           std::cout << "stream::GlobalIntAnalyzer " << desc.moduleLabel() << std::endl;
0079         });
0080       }
0081 
0082       static void globalBeginJob(Cache* iGlobal) {
0083         ++m_count;
0084         if (iGlobal->value != 0) {
0085           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
0086         }
0087       }
0088 
0089       void analyze(edm::Event const&, edm::EventSetup const&) {
0090         ++m_count;
0091         ++((globalCache())->value);
0092       }
0093 
0094       static void globalEndJob(Cache* iGlobal) {
0095         ++m_count;
0096         if (iGlobal->value != cvalue_) {
0097           throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be " << cvalue_;
0098         }
0099       }
0100 
0101       ~GlobalIntAnalyzer() {
0102         if (m_count != trans_) {
0103           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0104         }
0105       }
0106     };
0107 
0108     class RunIntAnalyzer : public edm::stream::EDAnalyzer<edm::RunCache<Cache>, edm::stream::WatchRuns> {
0109     public:
0110       static std::atomic<unsigned int> m_count;
0111       unsigned int trans_;
0112       static std::atomic<unsigned int> cvalue_;
0113 
0114       RunIntAnalyzer(edm::ParameterSet const& p) {
0115         trans_ = p.getParameter<int>("transitions");
0116         cvalue_ = p.getParameter<int>("cachevalue");
0117         m_count = 0;
0118       }
0119 
0120       void analyze(edm::Event const&, edm::EventSetup const&) override {
0121         if (moduleDescription().processName() != edm::Service<edm::service::TriggerNamesService>()->getProcessName()) {
0122           throw cms::Exception("LogicError") << "module description not properly initialized in stream analyzer";
0123         }
0124         ++m_count;
0125         ++(runCache()->value);
0126       }
0127 
0128       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0129         ++m_count;
0130         auto pCache = std::make_shared<Cache>();
0131         pCache->run = iRun.runAuxiliary().run();
0132         return pCache;
0133       }
0134 
0135       void beginRun(edm::Run const& iRun, edm::EventSetup const&) override {
0136         if (runCache()->run != iRun.runAuxiliary().run()) {
0137           throw cms::Exception("begin out of sequence") << "beginRun seen before globalBeginRun";
0138         }
0139       }
0140 
0141       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0142         ++m_count;
0143         auto pCache = iContext->run();
0144         if (pCache->run != iRun.runAuxiliary().run()) {
0145           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0146         }
0147         pCache->run = 0;
0148         if (iContext->run()->value != cvalue_) {
0149           throw cms::Exception("cache value") << iContext->run()->value << " but it was supposed to be " << cvalue_;
0150         }
0151       }
0152 
0153       void endRun(edm::Run const& iRun, edm::EventSetup const&) override {
0154         if (runCache()->run != iRun.runAuxiliary().run()) {
0155           throw cms::Exception("end out of sequence") << "globalEndRun seen before endRun";
0156         }
0157       }
0158 
0159       ~RunIntAnalyzer() {
0160         if (m_count != trans_) {
0161           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0162         }
0163       }
0164     };
0165 
0166     class LumiIntAnalyzer
0167         : public edm::stream::EDAnalyzer<edm::LuminosityBlockCache<Cache>, edm::stream::WatchLuminosityBlocks> {
0168     public:
0169       static std::atomic<unsigned int> m_count;
0170       unsigned int trans_;
0171       static std::atomic<unsigned int> cvalue_;
0172 
0173       LumiIntAnalyzer(edm::ParameterSet const& p) {
0174         trans_ = p.getParameter<int>("transitions");
0175         cvalue_ = p.getParameter<int>("cachevalue");
0176         m_count = 0;
0177         // just to create a data dependence
0178         auto const& tag = p.getParameter<edm::InputTag>("moduleLabel");
0179         if (not tag.label().empty()) {
0180           consumes<unsigned int, edm::InLumi>(tag);
0181         }
0182       }
0183 
0184       void analyze(edm::Event const&, edm::EventSetup const&) override {
0185         ++m_count;
0186         ++(luminosityBlockCache()->value);
0187       }
0188 
0189       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0190                                                                edm::EventSetup const&,
0191                                                                RunContext const*) {
0192         ++m_count;
0193         auto pCache = std::make_shared<Cache>();
0194         pCache->run = iLB.luminosityBlockAuxiliary().run();
0195         pCache->lumi = iLB.luminosityBlockAuxiliary().luminosityBlock();
0196         return pCache;
0197       }
0198 
0199       void beginLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0200         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0201             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0202           throw cms::Exception("begin out of sequence")
0203               << "beginLuminosityBlock seen before globalBeginLuminosityBlock";
0204         }
0205       }
0206 
0207       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0208                                            edm::EventSetup const&,
0209                                            LuminosityBlockContext const* iLBContext) {
0210         ++m_count;
0211         auto pCache = iLBContext->luminosityBlock();
0212         if (pCache->run != iLB.luminosityBlockAuxiliary().run() ||
0213             pCache->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0214           throw cms::Exception("end out of sequence")
0215               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0216               << iLB.luminosityBlock();
0217         }
0218         pCache->run = 0;
0219         pCache->lumi = 0;
0220         if (iLBContext->luminosityBlock()->value != cvalue_) {
0221           throw cms::Exception("cache value")
0222               << iLBContext->luminosityBlock()->value << " but it was supposed to be " << cvalue_;
0223         }
0224       }
0225 
0226       void endLuminosityBlock(edm::LuminosityBlock const& iLB, edm::EventSetup const&) override {
0227         if (luminosityBlockCache()->run != iLB.luminosityBlockAuxiliary().run() ||
0228             luminosityBlockCache()->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
0229           throw cms::Exception("end out of sequence") << "globalEndLuminosityBlock seen before endLuminosityBlock";
0230         }
0231       }
0232 
0233       ~LumiIntAnalyzer() {
0234         if (m_count != trans_) {
0235           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0236         }
0237       }
0238     };
0239 
0240     class RunSummaryIntAnalyzer : public edm::stream::EDAnalyzer<edm::RunCache<Cache>,
0241                                                                  edm::RunSummaryCache<SummaryCache>,
0242                                                                  edm::stream::WatchRuns> {
0243     public:
0244       static std::atomic<unsigned int> m_count;
0245       unsigned int trans_;
0246       static std::atomic<unsigned int> cvalue_;
0247       static std::atomic<bool> globalBeginRunCalled_;
0248       unsigned int valueAccumulatedForStream_ = 0;
0249       bool endRunWasCalled_ = false;
0250 
0251       RunSummaryIntAnalyzer(edm::ParameterSet const& p) {
0252         trans_ = p.getParameter<int>("transitions");
0253         cvalue_ = p.getParameter<int>("cachevalue");
0254         m_count = 0;
0255       }
0256 
0257       void analyze(edm::Event const&, edm::EventSetup const&) override {
0258         ++m_count;
0259         ++(runCache()->value);
0260         ++valueAccumulatedForStream_;
0261       }
0262 
0263       void beginRun(edm::Run const&, edm::EventSetup const&) override {
0264         valueAccumulatedForStream_ = 0;
0265         endRunWasCalled_ = false;
0266       }
0267 
0268       static std::shared_ptr<Cache> globalBeginRun(edm::Run const& iRun, edm::EventSetup const&, GlobalCache const*) {
0269         ++m_count;
0270         globalBeginRunCalled_ = true;
0271         auto pCache = std::make_shared<Cache>();
0272         ++(pCache->run);
0273         return pCache;
0274       }
0275 
0276       static std::shared_ptr<SummaryCache> globalBeginRunSummary(edm::Run const&,
0277                                                                  edm::EventSetup const&,
0278                                                                  GlobalCache const*) {
0279         ++m_count;
0280         if (!globalBeginRunCalled_) {
0281           throw cms::Exception("begin out of sequence") << "globalBeginRunSummary seen before globalBeginRun";
0282         }
0283         return std::make_shared<SummaryCache>();
0284       }
0285 
0286       void endRunSummary(edm::Run const&, edm::EventSetup const&, SummaryCache* runSummaryCache) const override {
0287         runSummaryCache->value += valueAccumulatedForStream_;
0288         if (!endRunWasCalled_) {
0289           throw cms::Exception("end out of sequence") << "endRunSummary seen before endRun";
0290         }
0291       }
0292 
0293       static void globalEndRunSummary(edm::Run const&,
0294                                       edm::EventSetup const&,
0295                                       RunContext const*,
0296                                       SummaryCache* runSummaryCache) {
0297         ++m_count;
0298         if (runSummaryCache->value != cvalue_) {
0299           throw cms::Exception("unexpectedValue")
0300               << "run summary cache value = " << runSummaryCache->value << " but it was supposed to be " << cvalue_;
0301         }
0302       }
0303 
0304       static void globalEndRun(edm::Run const& iRun, edm::EventSetup const&, RunContext const* iContext) {
0305         ++m_count;
0306         auto pCache = iContext->run();
0307         if (pCache->value != cvalue_) {
0308           throw cms::Exception("unExpectedValue")
0309               << "run cache value " << pCache->value << " but it was supposed to be " << cvalue_;
0310         }
0311         if (pCache->run != 1) {
0312           throw cms::Exception("end out of sequence") << "globalEndRun seen before globalBeginRun in Run" << iRun.run();
0313         }
0314       }
0315 
0316       void endRun(edm::Run const&, edm::EventSetup const&) override { endRunWasCalled_ = true; }
0317 
0318       ~RunSummaryIntAnalyzer() {
0319         if (m_count != trans_) {
0320           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0321         }
0322       }
0323     };
0324 
0325     class LumiSummaryIntAnalyzer : public edm::stream::EDAnalyzer<edm::LuminosityBlockCache<Cache>,
0326                                                                   edm::LuminosityBlockSummaryCache<SummaryCache>,
0327                                                                   edm::stream::WatchLuminosityBlocks> {
0328     public:
0329       static std::atomic<unsigned int> m_count;
0330       unsigned int trans_;
0331       static std::atomic<unsigned int> cvalue_;
0332       static std::atomic<bool> globalBeginLumiCalled_;
0333       unsigned int valueAccumulatedForStream_ = 0;
0334       bool endLumiWasCalled_ = false;
0335 
0336       LumiSummaryIntAnalyzer(edm::ParameterSet const& p) {
0337         trans_ = p.getParameter<int>("transitions");
0338         cvalue_ = p.getParameter<int>("cachevalue");
0339         m_count = 0;
0340       }
0341 
0342       void analyze(edm::Event const&, edm::EventSetup const&) override {
0343         ++m_count;
0344         ++(luminosityBlockCache()->value);
0345         ++valueAccumulatedForStream_;
0346       }
0347 
0348       void beginLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0349         valueAccumulatedForStream_ = 0;
0350         endLumiWasCalled_ = false;
0351       }
0352 
0353       static std::shared_ptr<Cache> globalBeginLuminosityBlock(edm::LuminosityBlock const& iLB,
0354                                                                edm::EventSetup const&,
0355                                                                RunContext const*) {
0356         ++m_count;
0357         globalBeginLumiCalled_ = true;
0358         auto pCache = std::make_shared<Cache>();
0359         ++(pCache->lumi);
0360         return pCache;
0361       }
0362 
0363       static std::shared_ptr<SummaryCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0364                                                                              edm::EventSetup const&,
0365                                                                              LuminosityBlockContext const*) {
0366         ++m_count;
0367         if (!globalBeginLumiCalled_) {
0368           throw cms::Exception("begin out of sequence")
0369               << "globalBeginLuminosityBlockSummary seen before globalBeginLuminosityBlock";
0370         }
0371         globalBeginLumiCalled_ = false;
0372         return std::make_shared<SummaryCache>();
0373       }
0374 
0375       void endLuminosityBlockSummary(edm::LuminosityBlock const&,
0376                                      edm::EventSetup const&,
0377                                      SummaryCache* lumiSummaryCache) const override {
0378         lumiSummaryCache->value += valueAccumulatedForStream_;
0379         if (!endLumiWasCalled_) {
0380           throw cms::Exception("end out of sequence") << "endLuminosityBlockSummary seen before endLuminosityBlock";
0381         }
0382       }
0383 
0384       static void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0385                                                   edm::EventSetup const&,
0386                                                   LuminosityBlockContext const* iLBContext,
0387                                                   SummaryCache* lumiSummaryCache) {
0388         ++m_count;
0389         if (lumiSummaryCache->value != cvalue_) {
0390           throw cms::Exception("unexpectedValue")
0391               << "lumi summary cache value = " << lumiSummaryCache->value << " but it was supposed to be " << cvalue_;
0392         }
0393         auto pCache = iLBContext->luminosityBlock();
0394         // Add one so globalEndLuminosityBlock can check this function was called first
0395         ++pCache->value;
0396       }
0397 
0398       static void globalEndLuminosityBlock(edm::LuminosityBlock const& iLB,
0399                                            edm::EventSetup const&,
0400                                            LuminosityBlockContext const* iLBContext) {
0401         ++m_count;
0402         auto pCache = iLBContext->luminosityBlock();
0403         if (pCache->value != cvalue_ + 1) {
0404           throw cms::Exception("unexpectedValue")
0405               << "lumi cache value " << pCache->value << " but it was supposed to be " << cvalue_ + 1;
0406         }
0407         if (pCache->lumi != 1) {
0408           throw cms::Exception("end out of sequence")
0409               << "globalEndLuminosityBlock seen before globalBeginLuminosityBlock in LuminosityBlock"
0410               << iLB.luminosityBlock();
0411         }
0412       }
0413 
0414       void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) override {
0415         endLumiWasCalled_ = true;
0416       }
0417 
0418       ~LumiSummaryIntAnalyzer() {
0419         if (m_count != trans_) {
0420           throw cms::Exception("transitions") << m_count << " but it was supposed to be " << trans_;
0421         }
0422       }
0423     };
0424 
0425     class ProcessBlockIntAnalyzer
0426         : public edm::stream::EDAnalyzer<edm::WatchProcessBlock, edm::GlobalCache<TestGlobalCacheAn>> {
0427     public:
0428       explicit ProcessBlockIntAnalyzer(edm::ParameterSet const& pset, TestGlobalCacheAn const* testGlobalCache) {
0429         {
0430           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0431           if (not tag.label().empty()) {
0432             testGlobalCache->getTokenBegin_ = consumes<unsigned int, edm::InProcess>(tag);
0433           }
0434         }
0435         {
0436           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0437           if (not tag.label().empty()) {
0438             testGlobalCache->getTokenEnd_ = consumes<unsigned int, edm::InProcess>(tag);
0439           }
0440         }
0441       }
0442 
0443       static std::unique_ptr<TestGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
0444         auto testGlobalCache = std::make_unique<TestGlobalCacheAn>();
0445         testGlobalCache->trans_ = pset.getParameter<int>("transitions");
0446         return testGlobalCache;
0447       }
0448 
0449       static void beginProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheAn* testGlobalCache) {
0450         if (testGlobalCache->m_count != 0) {
0451           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer::begin transitions "
0452                                               << testGlobalCache->m_count << " but it was supposed to be " << 0;
0453         }
0454         ++testGlobalCache->m_count;
0455 
0456         const unsigned int valueToGet = 51;
0457         if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0458           if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0459             throw cms::Exception("BadValue")
0460                 << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0461           }
0462         }
0463       }
0464 
0465       void analyze(edm::Event const&, edm::EventSetup const&) override {
0466         TestGlobalCacheAn const* testGlobalCache = globalCache();
0467         if (testGlobalCache->m_count < 1u) {
0468           throw cms::Exception("out of sequence") << "produce before beginProcessBlock " << testGlobalCache->m_count;
0469         }
0470         ++testGlobalCache->m_count;
0471       }
0472 
0473       static void endProcessBlock(edm::ProcessBlock const& processBlock, TestGlobalCacheAn* testGlobalCache) {
0474         ++testGlobalCache->m_count;
0475         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0476           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer::end transitions " << testGlobalCache->m_count
0477                                               << " but it was supposed to be " << testGlobalCache->trans_;
0478         }
0479         {
0480           const unsigned int valueToGet = 51;
0481           if (not testGlobalCache->getTokenBegin_.isUninitialized()) {
0482             if (processBlock.get(testGlobalCache->getTokenBegin_) != valueToGet) {
0483               throw cms::Exception("BadValue")
0484                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenBegin_);
0485             }
0486           }
0487         }
0488         {
0489           const unsigned int valueToGet = 61;
0490           if (not testGlobalCache->getTokenEnd_.isUninitialized()) {
0491             if (processBlock.get(testGlobalCache->getTokenEnd_) != valueToGet) {
0492               throw cms::Exception("BadValue")
0493                   << "expected " << valueToGet << " but got " << processBlock.get(testGlobalCache->getTokenEnd_);
0494             }
0495           }
0496         }
0497       }
0498 
0499       static void globalEndJob(TestGlobalCacheAn* testGlobalCache) {
0500         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0501           throw cms::Exception("transitions")
0502               << "TestBeginProcessBlockAnalyzer transitions " << testGlobalCache->m_count
0503               << " but it was supposed to be " << testGlobalCache->trans_;
0504         }
0505       }
0506 
0507       ~ProcessBlockIntAnalyzer() {
0508         TestGlobalCacheAn const* testGlobalCache = globalCache();
0509         if (testGlobalCache->m_count != testGlobalCache->trans_) {
0510           throw cms::Exception("transitions") << "ProcessBlockIntAnalyzer transitions " << testGlobalCache->m_count
0511                                               << " but it was supposed to be " << testGlobalCache->trans_;
0512         }
0513       }
0514     };
0515 
0516     class TestInputProcessBlockCache {
0517     public:
0518       long long int value_ = 0;
0519     };
0520 
0521     class TestInputProcessBlockCache1 {
0522     public:
0523       long long int value_ = 0;
0524     };
0525 
0526     class InputProcessBlockIntAnalyzer
0527         : public edm::stream::EDAnalyzer<
0528               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>> {
0529     public:
0530       explicit InputProcessBlockIntAnalyzer(edm::ParameterSet const& pset) {
0531         {
0532           expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0533           sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0534           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0535           if (not tag.label().empty()) {
0536             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0537           }
0538         }
0539         {
0540           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0541           if (not tag.label().empty()) {
0542             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0543           }
0544         }
0545         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0546             getTokenBegin_,
0547             [this](edm::ProcessBlock const& processBlock,
0548                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0549               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0550               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0551               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0552               return returnValue;
0553             });
0554       }
0555 
0556       static void accessInputProcessBlock(edm::ProcessBlock const&) {
0557         edm::LogAbsolute("InputProcessBlockIntAnalyzer") << "InputProcessBlockIntAnalyzer::accessInputProcessBlock";
0558       }
0559 
0560       void analyze(edm::Event const& event, edm::EventSetup const&) override {
0561         auto cacheTuple = processBlockCaches(event);
0562         if (!expectedByRun_.empty()) {
0563           if (expectedByRun_.at(event.run() - 1) !=
0564               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0565             throw cms::Exception("UnexpectedValue")
0566                 << "InputProcessBlockIntAnalyzer::analyze cached value was "
0567                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0568                 << " but it was supposed to be " << expectedByRun_.at(event.run() - 1);
0569           }
0570         }
0571         // Force events to be processed concurrently
0572         if (sleepTime_ > 0) {
0573           usleep(sleepTime_);
0574         }
0575       }
0576 
0577     private:
0578       edm::EDGetTokenT<IntProduct> getTokenBegin_;
0579       edm::EDGetTokenT<IntProduct> getTokenEnd_;
0580       std::vector<int> expectedByRun_;
0581       unsigned int sleepTime_{0};
0582     };
0583 
0584     struct InputProcessBlockGlobalCacheAn {
0585       // The tokens are duplicated in this test module to prove that they
0586       // work both as GlobalCache members and module data members.
0587       // We need them as GlobalCache members for accessInputProcessBlock.
0588       // In registerProcessBlockCacheFiller we use tokens that are member
0589       // variables of the class and because the lambda captures the "this"
0590       // pointer of the zeroth stream module instance. We always
0591       // use the zeroth EDConsumer. In the case of registerProcessBlockCacheFiller,
0592       // either set of tokens would work. Note that in the GlobalCache case
0593       // there is a slight weirdness that the zeroth consumer is used but
0594       // the token comes from the last consumer instance. It works because
0595       // all the stream module instances have EDConsumer base classes with
0596       // containers with the same contents in the same order (not 100% guaranteed,
0597       // but it would be difficult to implement a module where this isn't true).
0598       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBegin_;
0599       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEnd_;
0600       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenBeginM_;
0601       CMS_THREAD_SAFE mutable edm::EDGetTokenT<IntProduct> getTokenEndM_;
0602       mutable std::atomic<unsigned int> transitions_{0};
0603       int sum_{0};
0604       unsigned int expectedTransitions_{0};
0605       std::vector<int> expectedByRun_;
0606       int expectedSum_{0};
0607       unsigned int sleepTime_{0};
0608     };
0609 
0610     // Same thing as previous class except with a GlobalCache added
0611     class InputProcessBlockIntAnalyzerG
0612         : public edm::stream::EDAnalyzer<
0613               edm::InputProcessBlockCache<int, TestInputProcessBlockCache, TestInputProcessBlockCache1>,
0614               edm::GlobalCache<InputProcessBlockGlobalCacheAn>> {
0615     public:
0616       explicit InputProcessBlockIntAnalyzerG(edm::ParameterSet const& pset,
0617                                              InputProcessBlockGlobalCacheAn const* testGlobalCache) {
0618         {
0619           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlock");
0620           if (not tag.label().empty()) {
0621             getTokenBegin_ = consumes<IntProduct, edm::InProcess>(tag);
0622             testGlobalCache->getTokenBegin_ = getTokenBegin_;
0623           }
0624         }
0625         {
0626           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlock");
0627           if (not tag.label().empty()) {
0628             getTokenEnd_ = consumes<IntProduct, edm::InProcess>(tag);
0629             testGlobalCache->getTokenEnd_ = getTokenEnd_;
0630           }
0631         }
0632         {
0633           auto tag = pset.getParameter<edm::InputTag>("consumesBeginProcessBlockM");
0634           if (not tag.label().empty()) {
0635             getTokenBeginM_ = consumes<IntProduct, edm::InProcess>(tag);
0636             testGlobalCache->getTokenBeginM_ = getTokenBeginM_;
0637           }
0638         }
0639         {
0640           auto tag = pset.getParameter<edm::InputTag>("consumesEndProcessBlockM");
0641           if (not tag.label().empty()) {
0642             getTokenEndM_ = consumes<IntProduct, edm::InProcess>(tag);
0643             testGlobalCache->getTokenEndM_ = getTokenEndM_;
0644           }
0645         }
0646         registerProcessBlockCacheFiller<int>(
0647             getTokenBegin_, [this](edm::ProcessBlock const& processBlock, std::shared_ptr<int> const& previousCache) {
0648               auto returnValue = std::make_shared<int>(0);
0649               *returnValue += processBlock.get(getTokenBegin_).value;
0650               *returnValue += processBlock.get(getTokenEnd_).value;
0651               ++globalCache()->transitions_;
0652               return returnValue;
0653             });
0654         registerProcessBlockCacheFiller<1>(getTokenBegin_,
0655                                            [this](edm::ProcessBlock const& processBlock,
0656                                                   std::shared_ptr<TestInputProcessBlockCache> const& previousCache) {
0657                                              auto returnValue = std::make_shared<TestInputProcessBlockCache>();
0658                                              returnValue->value_ += processBlock.get(getTokenBegin_).value;
0659                                              returnValue->value_ += processBlock.get(getTokenEnd_).value;
0660                                              ++globalCache()->transitions_;
0661                                              return returnValue;
0662                                            });
0663         registerProcessBlockCacheFiller<TestInputProcessBlockCache1>(
0664             getTokenBegin_,
0665             [this](edm::ProcessBlock const& processBlock,
0666                    std::shared_ptr<TestInputProcessBlockCache1> const& previousCache) {
0667               auto returnValue = std::make_shared<TestInputProcessBlockCache1>();
0668               returnValue->value_ += processBlock.get(getTokenBegin_).value;
0669               returnValue->value_ += processBlock.get(getTokenEnd_).value;
0670               ++globalCache()->transitions_;
0671               return returnValue;
0672             });
0673       }
0674 
0675       static std::unique_ptr<InputProcessBlockGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const& pset) {
0676         auto testGlobalCache = std::make_unique<InputProcessBlockGlobalCacheAn>();
0677         testGlobalCache->expectedTransitions_ = pset.getParameter<int>("transitions");
0678         testGlobalCache->expectedByRun_ = pset.getParameter<std::vector<int>>("expectedByRun");
0679         testGlobalCache->expectedSum_ = pset.getParameter<int>("expectedSum");
0680         testGlobalCache->sleepTime_ = pset.getParameter<unsigned int>("sleepTime");
0681         return testGlobalCache;
0682       }
0683 
0684       static void accessInputProcessBlock(edm::ProcessBlock const& processBlock,
0685                                           InputProcessBlockGlobalCacheAn* testGlobalCache) {
0686         if (processBlock.processName() == "PROD1") {
0687           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBegin_).value;
0688           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEnd_).value;
0689         }
0690         if (processBlock.processName() == "MERGE") {
0691           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenBeginM_).value;
0692           testGlobalCache->sum_ += processBlock.get(testGlobalCache->getTokenEndM_).value;
0693         }
0694         ++testGlobalCache->transitions_;
0695       }
0696 
0697       void analyze(edm::Event const& event, edm::EventSetup const&) override {
0698         auto cacheTuple = processBlockCaches(event);
0699         auto testGlobalCache = globalCache();
0700         if (!testGlobalCache->expectedByRun_.empty()) {
0701           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != *std::get<edm::CacheHandle<int>>(cacheTuple)) {
0702             throw cms::Exception("UnexpectedValue")
0703                 << "InputProcessBlockIntAnalyzerG::analyze cached value was "
0704                 << *std::get<edm::CacheHandle<int>>(cacheTuple) << " but it was supposed to be "
0705                 << testGlobalCache->expectedByRun_.at(event.run() - 1);
0706           }
0707           if (testGlobalCache->expectedByRun_.at(event.run() - 1) != std::get<1>(cacheTuple)->value_) {
0708             throw cms::Exception("UnexpectedValue")
0709                 << "InputProcessBlockIntAnalyzerG::analyze second cached value was " << std::get<1>(cacheTuple)->value_
0710                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
0711           }
0712           if (testGlobalCache->expectedByRun_.at(event.run() - 1) !=
0713               std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_) {
0714             throw cms::Exception("UnexpectedValue")
0715                 << "InputProcessBlockIntAnalyzerG::analyze third cached value was "
0716                 << std::get<edm::CacheHandle<TestInputProcessBlockCache1>>(cacheTuple)->value_
0717                 << " but it was supposed to be " << testGlobalCache->expectedByRun_.at(event.run() - 1);
0718           }
0719         }
0720         ++testGlobalCache->transitions_;
0721 
0722         // Force events to be processed concurrently
0723         if (testGlobalCache->sleepTime_ > 0) {
0724           usleep(testGlobalCache->sleepTime_);
0725         }
0726       }
0727 
0728       static void globalEndJob(InputProcessBlockGlobalCacheAn* testGlobalCache) {
0729         if (testGlobalCache->transitions_ != testGlobalCache->expectedTransitions_) {
0730           throw cms::Exception("transitions")
0731               << "InputProcessBlockIntAnalyzerG transitions " << testGlobalCache->transitions_
0732               << " but it was supposed to be " << testGlobalCache->expectedTransitions_;
0733         }
0734 
0735         if (testGlobalCache->sum_ != testGlobalCache->expectedSum_) {
0736           throw cms::Exception("UnexpectedValue") << "InputProcessBlockIntAnalyzerG sum " << testGlobalCache->sum_
0737                                                   << " but it was supposed to be " << testGlobalCache->expectedSum_;
0738         }
0739       }
0740 
    private:
      // Tokens for the IntProduct ProcessBlock products read by this module.
      // In the constructor a token is only assigned when the corresponding
      // InputTag parameter has a non-empty label, and each assigned token is
      // also copied into the global cache.
      // NOTE(review): getTokenBegin_ is assigned earlier in the constructor,
      // presumably from a "consumesBeginProcessBlock" parameter; confirm.
      edm::EDGetTokenT<IntProduct> getTokenBegin_;
      // Assigned from the "consumesEndProcessBlock" parameter.
      edm::EDGetTokenT<IntProduct> getTokenEnd_;
      // Assigned from the "consumesBeginProcessBlockM" parameter.
      edm::EDGetTokenT<IntProduct> getTokenBeginM_;
      // Assigned from the "consumesEndProcessBlockM" parameter.
      edm::EDGetTokenT<IntProduct> getTokenEndM_;
0746     };
0747 
    // The next two classes test that modules without the
    // static accessInputProcessBlock function will build,
    // and also that modules with no functor registered run.
0751 
0752     class InputProcessBlockIntAnalyzerNS
0753         : public edm::stream::EDAnalyzer<edm::InputProcessBlockCache<int, TestInputProcessBlockCache>> {
0754     public:
0755       explicit InputProcessBlockIntAnalyzerNS(edm::ParameterSet const& pset) {}
0756       void analyze(edm::Event const&, edm::EventSetup const&) override {}
0757     };
0758 
0759     // Same thing as previous class except with a GlobalCache added
0760     class InputProcessBlockIntAnalyzerGNS
0761         : public edm::stream::EDAnalyzer<edm::InputProcessBlockCache<int, TestInputProcessBlockCache>,
0762                                          edm::GlobalCache<TestGlobalCacheAn>> {
0763     public:
0764       explicit InputProcessBlockIntAnalyzerGNS(edm::ParameterSet const& pset,
0765                                                TestGlobalCacheAn const* testGlobalCache) {}
0766       static std::unique_ptr<TestGlobalCacheAn> initializeGlobalCache(edm::ParameterSet const&) {
0767         return std::make_unique<TestGlobalCacheAn>();
0768       }
0769       void analyze(edm::Event const&, edm::EventSetup const&) override {}
0770       static void globalEndJob(TestGlobalCacheAn* testGlobalCache) {}
0771     };
0772 
0773   }  // namespace stream
0774 }  // namespace edmtest
0775 std::atomic<unsigned int> edmtest::stream::GlobalIntAnalyzer::m_count{0};
0776 std::atomic<unsigned int> edmtest::stream::RunIntAnalyzer::m_count{0};
0777 std::atomic<unsigned int> edmtest::stream::LumiIntAnalyzer::m_count{0};
0778 std::atomic<unsigned int> edmtest::stream::RunSummaryIntAnalyzer::m_count{0};
0779 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::m_count{0};
0780 std::atomic<unsigned int> edmtest::stream::GlobalIntAnalyzer::cvalue_{0};
0781 std::atomic<unsigned int> edmtest::stream::RunIntAnalyzer::cvalue_{0};
0782 std::atomic<unsigned int> edmtest::stream::LumiIntAnalyzer::cvalue_{0};
0783 std::atomic<unsigned int> edmtest::stream::RunSummaryIntAnalyzer::cvalue_{0};
0784 std::atomic<unsigned int> edmtest::stream::LumiSummaryIntAnalyzer::cvalue_{0};
0785 std::atomic<bool> edmtest::stream::RunSummaryIntAnalyzer::globalBeginRunCalled_{false};
0786 std::atomic<bool> edmtest::stream::LumiSummaryIntAnalyzer::globalBeginLumiCalled_{false};
// One DEFINE_FWK_MODULE invocation per test analyzer defined in this file.
// NOTE(review): the macro presumably comes from MakerMacros.h and makes each
// class available to the framework as a module; confirm against that header.
DEFINE_FWK_MODULE(edmtest::stream::GlobalIntAnalyzer);
DEFINE_FWK_MODULE(edmtest::stream::RunIntAnalyzer);
DEFINE_FWK_MODULE(edmtest::stream::LumiIntAnalyzer);
DEFINE_FWK_MODULE(edmtest::stream::RunSummaryIntAnalyzer);
DEFINE_FWK_MODULE(edmtest::stream::LumiSummaryIntAnalyzer);
DEFINE_FWK_MODULE(edmtest::stream::ProcessBlockIntAnalyzer);
DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzer);
DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerG);
DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerNS);
DEFINE_FWK_MODULE(edmtest::stream::InputProcessBlockIntAnalyzerGNS);