File indexing completed on 2025-01-14 02:38:43
0001
0002
0003
0004
0005
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/limited/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0018 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0021 #include "FWCore/Framework/interface/HistoryAppender.h"
0022 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0023 #include "FWCore/Framework/interface/TriggerNamesService.h"
0024 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0025 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0026 #include "FWCore/ServiceRegistry/interface/Service.h"
0027 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0028 #include "FWCore/Framework/interface/FileBlock.h"
0029 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0031 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0032
0033 #include "FWCore/Utilities/interface/Exception.h"
0034
0035 #include "cppunit/extensions/HelperMacros.h"
0036
// Forward declaration of edm::ModuleCallingContext.
namespace edm {
  class ModuleCallingContext;
}  // namespace edm
0040
0041 class testLimitedOutputModule : public CppUnit::TestFixture {
0042 CPPUNIT_TEST_SUITE(testLimitedOutputModule);
0043
0044 CPPUNIT_TEST(basicTest);
0045 CPPUNIT_TEST(fileTest);
0046
0047 CPPUNIT_TEST_SUITE_END();
0048
0049 public:
0050 testLimitedOutputModule();
0051
0052 void setUp() {}
0053 void tearDown() {}
0054
0055 void basicTest();
0056 void fileTest();
0057
0058 enum class Trans {
0059 kBeginJob,
0060 kGlobalOpenInputFile,
0061 kGlobalBeginRun,
0062 kGlobalBeginRunProduce,
0063 kGlobalBeginLuminosityBlock,
0064 kEvent,
0065 kGlobalEndLuminosityBlock,
0066 kGlobalEndRun,
0067 kGlobalCloseInputFile,
0068 kEndJob
0069 };
0070
0071 typedef std::vector<Trans> Expectations;
0072
0073 private:
0074 std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0075
0076 edm::ProcessConfiguration m_procConfig;
0077 edm::PreallocationConfiguration m_preallocConfig;
0078 std::shared_ptr<edm::ProductRegistry> m_prodReg;
0079 std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0080 std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0081 std::unique_ptr<edm::EventPrincipal> m_ep;
0082 edm::HistoryAppender historyAppender_;
0083 std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0084 std::shared_ptr<edm::RunPrincipal> m_rp;
0085 std::shared_ptr<edm::ActivityRegistry>
0086 m_actReg;
0087 edm::EventSetupImpl* m_es = nullptr;
0088 edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0089 edm::WorkerParams m_params;
0090
0091 typedef edm::service::TriggerNamesService TNS;
0092 typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0093 std::shared_ptr<w_TNS> tnsptr_;
0094 edm::ServiceToken serviceToken_;
0095
0096 template <typename T>
0097 void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0098
0099 template <typename Traits, typename Info>
0100 void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0101 oneapi::tbb::task_group group;
0102 edm::FinalWaitingTask task{group};
0103 edm::ServiceToken token;
0104 iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0105 task.wait();
0106 }
0107
0108 class BasicOutputModule : public edm::limited::OutputModule<> {
0109 public:
0110 using edm::limited::OutputModuleBase::doPreallocate;
0111 BasicOutputModule(edm::ParameterSet const& iPSet)
0112 : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<>(iPSet) {}
0113 std::atomic<unsigned int> m_count{0};
0114
0115 void write(edm::EventForOutput const&) override { ++m_count; }
0116 void writeRun(edm::RunForOutput const&) override { ++m_count; }
0117 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0118 };
0119
0120 class FileOutputModule : public edm::limited::OutputModule<edm::WatchInputFiles> {
0121 public:
0122 using edm::limited::OutputModuleBase::doPreallocate;
0123 FileOutputModule(edm::ParameterSet const& iPSet)
0124 : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<edm::WatchInputFiles>(iPSet) {}
0125 std::atomic<unsigned int> m_count{0};
0126 void write(edm::EventForOutput const&) override { ++m_count; }
0127 void writeRun(edm::RunForOutput const&) override { ++m_count; }
0128 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0129
0130 void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0131
0132 void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0133 };
0134 };
0135
0136 namespace {
0137
0138 edm::ActivityRegistry activityRegistry;
0139
0140 struct ShadowStreamID {
0141 constexpr ShadowStreamID() : value(0) {}
0142 unsigned int value;
0143 };
0144
0145 union IDUnion {
0146 IDUnion() : m_shadow() {}
0147 ShadowStreamID m_shadow;
0148 edm::StreamID m_id;
0149 };
0150 }
0151 static edm::StreamID makeID() {
0152 IDUnion u;
0153 assert(u.m_id.value() == 0);
0154 return u.m_id;
0155 }
0156 static const edm::StreamID s_streamID0 = makeID();
0157
0158
0159 CPPUNIT_TEST_SUITE_REGISTRATION(testLimitedOutputModule);
0160
0161 testLimitedOutputModule::testLimitedOutputModule()
0162 : m_prodReg(new edm::ProductRegistry{}),
0163 m_idHelper(new edm::BranchIDListHelper{}),
0164 m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0165 m_ep() {
0166
0167 m_prodReg->setFrozen();
0168 m_idHelper->updateFromRegistry(*m_prodReg);
0169 edm::EventID eventID = edm::EventID::firstValidEvent();
0170
0171 std::string uuid = edm::createGlobalIdentifier();
0172 edm::Timestamp now(1234567UL);
0173 m_rp.reset(
0174 new edm::RunPrincipal(m_prodReg, edm::productResolversFactory::makePrimary, m_procConfig, &historyAppender_, 0));
0175 m_rp->setAux(edm::RunAuxiliary(eventID.run(), now, now));
0176 m_lbp.reset(new edm::LuminosityBlockPrincipal(
0177 m_prodReg, edm::productResolversFactory::makePrimary, m_procConfig, &historyAppender_, 0));
0178 m_lbp->setAux(edm::LuminosityBlockAuxiliary(m_rp->run(), 1, now, now));
0179 m_lbp->setRunPrincipal(m_rp);
0180 edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0181
0182 m_ep.reset(new edm::EventPrincipal(
0183 m_prodReg, edm::productResolversFactory::makePrimary, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0184 m_ep->fillEventPrincipal(eventAux, nullptr);
0185 m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0186 m_actReg.reset(new edm::ActivityRegistry);
0187
0188
0189 m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0190 edm::FileBlock fb;
0191 iBase->respondToOpenInputFile(fb);
0192 };
0193
0194 m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0195 typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0196 edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginRun, nullptr);
0197 edm::ParentContext parentContext(&gc);
0198 iBase->setActivityRegistry(m_actReg);
0199 edm::RunTransitionInfo info(*m_rp, *m_es);
0200 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0201 };
0202
0203 m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0204 typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0205 edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginLuminosityBlock, nullptr);
0206 edm::ParentContext parentContext(&gc);
0207 iBase->setActivityRegistry(m_actReg);
0208 edm::LumiTransitionInfo info(*m_lbp, *m_es);
0209 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0210 };
0211
0212 m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0213 typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0214 edm::StreamContext streamContext(s_streamID0, nullptr);
0215 edm::ParentContext parentContext(&streamContext);
0216 iBase->setActivityRegistry(m_actReg);
0217 edm::EventTransitionInfo info(*m_ep, *m_es);
0218 doWork<Traits>(iBase, info, s_streamID0, parentContext);
0219 };
0220
0221 m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0222 typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0223 edm::GlobalContext gc(edm::GlobalContext::Transition::kEndLuminosityBlock, nullptr);
0224 edm::ParentContext parentContext(&gc);
0225 iBase->setActivityRegistry(m_actReg);
0226 edm::LumiTransitionInfo info(*m_lbp, *m_es);
0227 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0228 oneapi::tbb::task_group group;
0229 edm::FinalWaitingTask task{group};
0230 iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0231 task.wait();
0232 };
0233
0234 m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0235 typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0236 edm::GlobalContext gc(edm::GlobalContext::Transition::kEndRun, nullptr);
0237 edm::ParentContext parentContext(&gc);
0238 iBase->setActivityRegistry(m_actReg);
0239 edm::RunTransitionInfo info(*m_rp, *m_es);
0240 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0241 oneapi::tbb::task_group group;
0242 edm::FinalWaitingTask task{group};
0243 iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0244 task.wait();
0245 };
0246
0247 m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0248 edm::FileBlock fb;
0249 iBase->respondToCloseInputFile(fb);
0250 };
0251
0252
0253
0254
0255
0256 edm::ParameterSet proc_pset;
0257
0258 std::string processName("HLT");
0259 proc_pset.addParameter<std::string>("@process_name", processName);
0260
0261 std::vector<std::string> paths;
0262 edm::ParameterSet trigPaths;
0263 trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0264 proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0265
0266 std::vector<std::string> endPaths;
0267 proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0268
0269
0270 tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0271
0272 serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0273 }
0274
0275 namespace {
0276 template <typename T>
0277 void testTransition(std::shared_ptr<T> iMod,
0278 edm::Worker* iWorker,
0279 edm::OutputModuleCommunicator* iComm,
0280 testLimitedOutputModule::Trans iTrans,
0281 testLimitedOutputModule::Expectations const& iExpect,
0282 std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0283 assert(0 == iMod->m_count);
0284 iFunc(iWorker, iComm);
0285 auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0286 if (count != iMod->m_count) {
0287 std::cout << "For trans " << static_cast<std::underlying_type<testLimitedOutputModule::Trans>::type>(iTrans)
0288 << " expected " << count << " and got " << iMod->m_count << std::endl;
0289 }
0290 CPPUNIT_ASSERT(iMod->m_count == count);
0291 iMod->m_count = 0;
0292 iWorker->reset();
0293 }
0294 }
0295
0296 template <typename T>
0297 void testLimitedOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0298 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0299
0300 iMod->doPreallocate(m_preallocConfig);
0301 edm::WorkerT<edm::limited::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0302 edm::OutputModuleCommunicatorT<edm::limited::OutputModuleBase> comm(iMod.get());
0303 for (auto& keyVal : m_transToFunc) {
0304 testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0305 }
0306 }
0307
0308 void testLimitedOutputModule::basicTest() {
0309
0310 edm::ServiceRegistry::Operate operate(serviceToken_);
0311
0312 edm::ParameterSet pset;
0313 const unsigned int kLimit = 1;
0314 pset.addUntrackedParameter("concurrencyLimit", kLimit);
0315 auto testProd = std::make_shared<BasicOutputModule>(pset);
0316
0317 CPPUNIT_ASSERT(0 == testProd->m_count);
0318 testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0319 }
0320
0321 void testLimitedOutputModule::fileTest() {
0322
0323 edm::ServiceRegistry::Operate operate(serviceToken_);
0324
0325 edm::ParameterSet pset;
0326 const unsigned int kLimit = 1;
0327 pset.addUntrackedParameter("concurrencyLimit", kLimit);
0328 auto testProd = std::make_shared<FileOutputModule>(pset);
0329
0330 CPPUNIT_ASSERT(0 == testProd->m_count);
0331 testTransitions(testProd,
0332 {Trans::kGlobalOpenInputFile,
0333 Trans::kEvent,
0334 Trans::kGlobalEndLuminosityBlock,
0335 Trans::kGlobalEndRun,
0336 Trans::kGlobalCloseInputFile});
0337 }