Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-14 02:38:43

/*
 *  Unit tests for edm::limited::OutputModule (CppUnit fixture testLimitedOutputModule).
 *  EDMProto
 *
 */
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/limited/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0018 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0021 #include "FWCore/Framework/interface/HistoryAppender.h"
0022 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0023 #include "FWCore/Framework/interface/TriggerNamesService.h"
0024 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0025 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0026 #include "FWCore/ServiceRegistry/interface/Service.h"
0027 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0028 #include "FWCore/Framework/interface/FileBlock.h"
0029 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0031 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0032 
0033 #include "FWCore/Utilities/interface/Exception.h"
0034 
0035 #include "cppunit/extensions/HelperMacros.h"
0036 
// Forward declaration only; no definition of this framework type is required here.
namespace edm {
  class ModuleCallingContext;
}  // namespace edm
0040 
0041 class testLimitedOutputModule : public CppUnit::TestFixture {
0042   CPPUNIT_TEST_SUITE(testLimitedOutputModule);
0043 
0044   CPPUNIT_TEST(basicTest);
0045   CPPUNIT_TEST(fileTest);
0046 
0047   CPPUNIT_TEST_SUITE_END();
0048 
0049 public:
0050   testLimitedOutputModule();
0051 
0052   void setUp() {}
0053   void tearDown() {}
0054 
0055   void basicTest();
0056   void fileTest();
0057 
0058   enum class Trans {
0059     kBeginJob,
0060     kGlobalOpenInputFile,
0061     kGlobalBeginRun,
0062     kGlobalBeginRunProduce,
0063     kGlobalBeginLuminosityBlock,
0064     kEvent,
0065     kGlobalEndLuminosityBlock,
0066     kGlobalEndRun,
0067     kGlobalCloseInputFile,
0068     kEndJob
0069   };
0070 
0071   typedef std::vector<Trans> Expectations;
0072 
0073 private:
0074   std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0075 
0076   edm::ProcessConfiguration m_procConfig;
0077   edm::PreallocationConfiguration m_preallocConfig;
0078   std::shared_ptr<edm::ProductRegistry> m_prodReg;
0079   std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0080   std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0081   std::unique_ptr<edm::EventPrincipal> m_ep;
0082   edm::HistoryAppender historyAppender_;
0083   std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0084   std::shared_ptr<edm::RunPrincipal> m_rp;
0085   std::shared_ptr<edm::ActivityRegistry>
0086       m_actReg;  // We do not use propagate_const because the registry itself is mutable.
0087   edm::EventSetupImpl* m_es = nullptr;
0088   edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0089   edm::WorkerParams m_params;
0090 
0091   typedef edm::service::TriggerNamesService TNS;
0092   typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0093   std::shared_ptr<w_TNS> tnsptr_;
0094   edm::ServiceToken serviceToken_;
0095 
0096   template <typename T>
0097   void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0098 
0099   template <typename Traits, typename Info>
0100   void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0101     oneapi::tbb::task_group group;
0102     edm::FinalWaitingTask task{group};
0103     edm::ServiceToken token;
0104     iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0105     task.wait();
0106   }
0107 
0108   class BasicOutputModule : public edm::limited::OutputModule<> {
0109   public:
0110     using edm::limited::OutputModuleBase::doPreallocate;
0111     BasicOutputModule(edm::ParameterSet const& iPSet)
0112         : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<>(iPSet) {}
0113     std::atomic<unsigned int> m_count{0};
0114 
0115     void write(edm::EventForOutput const&) override { ++m_count; }
0116     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0117     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0118   };
0119 
0120   class FileOutputModule : public edm::limited::OutputModule<edm::WatchInputFiles> {
0121   public:
0122     using edm::limited::OutputModuleBase::doPreallocate;
0123     FileOutputModule(edm::ParameterSet const& iPSet)
0124         : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<edm::WatchInputFiles>(iPSet) {}
0125     std::atomic<unsigned int> m_count{0};
0126     void write(edm::EventForOutput const&) override { ++m_count; }
0127     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0128     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0129 
0130     void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0131 
0132     void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0133   };
0134 };
0135 
0136 namespace {
0137 
0138   edm::ActivityRegistry activityRegistry;
0139 
0140   struct ShadowStreamID {
0141     constexpr ShadowStreamID() : value(0) {}
0142     unsigned int value;
0143   };
0144 
0145   union IDUnion {
0146     IDUnion() : m_shadow() {}
0147     ShadowStreamID m_shadow;
0148     edm::StreamID m_id;
0149   };
0150 }  // namespace
0151 static edm::StreamID makeID() {
0152   IDUnion u;
0153   assert(u.m_id.value() == 0);
0154   return u.m_id;
0155 }
0156 static const edm::StreamID s_streamID0 = makeID();
0157 
0158 ///registration of the test so that the runner can find it
0159 CPPUNIT_TEST_SUITE_REGISTRATION(testLimitedOutputModule);
0160 
0161 testLimitedOutputModule::testLimitedOutputModule()
0162     : m_prodReg(new edm::ProductRegistry{}),
0163       m_idHelper(new edm::BranchIDListHelper{}),
0164       m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0165       m_ep() {
0166   //Setup the principals
0167   m_prodReg->setFrozen();
0168   m_idHelper->updateFromRegistry(*m_prodReg);
0169   edm::EventID eventID = edm::EventID::firstValidEvent();
0170 
0171   std::string uuid = edm::createGlobalIdentifier();
0172   edm::Timestamp now(1234567UL);
0173   m_rp.reset(
0174       new edm::RunPrincipal(m_prodReg, edm::productResolversFactory::makePrimary, m_procConfig, &historyAppender_, 0));
0175   m_rp->setAux(edm::RunAuxiliary(eventID.run(), now, now));
0176   m_lbp.reset(new edm::LuminosityBlockPrincipal(
0177       m_prodReg, edm::productResolversFactory::makePrimary, m_procConfig, &historyAppender_, 0));
0178   m_lbp->setAux(edm::LuminosityBlockAuxiliary(m_rp->run(), 1, now, now));
0179   m_lbp->setRunPrincipal(m_rp);
0180   edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0181 
0182   m_ep.reset(new edm::EventPrincipal(
0183       m_prodReg, edm::productResolversFactory::makePrimary, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0184   m_ep->fillEventPrincipal(eventAux, nullptr);
0185   m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0186   m_actReg.reset(new edm::ActivityRegistry);
0187 
0188   //For each transition, bind a lambda which will call the proper method of the Worker
0189   m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0190     edm::FileBlock fb;
0191     iBase->respondToOpenInputFile(fb);
0192   };
0193 
0194   m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0195     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0196     edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginRun, nullptr);
0197     edm::ParentContext parentContext(&gc);
0198     iBase->setActivityRegistry(m_actReg);
0199     edm::RunTransitionInfo info(*m_rp, *m_es);
0200     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0201   };
0202 
0203   m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0204     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0205     edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginLuminosityBlock, nullptr);
0206     edm::ParentContext parentContext(&gc);
0207     iBase->setActivityRegistry(m_actReg);
0208     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0209     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0210   };
0211 
0212   m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0213     typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0214     edm::StreamContext streamContext(s_streamID0, nullptr);
0215     edm::ParentContext parentContext(&streamContext);
0216     iBase->setActivityRegistry(m_actReg);
0217     edm::EventTransitionInfo info(*m_ep, *m_es);
0218     doWork<Traits>(iBase, info, s_streamID0, parentContext);
0219   };
0220 
0221   m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0222     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0223     edm::GlobalContext gc(edm::GlobalContext::Transition::kEndLuminosityBlock, nullptr);
0224     edm::ParentContext parentContext(&gc);
0225     iBase->setActivityRegistry(m_actReg);
0226     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0227     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0228     oneapi::tbb::task_group group;
0229     edm::FinalWaitingTask task{group};
0230     iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0231     task.wait();
0232   };
0233 
0234   m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0235     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0236     edm::GlobalContext gc(edm::GlobalContext::Transition::kEndRun, nullptr);
0237     edm::ParentContext parentContext(&gc);
0238     iBase->setActivityRegistry(m_actReg);
0239     edm::RunTransitionInfo info(*m_rp, *m_es);
0240     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0241     oneapi::tbb::task_group group;
0242     edm::FinalWaitingTask task{group};
0243     iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0244     task.wait();
0245   };
0246 
0247   m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0248     edm::FileBlock fb;
0249     iBase->respondToCloseInputFile(fb);
0250   };
0251 
0252   // We want to create the TriggerNamesService because it is used in
0253   // the tests.  We do that here, but first we need to build a minimal
0254   // parameter set to pass to its constructor.  Then we build the
0255   // service and setup the service system.
0256   edm::ParameterSet proc_pset;
0257 
0258   std::string processName("HLT");
0259   proc_pset.addParameter<std::string>("@process_name", processName);
0260 
0261   std::vector<std::string> paths;
0262   edm::ParameterSet trigPaths;
0263   trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0264   proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0265 
0266   std::vector<std::string> endPaths;
0267   proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0268 
0269   // Now create and setup the service
0270   tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0271 
0272   serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0273 }
0274 
0275 namespace {
0276   template <typename T>
0277   void testTransition(std::shared_ptr<T> iMod,
0278                       edm::Worker* iWorker,
0279                       edm::OutputModuleCommunicator* iComm,
0280                       testLimitedOutputModule::Trans iTrans,
0281                       testLimitedOutputModule::Expectations const& iExpect,
0282                       std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0283     assert(0 == iMod->m_count);
0284     iFunc(iWorker, iComm);
0285     auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0286     if (count != iMod->m_count) {
0287       std::cout << "For trans " << static_cast<std::underlying_type<testLimitedOutputModule::Trans>::type>(iTrans)
0288                 << " expected " << count << " and got " << iMod->m_count << std::endl;
0289     }
0290     CPPUNIT_ASSERT(iMod->m_count == count);
0291     iMod->m_count = 0;
0292     iWorker->reset();
0293   }
0294 }  // namespace
0295 
0296 template <typename T>
0297 void testLimitedOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0298   oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0299 
0300   iMod->doPreallocate(m_preallocConfig);
0301   edm::WorkerT<edm::limited::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0302   edm::OutputModuleCommunicatorT<edm::limited::OutputModuleBase> comm(iMod.get());
0303   for (auto& keyVal : m_transToFunc) {
0304     testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0305   }
0306 }
0307 
0308 void testLimitedOutputModule::basicTest() {
0309   //make the services available
0310   edm::ServiceRegistry::Operate operate(serviceToken_);
0311 
0312   edm::ParameterSet pset;
0313   const unsigned int kLimit = 1;
0314   pset.addUntrackedParameter("concurrencyLimit", kLimit);
0315   auto testProd = std::make_shared<BasicOutputModule>(pset);
0316 
0317   CPPUNIT_ASSERT(0 == testProd->m_count);
0318   testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0319 }
0320 
0321 void testLimitedOutputModule::fileTest() {
0322   //make the services available
0323   edm::ServiceRegistry::Operate operate(serviceToken_);
0324 
0325   edm::ParameterSet pset;
0326   const unsigned int kLimit = 1;
0327   pset.addUntrackedParameter("concurrencyLimit", kLimit);
0328   auto testProd = std::make_shared<FileOutputModule>(pset);
0329 
0330   CPPUNIT_ASSERT(0 == testProd->m_count);
0331   testTransitions(testProd,
0332                   {Trans::kGlobalOpenInputFile,
0333                    Trans::kEvent,
0334                    Trans::kGlobalEndLuminosityBlock,
0335                    Trans::kGlobalEndRun,
0336                    Trans::kGlobalCloseInputFile});
0337 }