Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-24 02:18:50

0001 /*
0002  *  proxyfactoryproducer_t.cc
0003  *  EDMProto
0004  *
0005  */
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/global/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0019 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0020 #include "FWCore/Framework/interface/HistoryAppender.h"
0021 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0022 #include "FWCore/Framework/interface/TriggerNamesService.h"
0023 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0024 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0025 #include "FWCore/ServiceRegistry/interface/Service.h"
0026 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0027 #include "FWCore/Framework/interface/FileBlock.h"
0028 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 
0031 #include "FWCore/Utilities/interface/Exception.h"
0032 
0033 #include "cppunit/extensions/HelperMacros.h"
0034 
// Forward declaration only: nothing in this file needs the complete type.
namespace edm { class ModuleCallingContext; }
0038 
0039 class testGlobalOutputModule : public CppUnit::TestFixture {
0040   CPPUNIT_TEST_SUITE(testGlobalOutputModule);
0041 
0042   CPPUNIT_TEST(basicTest);
0043   CPPUNIT_TEST(fileTest);
0044 
0045   CPPUNIT_TEST_SUITE_END();
0046 
0047 public:
0048   testGlobalOutputModule();
0049 
0050   void setUp() {}
0051   void tearDown() {}
0052 
0053   void basicTest();
0054   void fileTest();
0055 
0056   enum class Trans {
0057     kBeginJob,
0058     kGlobalOpenInputFile,
0059     kGlobalBeginRun,
0060     kGlobalBeginRunProduce,
0061     kGlobalBeginLuminosityBlock,
0062     kEvent,
0063     kGlobalEndLuminosityBlock,
0064     kGlobalEndRun,
0065     kGlobalCloseInputFile,
0066     kEndJob
0067   };
0068 
0069   typedef std::vector<Trans> Expectations;
0070 
0071 private:
0072   std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0073 
0074   edm::ProcessConfiguration m_procConfig;
0075   edm::PreallocationConfiguration m_preallocConfig;
0076   std::shared_ptr<edm::ProductRegistry> m_prodReg;
0077   std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0078   std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0079   std::unique_ptr<edm::EventPrincipal> m_ep;
0080   edm::HistoryAppender historyAppender_;
0081   std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0082   std::shared_ptr<edm::RunPrincipal> m_rp;
0083   std::shared_ptr<edm::ActivityRegistry>
0084       m_actReg;  // We do not use propagate_const because the registry itself is mutable.
0085   edm::EventSetupImpl* m_es = nullptr;
0086   edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0087   edm::WorkerParams m_params;
0088 
0089   typedef edm::service::TriggerNamesService TNS;
0090   typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0091   std::shared_ptr<w_TNS> tnsptr_;
0092   edm::ServiceToken serviceToken_;
0093 
0094   template <typename T>
0095   void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0096 
0097   template <typename Traits, typename Info>
0098   void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0099     edm::FinalWaitingTask task;
0100     oneapi::tbb::task_group group;
0101     edm::ServiceToken token;
0102     iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0103     do {
0104       group.wait();
0105     } while (not task.done());
0106     if (auto e = task.exceptionPtr()) {
0107       std::rethrow_exception(*e);
0108     }
0109   }
0110 
0111   class BasicOutputModule : public edm::global::OutputModule<> {
0112   public:
0113     using edm::global::OutputModuleBase::doPreallocate;
0114     BasicOutputModule(edm::ParameterSet const& iPSet)
0115         : edm::global::OutputModuleBase(iPSet), edm::global::OutputModule<>(iPSet) {}
0116     unsigned int m_count = 0;
0117 
0118     void write(edm::EventForOutput const&) override { ++m_count; }
0119     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0120     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0121   };
0122 
0123   class FileOutputModule : public edm::global::OutputModule<edm::WatchInputFiles> {
0124   public:
0125     using edm::global::OutputModuleBase::doPreallocate;
0126     FileOutputModule(edm::ParameterSet const& iPSet)
0127         : edm::global::OutputModuleBase(iPSet), edm::global::OutputModule<edm::WatchInputFiles>(iPSet) {}
0128     unsigned int m_count = 0;
0129     void write(edm::EventForOutput const&) override { ++m_count; }
0130     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0131     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0132 
0133     void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0134 
0135     void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0136   };
0137 };
0138 
0139 namespace {
0140 
0141   edm::ActivityRegistry activityRegistry;
0142 
0143   struct ShadowStreamID {
0144     constexpr ShadowStreamID() : value(0) {}
0145     unsigned int value;
0146   };
0147 
0148   union IDUnion {
0149     IDUnion() : m_shadow() {}
0150     ShadowStreamID m_shadow;
0151     edm::StreamID m_id;
0152   };
0153 }  // namespace
0154 static edm::StreamID makeID() {
0155   IDUnion u;
0156   assert(u.m_id.value() == 0);
0157   return u.m_id;
0158 }
0159 static const edm::StreamID s_streamID0 = makeID();
0160 
0161 ///registration of the test so that the runner can find it
0162 CPPUNIT_TEST_SUITE_REGISTRATION(testGlobalOutputModule);
0163 
0164 testGlobalOutputModule::testGlobalOutputModule()
0165     : m_prodReg(new edm::ProductRegistry{}),
0166       m_idHelper(new edm::BranchIDListHelper{}),
0167       m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0168       m_ep() {
0169   //Setup the principals
0170   m_prodReg->setFrozen();
0171   m_idHelper->updateFromRegistry(*m_prodReg);
0172   edm::EventID eventID = edm::EventID::firstValidEvent();
0173 
0174   std::string uuid = edm::createGlobalIdentifier();
0175   edm::Timestamp now(1234567UL);
0176   auto runAux = std::make_shared<edm::RunAuxiliary>(eventID.run(), now, now);
0177   m_rp.reset(new edm::RunPrincipal(runAux, m_prodReg, m_procConfig, &historyAppender_, 0));
0178   edm::LuminosityBlockAuxiliary lumiAux(m_rp->run(), 1, now, now);
0179   m_lbp.reset(new edm::LuminosityBlockPrincipal(m_prodReg, m_procConfig, &historyAppender_, 0));
0180   m_lbp->setAux(lumiAux);
0181   m_lbp->setRunPrincipal(m_rp);
0182   edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0183 
0184   m_ep.reset(new edm::EventPrincipal(m_prodReg, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0185   m_ep->fillEventPrincipal(eventAux, nullptr);
0186   m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0187   m_actReg.reset(new edm::ActivityRegistry);
0188 
0189   //For each transition, bind a lambda which will call the proper method of the Worker
0190   m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0191     edm::FileBlock fb;
0192     iBase->respondToOpenInputFile(fb);
0193   };
0194 
0195   m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0196     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0197     edm::ParentContext parentContext;
0198     edm::RunTransitionInfo info(*m_rp, *m_es);
0199     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0200   };
0201 
0202   m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0203     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0204     edm::ParentContext parentContext;
0205     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0206     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0207   };
0208 
0209   m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0210     typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0211     edm::StreamContext streamContext(s_streamID0, nullptr);
0212     edm::ParentContext parentContext(&streamContext);
0213     iBase->setActivityRegistry(m_actReg);
0214     edm::EventTransitionInfo info(*m_ep, *m_es);
0215     doWork<Traits>(iBase, info, s_streamID0, parentContext);
0216   };
0217 
0218   m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0219     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0220     edm::ParentContext parentContext;
0221     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0222     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0223     edm::FinalWaitingTask task;
0224     oneapi::tbb::task_group group;
0225     iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0226     do {
0227       group.wait();
0228     } while (not task.done());
0229     if (task.exceptionPtr() != nullptr) {
0230       std::rethrow_exception(*task.exceptionPtr());
0231     }
0232   };
0233 
0234   m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0235     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0236     edm::ParentContext parentContext;
0237     edm::RunTransitionInfo info(*m_rp, *m_es);
0238     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0239     edm::FinalWaitingTask task;
0240     oneapi::tbb::task_group group;
0241     iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0242     do {
0243       group.wait();
0244     } while (not task.done());
0245     if (task.exceptionPtr() != nullptr) {
0246       std::rethrow_exception(*task.exceptionPtr());
0247     }
0248   };
0249 
0250   m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0251     edm::FileBlock fb;
0252     iBase->respondToCloseInputFile(fb);
0253   };
0254 
0255   // We want to create the TriggerNamesService because it is used in
0256   // the tests.  We do that here, but first we need to build a minimal
0257   // parameter set to pass to its constructor.  Then we build the
0258   // service and setup the service system.
0259   edm::ParameterSet proc_pset;
0260 
0261   std::string processName("HLT");
0262   proc_pset.addParameter<std::string>("@process_name", processName);
0263 
0264   std::vector<std::string> paths;
0265   edm::ParameterSet trigPaths;
0266   trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0267   proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0268 
0269   std::vector<std::string> endPaths;
0270   proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0271 
0272   // Now create and setup the service
0273   tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0274 
0275   serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0276 }
0277 
0278 namespace {
0279   template <typename T>
0280   void testTransition(std::shared_ptr<T> iMod,
0281                       edm::Worker* iWorker,
0282                       edm::OutputModuleCommunicator* iComm,
0283                       testGlobalOutputModule::Trans iTrans,
0284                       testGlobalOutputModule::Expectations const& iExpect,
0285                       std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0286     assert(0 == iMod->m_count);
0287     iFunc(iWorker, iComm);
0288     auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0289     if (count != iMod->m_count) {
0290       std::cout << "For trans " << static_cast<std::underlying_type<testGlobalOutputModule::Trans>::type>(iTrans)
0291                 << " expected " << count << " and got " << iMod->m_count << std::endl;
0292     }
0293     CPPUNIT_ASSERT(iMod->m_count == count);
0294     iMod->m_count = 0;
0295     iWorker->reset();
0296   }
0297 }  // namespace
0298 
0299 template <typename T>
0300 void testGlobalOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0301   oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0302 
0303   iMod->doPreallocate(m_preallocConfig);
0304   edm::WorkerT<edm::global::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0305   edm::OutputModuleCommunicatorT<edm::global::OutputModuleBase> comm(iMod.get());
0306   for (auto& keyVal : m_transToFunc) {
0307     testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0308   }
0309 }
0310 
0311 void testGlobalOutputModule::basicTest() {
0312   //make the services available
0313   edm::ServiceRegistry::Operate operate(serviceToken_);
0314 
0315   edm::ParameterSet pset;
0316   auto testProd = std::make_shared<BasicOutputModule>(pset);
0317 
0318   CPPUNIT_ASSERT(0 == testProd->m_count);
0319   testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0320 }
0321 
0322 void testGlobalOutputModule::fileTest() {
0323   //make the services available
0324   edm::ServiceRegistry::Operate operate(serviceToken_);
0325 
0326   edm::ParameterSet pset;
0327   auto testProd = std::make_shared<FileOutputModule>(pset);
0328 
0329   CPPUNIT_ASSERT(0 == testProd->m_count);
0330   testTransitions(testProd,
0331                   {Trans::kGlobalOpenInputFile,
0332                    Trans::kEvent,
0333                    Trans::kGlobalEndLuminosityBlock,
0334                    Trans::kGlobalEndRun,
0335                    Trans::kGlobalCloseInputFile});
0336 }