Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-14 02:38:42

0001 /*
0002  *  proxyfactoryproducer_t.cc
0003  *  EDMProto
0004  *
0005  */
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/global/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0018 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0021 #include "FWCore/Framework/interface/HistoryAppender.h"
0022 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0023 #include "FWCore/Framework/interface/TriggerNamesService.h"
0024 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0025 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0026 #include "FWCore/ServiceRegistry/interface/Service.h"
0027 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0028 #include "FWCore/Framework/interface/FileBlock.h"
0029 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0031 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0032 
0033 #include "FWCore/Utilities/interface/Exception.h"
0034 
0035 #include "cppunit/extensions/HelperMacros.h"
0036 
0037 namespace edm {
0038   class ModuleCallingContext;
0039 }
0040 
0041 class testGlobalOutputModule : public CppUnit::TestFixture {
0042   CPPUNIT_TEST_SUITE(testGlobalOutputModule);
0043 
0044   CPPUNIT_TEST(basicTest);
0045   CPPUNIT_TEST(fileTest);
0046 
0047   CPPUNIT_TEST_SUITE_END();
0048 
0049 public:
0050   testGlobalOutputModule();
0051 
0052   void setUp() {}
0053   void tearDown() {}
0054 
0055   void basicTest();
0056   void fileTest();
0057 
0058   enum class Trans {
0059     kBeginJob,
0060     kGlobalOpenInputFile,
0061     kGlobalBeginRun,
0062     kGlobalBeginRunProduce,
0063     kGlobalBeginLuminosityBlock,
0064     kEvent,
0065     kGlobalEndLuminosityBlock,
0066     kGlobalEndRun,
0067     kGlobalCloseInputFile,
0068     kEndJob
0069   };
0070 
0071   typedef std::vector<Trans> Expectations;
0072 
0073 private:
0074   std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0075 
0076   edm::ProcessConfiguration m_procConfig;
0077   edm::PreallocationConfiguration m_preallocConfig;
0078   std::shared_ptr<edm::ProductRegistry> m_prodReg;
0079   std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0080   std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0081   std::unique_ptr<edm::EventPrincipal> m_ep;
0082   edm::HistoryAppender historyAppender_;
0083   std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0084   std::shared_ptr<edm::RunPrincipal> m_rp;
0085   std::shared_ptr<edm::ActivityRegistry>
0086       m_actReg;  // We do not use propagate_const because the registry itself is mutable.
0087   edm::EventSetupImpl* m_es = nullptr;
0088   edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0089   edm::WorkerParams m_params;
0090 
0091   typedef edm::service::TriggerNamesService TNS;
0092   typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0093   std::shared_ptr<w_TNS> tnsptr_;
0094   edm::ServiceToken serviceToken_;
0095 
0096   template <typename T>
0097   void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0098 
0099   template <typename Traits, typename Info>
0100   void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0101     oneapi::tbb::task_group group;
0102     edm::FinalWaitingTask task{group};
0103     edm::ServiceToken token;
0104     iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0105     task.wait();
0106   }
0107 
0108   class BasicOutputModule : public edm::global::OutputModule<> {
0109   public:
0110     using edm::global::OutputModuleBase::doPreallocate;
0111     BasicOutputModule(edm::ParameterSet const& iPSet)
0112         : edm::global::OutputModuleBase(iPSet), edm::global::OutputModule<>(iPSet) {}
0113     unsigned int m_count = 0;
0114 
0115     void write(edm::EventForOutput const&) override { ++m_count; }
0116     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0117     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0118   };
0119 
0120   class FileOutputModule : public edm::global::OutputModule<edm::WatchInputFiles> {
0121   public:
0122     using edm::global::OutputModuleBase::doPreallocate;
0123     FileOutputModule(edm::ParameterSet const& iPSet)
0124         : edm::global::OutputModuleBase(iPSet), edm::global::OutputModule<edm::WatchInputFiles>(iPSet) {}
0125     unsigned int m_count = 0;
0126     void write(edm::EventForOutput const&) override { ++m_count; }
0127     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0128     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0129 
0130     void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0131 
0132     void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0133   };
0134 };
0135 
0136 namespace {
0137 
0138   edm::ActivityRegistry activityRegistry;
0139 
0140   struct ShadowStreamID {
0141     constexpr ShadowStreamID() : value(0) {}
0142     unsigned int value;
0143   };
0144 
0145   union IDUnion {
0146     IDUnion() : m_shadow() {}
0147     ShadowStreamID m_shadow;
0148     edm::StreamID m_id;
0149   };
0150 }  // namespace
0151 static edm::StreamID makeID() {
0152   IDUnion u;
0153   assert(u.m_id.value() == 0);
0154   return u.m_id;
0155 }
0156 static const edm::StreamID s_streamID0 = makeID();
0157 
0158 ///registration of the test so that the runner can find it
0159 CPPUNIT_TEST_SUITE_REGISTRATION(testGlobalOutputModule);
0160 
0161 testGlobalOutputModule::testGlobalOutputModule()
0162     : m_prodReg(new edm::ProductRegistry{}),
0163       m_idHelper(new edm::BranchIDListHelper{}),
0164       m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0165       m_ep() {
0166   //Setup the principals
0167   m_prodReg->setFrozen();
0168   m_idHelper->updateFromRegistry(*m_prodReg);
0169   edm::EventID eventID = edm::EventID::firstValidEvent();
0170 
0171   std::string uuid = edm::createGlobalIdentifier();
0172   edm::Timestamp now(1234567UL);
0173   m_rp.reset(
0174       new edm::RunPrincipal(m_prodReg, edm::productResolversFactory::makePrimary, m_procConfig, &historyAppender_, 0));
0175   m_rp->setAux(edm::RunAuxiliary(eventID.run(), now, now));
0176   edm::LuminosityBlockAuxiliary lumiAux(m_rp->run(), 1, now, now);
0177   m_lbp.reset(new edm::LuminosityBlockPrincipal(
0178       m_prodReg, edm::productResolversFactory::makePrimary, m_procConfig, &historyAppender_, 0));
0179   m_lbp->setAux(lumiAux);
0180   m_lbp->setRunPrincipal(m_rp);
0181   edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0182 
0183   m_ep.reset(new edm::EventPrincipal(
0184       m_prodReg, edm::productResolversFactory::makePrimary, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0185   m_ep->fillEventPrincipal(eventAux, nullptr);
0186   m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0187   m_actReg.reset(new edm::ActivityRegistry);
0188 
0189   //For each transition, bind a lambda which will call the proper method of the Worker
0190   m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0191     edm::FileBlock fb;
0192     iBase->respondToOpenInputFile(fb);
0193   };
0194 
0195   m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0196     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0197     edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginRun, nullptr);
0198     edm::ParentContext parentContext(&gc);
0199     iBase->setActivityRegistry(m_actReg);
0200     edm::RunTransitionInfo info(*m_rp, *m_es);
0201     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0202   };
0203 
0204   m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0205     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0206     edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginLuminosityBlock, nullptr);
0207     edm::ParentContext parentContext(&gc);
0208     iBase->setActivityRegistry(m_actReg);
0209     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0210     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0211   };
0212 
0213   m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0214     typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0215     edm::StreamContext streamContext(s_streamID0, nullptr);
0216     edm::ParentContext parentContext(&streamContext);
0217     iBase->setActivityRegistry(m_actReg);
0218     edm::EventTransitionInfo info(*m_ep, *m_es);
0219     doWork<Traits>(iBase, info, s_streamID0, parentContext);
0220   };
0221 
0222   m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0223     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0224     edm::GlobalContext gc(edm::GlobalContext::Transition::kEndLuminosityBlock, nullptr);
0225     edm::ParentContext parentContext(&gc);
0226     iBase->setActivityRegistry(m_actReg);
0227     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0228     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0229     oneapi::tbb::task_group group;
0230     edm::FinalWaitingTask task{group};
0231     iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0232     task.wait();
0233   };
0234 
0235   m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0236     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0237     edm::GlobalContext gc(edm::GlobalContext::Transition::kEndRun, nullptr);
0238     edm::ParentContext parentContext(&gc);
0239     iBase->setActivityRegistry(m_actReg);
0240     edm::RunTransitionInfo info(*m_rp, *m_es);
0241     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0242     oneapi::tbb::task_group group;
0243     edm::FinalWaitingTask task{group};
0244     iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0245     task.wait();
0246   };
0247 
0248   m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0249     edm::FileBlock fb;
0250     iBase->respondToCloseInputFile(fb);
0251   };
0252 
0253   // We want to create the TriggerNamesService because it is used in
0254   // the tests.  We do that here, but first we need to build a minimal
0255   // parameter set to pass to its constructor.  Then we build the
0256   // service and setup the service system.
0257   edm::ParameterSet proc_pset;
0258 
0259   std::string processName("HLT");
0260   proc_pset.addParameter<std::string>("@process_name", processName);
0261 
0262   std::vector<std::string> paths;
0263   edm::ParameterSet trigPaths;
0264   trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0265   proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0266 
0267   std::vector<std::string> endPaths;
0268   proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0269 
0270   // Now create and setup the service
0271   tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0272 
0273   serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0274 }
0275 
0276 namespace {
0277   template <typename T>
0278   void testTransition(std::shared_ptr<T> iMod,
0279                       edm::Worker* iWorker,
0280                       edm::OutputModuleCommunicator* iComm,
0281                       testGlobalOutputModule::Trans iTrans,
0282                       testGlobalOutputModule::Expectations const& iExpect,
0283                       std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0284     assert(0 == iMod->m_count);
0285     iFunc(iWorker, iComm);
0286     auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0287     if (count != iMod->m_count) {
0288       std::cout << "For trans " << static_cast<std::underlying_type<testGlobalOutputModule::Trans>::type>(iTrans)
0289                 << " expected " << count << " and got " << iMod->m_count << std::endl;
0290     }
0291     CPPUNIT_ASSERT(iMod->m_count == count);
0292     iMod->m_count = 0;
0293     iWorker->reset();
0294   }
0295 }  // namespace
0296 
0297 template <typename T>
0298 void testGlobalOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0299   oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0300 
0301   oneapi::tbb::task_arena arena(1);
0302   arena.execute([&]() {
0303     iMod->doPreallocate(m_preallocConfig);
0304     edm::WorkerT<edm::global::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0305     edm::OutputModuleCommunicatorT<edm::global::OutputModuleBase> comm(iMod.get());
0306     for (auto& keyVal : m_transToFunc) {
0307       testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0308     }
0309   });
0310 }
0311 
0312 void testGlobalOutputModule::basicTest() {
0313   //make the services available
0314   edm::ServiceRegistry::Operate operate(serviceToken_);
0315 
0316   edm::ParameterSet pset;
0317   auto testProd = std::make_shared<BasicOutputModule>(pset);
0318 
0319   CPPUNIT_ASSERT(0 == testProd->m_count);
0320   testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0321 }
0322 
0323 void testGlobalOutputModule::fileTest() {
0324   //make the services available
0325   edm::ServiceRegistry::Operate operate(serviceToken_);
0326 
0327   edm::ParameterSet pset;
0328   auto testProd = std::make_shared<FileOutputModule>(pset);
0329 
0330   CPPUNIT_ASSERT(0 == testProd->m_count);
0331   testTransitions(testProd,
0332                   {Trans::kGlobalOpenInputFile,
0333                    Trans::kEvent,
0334                    Trans::kGlobalEndLuminosityBlock,
0335                    Trans::kGlobalEndRun,
0336                    Trans::kGlobalCloseInputFile});
0337 }