File indexing completed on 2024-04-06 12:12:27
0001
0002
0003
0004
0005
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/limited/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0019 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0020 #include "FWCore/Framework/interface/HistoryAppender.h"
0021 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0022 #include "FWCore/Framework/interface/TriggerNamesService.h"
0023 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0024 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0025 #include "FWCore/ServiceRegistry/interface/Service.h"
0026 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0027 #include "FWCore/Framework/interface/FileBlock.h"
0028 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0031
0032 #include "FWCore/Utilities/interface/Exception.h"
0033
0034 #include "cppunit/extensions/HelperMacros.h"
0035
0036 namespace edm {
0037 class ModuleCallingContext;
0038 }
0039
0040 class testLimitedOutputModule : public CppUnit::TestFixture {
0041 CPPUNIT_TEST_SUITE(testLimitedOutputModule);
0042
0043 CPPUNIT_TEST(basicTest);
0044 CPPUNIT_TEST(fileTest);
0045
0046 CPPUNIT_TEST_SUITE_END();
0047
0048 public:
0049 testLimitedOutputModule();
0050
0051 void setUp() {}
0052 void tearDown() {}
0053
0054 void basicTest();
0055 void fileTest();
0056
0057 enum class Trans {
0058 kBeginJob,
0059 kGlobalOpenInputFile,
0060 kGlobalBeginRun,
0061 kGlobalBeginRunProduce,
0062 kGlobalBeginLuminosityBlock,
0063 kEvent,
0064 kGlobalEndLuminosityBlock,
0065 kGlobalEndRun,
0066 kGlobalCloseInputFile,
0067 kEndJob
0068 };
0069
0070 typedef std::vector<Trans> Expectations;
0071
0072 private:
0073 std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0074
0075 edm::ProcessConfiguration m_procConfig;
0076 edm::PreallocationConfiguration m_preallocConfig;
0077 std::shared_ptr<edm::ProductRegistry> m_prodReg;
0078 std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0079 std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0080 std::unique_ptr<edm::EventPrincipal> m_ep;
0081 edm::HistoryAppender historyAppender_;
0082 std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0083 std::shared_ptr<edm::RunPrincipal> m_rp;
0084 std::shared_ptr<edm::ActivityRegistry>
0085 m_actReg;
0086 edm::EventSetupImpl* m_es = nullptr;
0087 edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0088 edm::WorkerParams m_params;
0089
0090 typedef edm::service::TriggerNamesService TNS;
0091 typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0092 std::shared_ptr<w_TNS> tnsptr_;
0093 edm::ServiceToken serviceToken_;
0094
0095 template <typename T>
0096 void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0097
0098 template <typename Traits, typename Info>
0099 void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0100 oneapi::tbb::task_group group;
0101 edm::FinalWaitingTask task{group};
0102 edm::ServiceToken token;
0103 iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0104 task.wait();
0105 }
0106
0107 class BasicOutputModule : public edm::limited::OutputModule<> {
0108 public:
0109 using edm::limited::OutputModuleBase::doPreallocate;
0110 BasicOutputModule(edm::ParameterSet const& iPSet)
0111 : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<>(iPSet) {}
0112 std::atomic<unsigned int> m_count{0};
0113
0114 void write(edm::EventForOutput const&) override { ++m_count; }
0115 void writeRun(edm::RunForOutput const&) override { ++m_count; }
0116 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0117 };
0118
0119 class FileOutputModule : public edm::limited::OutputModule<edm::WatchInputFiles> {
0120 public:
0121 using edm::limited::OutputModuleBase::doPreallocate;
0122 FileOutputModule(edm::ParameterSet const& iPSet)
0123 : edm::limited::OutputModuleBase(iPSet), edm::limited::OutputModule<edm::WatchInputFiles>(iPSet) {}
0124 std::atomic<unsigned int> m_count{0};
0125 void write(edm::EventForOutput const&) override { ++m_count; }
0126 void writeRun(edm::RunForOutput const&) override { ++m_count; }
0127 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0128
0129 void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0130
0131 void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0132 };
0133 };
0134
0135 namespace {
0136
0137 edm::ActivityRegistry activityRegistry;
0138
0139 struct ShadowStreamID {
0140 constexpr ShadowStreamID() : value(0) {}
0141 unsigned int value;
0142 };
0143
0144 union IDUnion {
0145 IDUnion() : m_shadow() {}
0146 ShadowStreamID m_shadow;
0147 edm::StreamID m_id;
0148 };
0149 }
0150 static edm::StreamID makeID() {
0151 IDUnion u;
0152 assert(u.m_id.value() == 0);
0153 return u.m_id;
0154 }
0155 static const edm::StreamID s_streamID0 = makeID();
0156
0157
0158 CPPUNIT_TEST_SUITE_REGISTRATION(testLimitedOutputModule);
0159
0160 testLimitedOutputModule::testLimitedOutputModule()
0161 : m_prodReg(new edm::ProductRegistry{}),
0162 m_idHelper(new edm::BranchIDListHelper{}),
0163 m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0164 m_ep() {
0165
0166 m_prodReg->setFrozen();
0167 m_idHelper->updateFromRegistry(*m_prodReg);
0168 edm::EventID eventID = edm::EventID::firstValidEvent();
0169
0170 std::string uuid = edm::createGlobalIdentifier();
0171 edm::Timestamp now(1234567UL);
0172 m_rp.reset(new edm::RunPrincipal(m_prodReg, m_procConfig, &historyAppender_, 0));
0173 m_rp->setAux(edm::RunAuxiliary(eventID.run(), now, now));
0174 m_lbp.reset(new edm::LuminosityBlockPrincipal(m_prodReg, m_procConfig, &historyAppender_, 0));
0175 m_lbp->setAux(edm::LuminosityBlockAuxiliary(m_rp->run(), 1, now, now));
0176 m_lbp->setRunPrincipal(m_rp);
0177 edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0178
0179 m_ep.reset(new edm::EventPrincipal(m_prodReg, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0180 m_ep->fillEventPrincipal(eventAux, nullptr);
0181 m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0182 m_actReg.reset(new edm::ActivityRegistry);
0183
0184
0185 m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0186 edm::FileBlock fb;
0187 iBase->respondToOpenInputFile(fb);
0188 };
0189
0190 m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0191 typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0192 edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginRun, nullptr);
0193 edm::ParentContext parentContext(&gc);
0194 iBase->setActivityRegistry(m_actReg);
0195 edm::RunTransitionInfo info(*m_rp, *m_es);
0196 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0197 };
0198
0199 m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0200 typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0201 edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginLuminosityBlock, nullptr);
0202 edm::ParentContext parentContext(&gc);
0203 iBase->setActivityRegistry(m_actReg);
0204 edm::LumiTransitionInfo info(*m_lbp, *m_es);
0205 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0206 };
0207
0208 m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0209 typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0210 edm::StreamContext streamContext(s_streamID0, nullptr);
0211 edm::ParentContext parentContext(&streamContext);
0212 iBase->setActivityRegistry(m_actReg);
0213 edm::EventTransitionInfo info(*m_ep, *m_es);
0214 doWork<Traits>(iBase, info, s_streamID0, parentContext);
0215 };
0216
0217 m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0218 typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0219 edm::GlobalContext gc(edm::GlobalContext::Transition::kEndLuminosityBlock, nullptr);
0220 edm::ParentContext parentContext(&gc);
0221 iBase->setActivityRegistry(m_actReg);
0222 edm::LumiTransitionInfo info(*m_lbp, *m_es);
0223 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0224 oneapi::tbb::task_group group;
0225 edm::FinalWaitingTask task{group};
0226 iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0227 task.wait();
0228 };
0229
0230 m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0231 typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0232 edm::GlobalContext gc(edm::GlobalContext::Transition::kEndRun, nullptr);
0233 edm::ParentContext parentContext(&gc);
0234 iBase->setActivityRegistry(m_actReg);
0235 edm::RunTransitionInfo info(*m_rp, *m_es);
0236 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0237 oneapi::tbb::task_group group;
0238 edm::FinalWaitingTask task{group};
0239 iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0240 task.wait();
0241 };
0242
0243 m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0244 edm::FileBlock fb;
0245 iBase->respondToCloseInputFile(fb);
0246 };
0247
0248
0249
0250
0251
0252 edm::ParameterSet proc_pset;
0253
0254 std::string processName("HLT");
0255 proc_pset.addParameter<std::string>("@process_name", processName);
0256
0257 std::vector<std::string> paths;
0258 edm::ParameterSet trigPaths;
0259 trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0260 proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0261
0262 std::vector<std::string> endPaths;
0263 proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0264
0265
0266 tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0267
0268 serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0269 }
0270
0271 namespace {
0272 template <typename T>
0273 void testTransition(std::shared_ptr<T> iMod,
0274 edm::Worker* iWorker,
0275 edm::OutputModuleCommunicator* iComm,
0276 testLimitedOutputModule::Trans iTrans,
0277 testLimitedOutputModule::Expectations const& iExpect,
0278 std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0279 assert(0 == iMod->m_count);
0280 iFunc(iWorker, iComm);
0281 auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0282 if (count != iMod->m_count) {
0283 std::cout << "For trans " << static_cast<std::underlying_type<testLimitedOutputModule::Trans>::type>(iTrans)
0284 << " expected " << count << " and got " << iMod->m_count << std::endl;
0285 }
0286 CPPUNIT_ASSERT(iMod->m_count == count);
0287 iMod->m_count = 0;
0288 iWorker->reset();
0289 }
0290 }
0291
0292 template <typename T>
0293 void testLimitedOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0294 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0295
0296 iMod->doPreallocate(m_preallocConfig);
0297 edm::WorkerT<edm::limited::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0298 edm::OutputModuleCommunicatorT<edm::limited::OutputModuleBase> comm(iMod.get());
0299 for (auto& keyVal : m_transToFunc) {
0300 testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0301 }
0302 }
0303
0304 void testLimitedOutputModule::basicTest() {
0305
0306 edm::ServiceRegistry::Operate operate(serviceToken_);
0307
0308 edm::ParameterSet pset;
0309 const unsigned int kLimit = 1;
0310 pset.addUntrackedParameter("concurrencyLimit", kLimit);
0311 auto testProd = std::make_shared<BasicOutputModule>(pset);
0312
0313 CPPUNIT_ASSERT(0 == testProd->m_count);
0314 testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0315 }
0316
0317 void testLimitedOutputModule::fileTest() {
0318
0319 edm::ServiceRegistry::Operate operate(serviceToken_);
0320
0321 edm::ParameterSet pset;
0322 const unsigned int kLimit = 1;
0323 pset.addUntrackedParameter("concurrencyLimit", kLimit);
0324 auto testProd = std::make_shared<FileOutputModule>(pset);
0325
0326 CPPUNIT_ASSERT(0 == testProd->m_count);
0327 testTransitions(testProd,
0328 {Trans::kGlobalOpenInputFile,
0329 Trans::kEvent,
0330 Trans::kGlobalEndLuminosityBlock,
0331 Trans::kGlobalEndRun,
0332 Trans::kGlobalCloseInputFile});
0333 }