Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:02:43

0001 /*
0002  *  Unit tests for edm::limited::OutputModule (testLimitedOutputModule)
0003  *  EDMProto
0004  *
0005  */
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/limited/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0019 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0020 #include "FWCore/Framework/interface/HistoryAppender.h"
0021 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0022 #include "FWCore/Framework/interface/TriggerNamesService.h"
0023 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0024 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0025 #include "FWCore/ServiceRegistry/interface/Service.h"
0026 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0027 #include "FWCore/Framework/interface/FileBlock.h"
0028 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0031 
0032 #include "FWCore/Utilities/interface/Exception.h"
0033 
0034 #include "cppunit/extensions/HelperMacros.h"
0035 
// Forward declaration of edm::ModuleCallingContext.
namespace edm {
  class ModuleCallingContext;
}  // namespace edm
0039 
0040 class testLimitedOutputModule : public CppUnit::TestFixture {
0041   CPPUNIT_TEST_SUITE(testLimitedOutputModule);
0042 
0043   CPPUNIT_TEST(basicTest);
0044   CPPUNIT_TEST(fileTest);
0045 
0046   CPPUNIT_TEST_SUITE_END();
0047 
0048 public:
0049   testLimitedOutputModule();
0050 
0051   void setUp() {}
0052   void tearDown() {}
0053 
0054   void basicTest();
0055   void fileTest();
0056 
0057   enum class Trans {
0058     kBeginJob,
0059     kGlobalOpenInputFile,
0060     kGlobalBeginRun,
0061     kGlobalBeginRunProduce,
0062     kGlobalBeginLuminosityBlock,
0063     kEvent,
0064     kGlobalEndLuminosityBlock,
0065     kGlobalEndRun,
0066     kGlobalCloseInputFile,
0067     kEndJob
0068   };
0069 
0070   typedef std::vector<Trans> Expectations;
0071 
0072 private:
0073   std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0074 
0075   edm::ProcessConfiguration m_procConfig;
0076   edm::PreallocationConfiguration m_preallocConfig;
0077   std::shared_ptr<edm::ProductRegistry> m_prodReg;
0078   std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0079   std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0080   std::unique_ptr<edm::EventPrincipal> m_ep;
0081   edm::HistoryAppender historyAppender_;
0082   std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0083   std::shared_ptr<edm::RunPrincipal> m_rp;
0084   std::shared_ptr<edm::ActivityRegistry>
0085       m_actReg;  // We do not use propagate_const because the registry itself is mutable.
0086   edm::EventSetupImpl* m_es = nullptr;
0087   edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0088   edm::WorkerParams m_params;
0089 
0090   typedef edm::service::TriggerNamesService TNS;
0091   typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0092   std::shared_ptr<w_TNS> tnsptr_;
0093   edm::ServiceToken serviceToken_;
0094 
0095   template <typename T>
0096   void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0097 
0098   template <typename Traits, typename Info>
0099   void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0100     oneapi::tbb::task_group group;
0101     edm::FinalWaitingTask task{group};
0102     edm::ServiceToken token;
0103     iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0104     task.wait();
0105   }
0106 
0107   class BasicOutputModule : public edm::limited::OutputModule<> {
0108   public:
0109     using edm::limited::OutputModuleBase::doPreallocate;
0110     BasicOutputModule(edm::ParameterSet const& iPSet)
0111         : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<>(iPSet) {}
0112     std::atomic<unsigned int> m_count{0};
0113 
0114     void write(edm::EventForOutput const&) override { ++m_count; }
0115     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0116     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0117   };
0118 
0119   class FileOutputModule : public edm::limited::OutputModule<edm::WatchInputFiles> {
0120   public:
0121     using edm::limited::OutputModuleBase::doPreallocate;
0122     FileOutputModule(edm::ParameterSet const& iPSet)
0123         : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<edm::WatchInputFiles>(iPSet) {}
0124     std::atomic<unsigned int> m_count{0};
0125     void write(edm::EventForOutput const&) override { ++m_count; }
0126     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0127     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0128 
0129     void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0130 
0131     void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0132   };
0133 };
0134 
0135 namespace {
0136 
0137   edm::ActivityRegistry activityRegistry;
0138 
0139   struct ShadowStreamID {
0140     constexpr ShadowStreamID() : value(0) {}
0141     unsigned int value;
0142   };
0143 
0144   union IDUnion {
0145     IDUnion() : m_shadow() {}
0146     ShadowStreamID m_shadow;
0147     edm::StreamID m_id;
0148   };
0149 }  // namespace
0150 static edm::StreamID makeID() {
0151   IDUnion u;
0152   assert(u.m_id.value() == 0);
0153   return u.m_id;
0154 }
0155 static const edm::StreamID s_streamID0 = makeID();
0156 
0157 ///registration of the test so that the runner can find it
0158 CPPUNIT_TEST_SUITE_REGISTRATION(testLimitedOutputModule);
0159 
0160 testLimitedOutputModule::testLimitedOutputModule()
0161     : m_prodReg(new edm::ProductRegistry{}),
0162       m_idHelper(new edm::BranchIDListHelper{}),
0163       m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0164       m_ep() {
0165   //Setup the principals
0166   m_prodReg->setFrozen();
0167   m_idHelper->updateFromRegistry(*m_prodReg);
0168   edm::EventID eventID = edm::EventID::firstValidEvent();
0169 
0170   std::string uuid = edm::createGlobalIdentifier();
0171   edm::Timestamp now(1234567UL);
0172   m_rp.reset(new edm::RunPrincipal(m_prodReg, m_procConfig, &historyAppender_, 0));
0173   m_rp->setAux(edm::RunAuxiliary(eventID.run(), now, now));
0174   m_lbp.reset(new edm::LuminosityBlockPrincipal(m_prodReg, m_procConfig, &historyAppender_, 0));
0175   m_lbp->setAux(edm::LuminosityBlockAuxiliary(m_rp->run(), 1, now, now));
0176   m_lbp->setRunPrincipal(m_rp);
0177   edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0178 
0179   m_ep.reset(new edm::EventPrincipal(m_prodReg, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0180   m_ep->fillEventPrincipal(eventAux, nullptr);
0181   m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0182   m_actReg.reset(new edm::ActivityRegistry);
0183 
0184   //For each transition, bind a lambda which will call the proper method of the Worker
0185   m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0186     edm::FileBlock fb;
0187     iBase->respondToOpenInputFile(fb);
0188   };
0189 
0190   m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0191     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0192     edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginRun, nullptr);
0193     edm::ParentContext parentContext(&gc);
0194     iBase->setActivityRegistry(m_actReg);
0195     edm::RunTransitionInfo info(*m_rp, *m_es);
0196     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0197   };
0198 
0199   m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0200     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0201     edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginLuminosityBlock, nullptr);
0202     edm::ParentContext parentContext(&gc);
0203     iBase->setActivityRegistry(m_actReg);
0204     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0205     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0206   };
0207 
0208   m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0209     typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0210     edm::StreamContext streamContext(s_streamID0, nullptr);
0211     edm::ParentContext parentContext(&streamContext);
0212     iBase->setActivityRegistry(m_actReg);
0213     edm::EventTransitionInfo info(*m_ep, *m_es);
0214     doWork<Traits>(iBase, info, s_streamID0, parentContext);
0215   };
0216 
0217   m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0218     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0219     edm::GlobalContext gc(edm::GlobalContext::Transition::kEndLuminosityBlock, nullptr);
0220     edm::ParentContext parentContext(&gc);
0221     iBase->setActivityRegistry(m_actReg);
0222     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0223     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0224     oneapi::tbb::task_group group;
0225     edm::FinalWaitingTask task{group};
0226     iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0227     task.wait();
0228   };
0229 
0230   m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0231     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0232     edm::GlobalContext gc(edm::GlobalContext::Transition::kEndRun, nullptr);
0233     edm::ParentContext parentContext(&gc);
0234     iBase->setActivityRegistry(m_actReg);
0235     edm::RunTransitionInfo info(*m_rp, *m_es);
0236     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0237     oneapi::tbb::task_group group;
0238     edm::FinalWaitingTask task{group};
0239     iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0240     task.wait();
0241   };
0242 
0243   m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0244     edm::FileBlock fb;
0245     iBase->respondToCloseInputFile(fb);
0246   };
0247 
0248   // We want to create the TriggerNamesService because it is used in
0249   // the tests.  We do that here, but first we need to build a minimal
0250   // parameter set to pass to its constructor.  Then we build the
0251   // service and setup the service system.
0252   edm::ParameterSet proc_pset;
0253 
0254   std::string processName("HLT");
0255   proc_pset.addParameter<std::string>("@process_name", processName);
0256 
0257   std::vector<std::string> paths;
0258   edm::ParameterSet trigPaths;
0259   trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0260   proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0261 
0262   std::vector<std::string> endPaths;
0263   proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0264 
0265   // Now create and setup the service
0266   tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0267 
0268   serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0269 }
0270 
0271 namespace {
0272   template <typename T>
0273   void testTransition(std::shared_ptr<T> iMod,
0274                       edm::Worker* iWorker,
0275                       edm::OutputModuleCommunicator* iComm,
0276                       testLimitedOutputModule::Trans iTrans,
0277                       testLimitedOutputModule::Expectations const& iExpect,
0278                       std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0279     assert(0 == iMod->m_count);
0280     iFunc(iWorker, iComm);
0281     auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0282     if (count != iMod->m_count) {
0283       std::cout << "For trans " << static_cast<std::underlying_type<testLimitedOutputModule::Trans>::type>(iTrans)
0284                 << " expected " << count << " and got " << iMod->m_count << std::endl;
0285     }
0286     CPPUNIT_ASSERT(iMod->m_count == count);
0287     iMod->m_count = 0;
0288     iWorker->reset();
0289   }
0290 }  // namespace
0291 
0292 template <typename T>
0293 void testLimitedOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0294   oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0295 
0296   iMod->doPreallocate(m_preallocConfig);
0297   edm::WorkerT<edm::limited::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0298   edm::OutputModuleCommunicatorT<edm::limited::OutputModuleBase> comm(iMod.get());
0299   for (auto& keyVal : m_transToFunc) {
0300     testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0301   }
0302 }
0303 
0304 void testLimitedOutputModule::basicTest() {
0305   //make the services available
0306   edm::ServiceRegistry::Operate operate(serviceToken_);
0307 
0308   edm::ParameterSet pset;
0309   const unsigned int kLimit = 1;
0310   pset.addUntrackedParameter("concurrencyLimit", kLimit);
0311   auto testProd = std::make_shared<BasicOutputModule>(pset);
0312 
0313   CPPUNIT_ASSERT(0 == testProd->m_count);
0314   testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0315 }
0316 
0317 void testLimitedOutputModule::fileTest() {
0318   //make the services available
0319   edm::ServiceRegistry::Operate operate(serviceToken_);
0320 
0321   edm::ParameterSet pset;
0322   const unsigned int kLimit = 1;
0323   pset.addUntrackedParameter("concurrencyLimit", kLimit);
0324   auto testProd = std::make_shared<FileOutputModule>(pset);
0325 
0326   CPPUNIT_ASSERT(0 == testProd->m_count);
0327   testTransitions(testProd,
0328                   {Trans::kGlobalOpenInputFile,
0329                    Trans::kEvent,
0330                    Trans::kGlobalEndLuminosityBlock,
0331                    Trans::kGlobalEndRun,
0332                    Trans::kGlobalCloseInputFile});
0333 }