File indexing completed on 2025-01-14 02:38:42
0001
0002
0003
0004
0005
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/global/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0018 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0021 #include "FWCore/Framework/interface/HistoryAppender.h"
0022 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0023 #include "FWCore/Framework/interface/TriggerNamesService.h"
0024 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0025 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0026 #include "FWCore/ServiceRegistry/interface/Service.h"
0027 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0028 #include "FWCore/Framework/interface/FileBlock.h"
0029 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0031 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0032
0033 #include "FWCore/Utilities/interface/Exception.h"
0034
0035 #include "cppunit/extensions/HelperMacros.h"
0036
0037 namespace edm {
0038 class ModuleCallingContext;
0039 }
0040
0041 class testGlobalOutputModule : public CppUnit::TestFixture {
0042 CPPUNIT_TEST_SUITE(testGlobalOutputModule);
0043
0044 CPPUNIT_TEST(basicTest);
0045 CPPUNIT_TEST(fileTest);
0046
0047 CPPUNIT_TEST_SUITE_END();
0048
0049 public:
0050 testGlobalOutputModule();
0051
0052 void setUp() {}
0053 void tearDown() {}
0054
0055 void basicTest();
0056 void fileTest();
0057
0058 enum class Trans {
0059 kBeginJob,
0060 kGlobalOpenInputFile,
0061 kGlobalBeginRun,
0062 kGlobalBeginRunProduce,
0063 kGlobalBeginLuminosityBlock,
0064 kEvent,
0065 kGlobalEndLuminosityBlock,
0066 kGlobalEndRun,
0067 kGlobalCloseInputFile,
0068 kEndJob
0069 };
0070
0071 typedef std::vector<Trans> Expectations;
0072
0073 private:
0074 std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0075
0076 edm::ProcessConfiguration m_procConfig;
0077 edm::PreallocationConfiguration m_preallocConfig;
0078 std::shared_ptr<edm::ProductRegistry> m_prodReg;
0079 std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0080 std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0081 std::unique_ptr<edm::EventPrincipal> m_ep;
0082 edm::HistoryAppender historyAppender_;
0083 std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0084 std::shared_ptr<edm::RunPrincipal> m_rp;
0085 std::shared_ptr<edm::ActivityRegistry>
0086 m_actReg;
0087 edm::EventSetupImpl* m_es = nullptr;
0088 edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0089 edm::WorkerParams m_params;
0090
0091 typedef edm::service::TriggerNamesService TNS;
0092 typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0093 std::shared_ptr<w_TNS> tnsptr_;
0094 edm::ServiceToken serviceToken_;
0095
0096 template <typename T>
0097 void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0098
0099 template <typename Traits, typename Info>
0100 void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0101 oneapi::tbb::task_group group;
0102 edm::FinalWaitingTask task{group};
0103 edm::ServiceToken token;
0104 iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0105 task.wait();
0106 }
0107
0108 class BasicOutputModule : public edm::global::OutputModule<> {
0109 public:
0110 using edm::global::OutputModuleBase::doPreallocate;
0111 BasicOutputModule(edm::ParameterSet const& iPSet)
0112 : edm::global::OutputModuleBase(iPSet), edm::global::OutputModule<>(iPSet) {}
0113 unsigned int m_count = 0;
0114
0115 void write(edm::EventForOutput const&) override { ++m_count; }
0116 void writeRun(edm::RunForOutput const&) override { ++m_count; }
0117 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0118 };
0119
0120 class FileOutputModule : public edm::global::OutputModule<edm::WatchInputFiles> {
0121 public:
0122 using edm::global::OutputModuleBase::doPreallocate;
0123 FileOutputModule(edm::ParameterSet const& iPSet)
0124 : edm::global::OutputModuleBase(iPSet), edm::global::OutputModule<edm::WatchInputFiles>(iPSet) {}
0125 unsigned int m_count = 0;
0126 void write(edm::EventForOutput const&) override { ++m_count; }
0127 void writeRun(edm::RunForOutput const&) override { ++m_count; }
0128 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0129
0130 void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0131
0132 void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0133 };
0134 };
0135
0136 namespace {
0137
0138 edm::ActivityRegistry activityRegistry;
0139
0140 struct ShadowStreamID {
0141 constexpr ShadowStreamID() : value(0) {}
0142 unsigned int value;
0143 };
0144
0145 union IDUnion {
0146 IDUnion() : m_shadow() {}
0147 ShadowStreamID m_shadow;
0148 edm::StreamID m_id;
0149 };
0150 }
0151 static edm::StreamID makeID() {
0152 IDUnion u;
0153 assert(u.m_id.value() == 0);
0154 return u.m_id;
0155 }
0156 static const edm::StreamID s_streamID0 = makeID();
0157
0158
0159 CPPUNIT_TEST_SUITE_REGISTRATION(testGlobalOutputModule);
0160
0161 testGlobalOutputModule::testGlobalOutputModule()
0162 : m_prodReg(new edm::ProductRegistry{}),
0163 m_idHelper(new edm::BranchIDListHelper{}),
0164 m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0165 m_ep() {
0166
0167 m_prodReg->setFrozen();
0168 m_idHelper->updateFromRegistry(*m_prodReg);
0169 edm::EventID eventID = edm::EventID::firstValidEvent();
0170
0171 std::string uuid = edm::createGlobalIdentifier();
0172 edm::Timestamp now(1234567UL);
0173 m_rp.reset(
0174 new edm::RunPrincipal(m_prodReg, edm::productResolversFactory::makePrimary, m_procConfig, &historyAppender_, 0));
0175 m_rp->setAux(edm::RunAuxiliary(eventID.run(), now, now));
0176 edm::LuminosityBlockAuxiliary lumiAux(m_rp->run(), 1, now, now);
0177 m_lbp.reset(new edm::LuminosityBlockPrincipal(
0178 m_prodReg, edm::productResolversFactory::makePrimary, m_procConfig, &historyAppender_, 0));
0179 m_lbp->setAux(lumiAux);
0180 m_lbp->setRunPrincipal(m_rp);
0181 edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0182
0183 m_ep.reset(new edm::EventPrincipal(
0184 m_prodReg, edm::productResolversFactory::makePrimary, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0185 m_ep->fillEventPrincipal(eventAux, nullptr);
0186 m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0187 m_actReg.reset(new edm::ActivityRegistry);
0188
0189
0190 m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0191 edm::FileBlock fb;
0192 iBase->respondToOpenInputFile(fb);
0193 };
0194
0195 m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0196 typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0197 edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginRun, nullptr);
0198 edm::ParentContext parentContext(&gc);
0199 iBase->setActivityRegistry(m_actReg);
0200 edm::RunTransitionInfo info(*m_rp, *m_es);
0201 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0202 };
0203
0204 m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0205 typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0206 edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginLuminosityBlock, nullptr);
0207 edm::ParentContext parentContext(&gc);
0208 iBase->setActivityRegistry(m_actReg);
0209 edm::LumiTransitionInfo info(*m_lbp, *m_es);
0210 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0211 };
0212
0213 m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0214 typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0215 edm::StreamContext streamContext(s_streamID0, nullptr);
0216 edm::ParentContext parentContext(&streamContext);
0217 iBase->setActivityRegistry(m_actReg);
0218 edm::EventTransitionInfo info(*m_ep, *m_es);
0219 doWork<Traits>(iBase, info, s_streamID0, parentContext);
0220 };
0221
0222 m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0223 typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0224 edm::GlobalContext gc(edm::GlobalContext::Transition::kEndLuminosityBlock, nullptr);
0225 edm::ParentContext parentContext(&gc);
0226 iBase->setActivityRegistry(m_actReg);
0227 edm::LumiTransitionInfo info(*m_lbp, *m_es);
0228 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0229 oneapi::tbb::task_group group;
0230 edm::FinalWaitingTask task{group};
0231 iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0232 task.wait();
0233 };
0234
0235 m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0236 typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0237 edm::GlobalContext gc(edm::GlobalContext::Transition::kEndRun, nullptr);
0238 edm::ParentContext parentContext(&gc);
0239 iBase->setActivityRegistry(m_actReg);
0240 edm::RunTransitionInfo info(*m_rp, *m_es);
0241 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0242 oneapi::tbb::task_group group;
0243 edm::FinalWaitingTask task{group};
0244 iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0245 task.wait();
0246 };
0247
0248 m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0249 edm::FileBlock fb;
0250 iBase->respondToCloseInputFile(fb);
0251 };
0252
0253
0254
0255
0256
0257 edm::ParameterSet proc_pset;
0258
0259 std::string processName("HLT");
0260 proc_pset.addParameter<std::string>("@process_name", processName);
0261
0262 std::vector<std::string> paths;
0263 edm::ParameterSet trigPaths;
0264 trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0265 proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0266
0267 std::vector<std::string> endPaths;
0268 proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0269
0270
0271 tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0272
0273 serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0274 }
0275
0276 namespace {
0277 template <typename T>
0278 void testTransition(std::shared_ptr<T> iMod,
0279 edm::Worker* iWorker,
0280 edm::OutputModuleCommunicator* iComm,
0281 testGlobalOutputModule::Trans iTrans,
0282 testGlobalOutputModule::Expectations const& iExpect,
0283 std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0284 assert(0 == iMod->m_count);
0285 iFunc(iWorker, iComm);
0286 auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0287 if (count != iMod->m_count) {
0288 std::cout << "For trans " << static_cast<std::underlying_type<testGlobalOutputModule::Trans>::type>(iTrans)
0289 << " expected " << count << " and got " << iMod->m_count << std::endl;
0290 }
0291 CPPUNIT_ASSERT(iMod->m_count == count);
0292 iMod->m_count = 0;
0293 iWorker->reset();
0294 }
0295 }
0296
0297 template <typename T>
0298 void testGlobalOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0299 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0300
0301 oneapi::tbb::task_arena arena(1);
0302 arena.execute([&]() {
0303 iMod->doPreallocate(m_preallocConfig);
0304 edm::WorkerT<edm::global::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0305 edm::OutputModuleCommunicatorT<edm::global::OutputModuleBase> comm(iMod.get());
0306 for (auto& keyVal : m_transToFunc) {
0307 testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0308 }
0309 });
0310 }
0311
0312 void testGlobalOutputModule::basicTest() {
0313
0314 edm::ServiceRegistry::Operate operate(serviceToken_);
0315
0316 edm::ParameterSet pset;
0317 auto testProd = std::make_shared<BasicOutputModule>(pset);
0318
0319 CPPUNIT_ASSERT(0 == testProd->m_count);
0320 testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0321 }
0322
0323 void testGlobalOutputModule::fileTest() {
0324
0325 edm::ServiceRegistry::Operate operate(serviceToken_);
0326
0327 edm::ParameterSet pset;
0328 auto testProd = std::make_shared<FileOutputModule>(pset);
0329
0330 CPPUNIT_ASSERT(0 == testProd->m_count);
0331 testTransitions(testProd,
0332 {Trans::kGlobalOpenInputFile,
0333 Trans::kEvent,
0334 Trans::kGlobalEndLuminosityBlock,
0335 Trans::kGlobalEndRun,
0336 Trans::kGlobalCloseInputFile});
0337 }