Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-24 02:18:51

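/*
 *  CppUnit tests for edm::limited::OutputModule (fixture: testLimitedOutputModule).
 *  NOTE: this header previously named "proxyfactoryproducer_t.cc", which does not
 *  match the contents of this file.
 *  EDMProto
 *
 */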
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/limited/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0019 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0020 #include "FWCore/Framework/interface/HistoryAppender.h"
0021 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0022 #include "FWCore/Framework/interface/TriggerNamesService.h"
0023 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0024 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0025 #include "FWCore/ServiceRegistry/interface/Service.h"
0026 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0027 #include "FWCore/Framework/interface/FileBlock.h"
0028 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 
0031 #include "FWCore/Utilities/interface/Exception.h"
0032 
0033 #include "cppunit/extensions/HelperMacros.h"
0034 
0035 namespace edm {
0036   class ModuleCallingContext;
0037 }
0038 
0039 class testLimitedOutputModule : public CppUnit::TestFixture {
0040   CPPUNIT_TEST_SUITE(testLimitedOutputModule);
0041 
0042   CPPUNIT_TEST(basicTest);
0043   CPPUNIT_TEST(fileTest);
0044 
0045   CPPUNIT_TEST_SUITE_END();
0046 
0047 public:
0048   testLimitedOutputModule();
0049 
0050   void setUp() {}
0051   void tearDown() {}
0052 
0053   void basicTest();
0054   void fileTest();
0055 
0056   enum class Trans {
0057     kBeginJob,
0058     kGlobalOpenInputFile,
0059     kGlobalBeginRun,
0060     kGlobalBeginRunProduce,
0061     kGlobalBeginLuminosityBlock,
0062     kEvent,
0063     kGlobalEndLuminosityBlock,
0064     kGlobalEndRun,
0065     kGlobalCloseInputFile,
0066     kEndJob
0067   };
0068 
0069   typedef std::vector<Trans> Expectations;
0070 
0071 private:
0072   std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0073 
0074   edm::ProcessConfiguration m_procConfig;
0075   edm::PreallocationConfiguration m_preallocConfig;
0076   std::shared_ptr<edm::ProductRegistry> m_prodReg;
0077   std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0078   std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0079   std::unique_ptr<edm::EventPrincipal> m_ep;
0080   edm::HistoryAppender historyAppender_;
0081   std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0082   std::shared_ptr<edm::RunPrincipal> m_rp;
0083   std::shared_ptr<edm::ActivityRegistry>
0084       m_actReg;  // We do not use propagate_const because the registry itself is mutable.
0085   edm::EventSetupImpl* m_es = nullptr;
0086   edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0087   edm::WorkerParams m_params;
0088 
0089   typedef edm::service::TriggerNamesService TNS;
0090   typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0091   std::shared_ptr<w_TNS> tnsptr_;
0092   edm::ServiceToken serviceToken_;
0093 
0094   template <typename T>
0095   void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0096 
0097   template <typename Traits, typename Info>
0098   void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0099     edm::FinalWaitingTask task;
0100     oneapi::tbb::task_group group;
0101     edm::ServiceToken token;
0102     iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0103     do {
0104       group.wait();
0105     } while (not task.done());
0106     if (auto e = task.exceptionPtr()) {
0107       std::rethrow_exception(*e);
0108     }
0109   }
0110 
0111   class BasicOutputModule : public edm::limited::OutputModule<> {
0112   public:
0113     using edm::limited::OutputModuleBase::doPreallocate;
0114     BasicOutputModule(edm::ParameterSet const& iPSet)
0115         : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<>(iPSet) {}
0116     std::atomic<unsigned int> m_count{0};
0117 
0118     void write(edm::EventForOutput const&) override { ++m_count; }
0119     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0120     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0121   };
0122 
0123   class FileOutputModule : public edm::limited::OutputModule<edm::WatchInputFiles> {
0124   public:
0125     using edm::limited::OutputModuleBase::doPreallocate;
0126     FileOutputModule(edm::ParameterSet const& iPSet)
0127         : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<edm::WatchInputFiles>(iPSet) {}
0128     std::atomic<unsigned int> m_count{0};
0129     void write(edm::EventForOutput const&) override { ++m_count; }
0130     void writeRun(edm::RunForOutput const&) override { ++m_count; }
0131     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0132 
0133     void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0134 
0135     void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0136   };
0137 };
0138 
0139 namespace {
0140 
0141   edm::ActivityRegistry activityRegistry;
0142 
0143   struct ShadowStreamID {
0144     constexpr ShadowStreamID() : value(0) {}
0145     unsigned int value;
0146   };
0147 
0148   union IDUnion {
0149     IDUnion() : m_shadow() {}
0150     ShadowStreamID m_shadow;
0151     edm::StreamID m_id;
0152   };
0153 }  // namespace
0154 static edm::StreamID makeID() {
0155   IDUnion u;
0156   assert(u.m_id.value() == 0);
0157   return u.m_id;
0158 }
0159 static const edm::StreamID s_streamID0 = makeID();
0160 
0161 ///registration of the test so that the runner can find it
0162 CPPUNIT_TEST_SUITE_REGISTRATION(testLimitedOutputModule);
0163 
0164 testLimitedOutputModule::testLimitedOutputModule()
0165     : m_prodReg(new edm::ProductRegistry{}),
0166       m_idHelper(new edm::BranchIDListHelper{}),
0167       m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0168       m_ep() {
0169   //Setup the principals
0170   m_prodReg->setFrozen();
0171   m_idHelper->updateFromRegistry(*m_prodReg);
0172   edm::EventID eventID = edm::EventID::firstValidEvent();
0173 
0174   std::string uuid = edm::createGlobalIdentifier();
0175   edm::Timestamp now(1234567UL);
0176   auto runAux = std::make_shared<edm::RunAuxiliary>(eventID.run(), now, now);
0177   m_rp.reset(new edm::RunPrincipal(runAux, m_prodReg, m_procConfig, &historyAppender_, 0));
0178   m_lbp.reset(new edm::LuminosityBlockPrincipal(m_prodReg, m_procConfig, &historyAppender_, 0));
0179   m_lbp->setAux(edm::LuminosityBlockAuxiliary(m_rp->run(), 1, now, now));
0180   m_lbp->setRunPrincipal(m_rp);
0181   edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0182 
0183   m_ep.reset(new edm::EventPrincipal(m_prodReg, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0184   m_ep->fillEventPrincipal(eventAux, nullptr);
0185   m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0186   m_actReg.reset(new edm::ActivityRegistry);
0187 
0188   //For each transition, bind a lambda which will call the proper method of the Worker
0189   m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0190     edm::FileBlock fb;
0191     iBase->respondToOpenInputFile(fb);
0192   };
0193 
0194   m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0195     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0196     edm::ParentContext parentContext;
0197     edm::RunTransitionInfo info(*m_rp, *m_es);
0198     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0199   };
0200 
0201   m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0202     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0203     edm::ParentContext parentContext;
0204     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0205     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0206   };
0207 
0208   m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0209     typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0210     edm::StreamContext streamContext(s_streamID0, nullptr);
0211     edm::ParentContext parentContext(&streamContext);
0212     iBase->setActivityRegistry(m_actReg);
0213     edm::EventTransitionInfo info(*m_ep, *m_es);
0214     doWork<Traits>(iBase, info, s_streamID0, parentContext);
0215   };
0216 
0217   m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0218     typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0219     edm::ParentContext parentContext;
0220     edm::LumiTransitionInfo info(*m_lbp, *m_es);
0221     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0222     edm::FinalWaitingTask task;
0223     oneapi::tbb::task_group group;
0224     iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0225     do {
0226       group.wait();
0227     } while (not task.done());
0228     if (task.exceptionPtr() != nullptr) {
0229       std::rethrow_exception(*task.exceptionPtr());
0230     }
0231   };
0232 
0233   m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0234     typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0235     edm::ParentContext parentContext;
0236     edm::RunTransitionInfo info(*m_rp, *m_es);
0237     doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0238     edm::FinalWaitingTask task;
0239     oneapi::tbb::task_group group;
0240     iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0241     do {
0242       group.wait();
0243     } while (not task.done());
0244     if (task.exceptionPtr() != nullptr) {
0245       std::rethrow_exception(*task.exceptionPtr());
0246     }
0247   };
0248 
0249   m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0250     edm::FileBlock fb;
0251     iBase->respondToCloseInputFile(fb);
0252   };
0253 
0254   // We want to create the TriggerNamesService because it is used in
0255   // the tests.  We do that here, but first we need to build a minimal
0256   // parameter set to pass to its constructor.  Then we build the
0257   // service and setup the service system.
0258   edm::ParameterSet proc_pset;
0259 
0260   std::string processName("HLT");
0261   proc_pset.addParameter<std::string>("@process_name", processName);
0262 
0263   std::vector<std::string> paths;
0264   edm::ParameterSet trigPaths;
0265   trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0266   proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0267 
0268   std::vector<std::string> endPaths;
0269   proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0270 
0271   // Now create and setup the service
0272   tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0273 
0274   serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0275 }
0276 
0277 namespace {
0278   template <typename T>
0279   void testTransition(std::shared_ptr<T> iMod,
0280                       edm::Worker* iWorker,
0281                       edm::OutputModuleCommunicator* iComm,
0282                       testLimitedOutputModule::Trans iTrans,
0283                       testLimitedOutputModule::Expectations const& iExpect,
0284                       std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0285     assert(0 == iMod->m_count);
0286     iFunc(iWorker, iComm);
0287     auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0288     if (count != iMod->m_count) {
0289       std::cout << "For trans " << static_cast<std::underlying_type<testLimitedOutputModule::Trans>::type>(iTrans)
0290                 << " expected " << count << " and got " << iMod->m_count << std::endl;
0291     }
0292     CPPUNIT_ASSERT(iMod->m_count == count);
0293     iMod->m_count = 0;
0294     iWorker->reset();
0295   }
0296 }  // namespace
0297 
0298 template <typename T>
0299 void testLimitedOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0300   oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0301 
0302   iMod->doPreallocate(m_preallocConfig);
0303   edm::WorkerT<edm::limited::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0304   edm::OutputModuleCommunicatorT<edm::limited::OutputModuleBase> comm(iMod.get());
0305   for (auto& keyVal : m_transToFunc) {
0306     testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0307   }
0308 }
0309 
0310 void testLimitedOutputModule::basicTest() {
0311   //make the services available
0312   edm::ServiceRegistry::Operate operate(serviceToken_);
0313 
0314   edm::ParameterSet pset;
0315   const unsigned int kLimit = 1;
0316   pset.addUntrackedParameter("concurrencyLimit", kLimit);
0317   auto testProd = std::make_shared<BasicOutputModule>(pset);
0318 
0319   CPPUNIT_ASSERT(0 == testProd->m_count);
0320   testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0321 }
0322 
0323 void testLimitedOutputModule::fileTest() {
0324   //make the services available
0325   edm::ServiceRegistry::Operate operate(serviceToken_);
0326 
0327   edm::ParameterSet pset;
0328   const unsigned int kLimit = 1;
0329   pset.addUntrackedParameter("concurrencyLimit", kLimit);
0330   auto testProd = std::make_shared<FileOutputModule>(pset);
0331 
0332   CPPUNIT_ASSERT(0 == testProd->m_count);
0333   testTransitions(testProd,
0334                   {Trans::kGlobalOpenInputFile,
0335                    Trans::kEvent,
0336                    Trans::kGlobalEndLuminosityBlock,
0337                    Trans::kGlobalEndRun,
0338                    Trans::kGlobalCloseInputFile});
0339 }