File indexing completed on 2023-03-17 11:02:42
0001
0002
0003
0004
0005
0006 #include <iostream>
0007 #include <atomic>
0008 #include <vector>
0009 #include <map>
0010 #include <functional>
0011 #include "oneapi/tbb/global_control.h"
0012 #include "FWCore/Framework/interface/global/OutputModule.h"
0013 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0014 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0015 #include "FWCore/Framework/interface/maker/WorkerT.h"
0016 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0019 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0020 #include "FWCore/Framework/interface/HistoryAppender.h"
0021 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
0022 #include "FWCore/Framework/interface/TriggerNamesService.h"
0023 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0024 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0025 #include "FWCore/ServiceRegistry/interface/Service.h"
0026 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0027 #include "FWCore/Framework/interface/FileBlock.h"
0028 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0031
0032 #include "FWCore/Utilities/interface/Exception.h"
0033
0034 #include "cppunit/extensions/HelperMacros.h"
0035
// Forward declaration only; the full definition of ModuleCallingContext is not needed here.
namespace edm { class ModuleCallingContext; }
0039
0040 class testGlobalOutputModule : public CppUnit::TestFixture {
0041 CPPUNIT_TEST_SUITE(testGlobalOutputModule);
0042
0043 CPPUNIT_TEST(basicTest);
0044 CPPUNIT_TEST(fileTest);
0045
0046 CPPUNIT_TEST_SUITE_END();
0047
0048 public:
0049 testGlobalOutputModule();
0050
0051 void setUp() {}
0052 void tearDown() {}
0053
0054 void basicTest();
0055 void fileTest();
0056
0057 enum class Trans {
0058 kBeginJob,
0059 kGlobalOpenInputFile,
0060 kGlobalBeginRun,
0061 kGlobalBeginRunProduce,
0062 kGlobalBeginLuminosityBlock,
0063 kEvent,
0064 kGlobalEndLuminosityBlock,
0065 kGlobalEndRun,
0066 kGlobalCloseInputFile,
0067 kEndJob
0068 };
0069
0070 typedef std::vector<Trans> Expectations;
0071
0072 private:
0073 std::map<Trans, std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)>> m_transToFunc;
0074
0075 edm::ProcessConfiguration m_procConfig;
0076 edm::PreallocationConfiguration m_preallocConfig;
0077 std::shared_ptr<edm::ProductRegistry> m_prodReg;
0078 std::shared_ptr<edm::BranchIDListHelper> m_idHelper;
0079 std::shared_ptr<edm::ThinnedAssociationsHelper> m_associationsHelper;
0080 std::unique_ptr<edm::EventPrincipal> m_ep;
0081 edm::HistoryAppender historyAppender_;
0082 std::shared_ptr<edm::LuminosityBlockPrincipal> m_lbp;
0083 std::shared_ptr<edm::RunPrincipal> m_rp;
0084 std::shared_ptr<edm::ActivityRegistry>
0085 m_actReg;
0086 edm::EventSetupImpl* m_es = nullptr;
0087 edm::ModuleDescription m_desc = {"Dummy", "dummy"};
0088 edm::WorkerParams m_params;
0089
0090 typedef edm::service::TriggerNamesService TNS;
0091 typedef edm::serviceregistry::ServiceWrapper<TNS> w_TNS;
0092 std::shared_ptr<w_TNS> tnsptr_;
0093 edm::ServiceToken serviceToken_;
0094
0095 template <typename T>
0096 void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);
0097
0098 template <typename Traits, typename Info>
0099 void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
0100 oneapi::tbb::task_group group;
0101 edm::FinalWaitingTask task{group};
0102 edm::ServiceToken token;
0103 iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr);
0104 task.wait();
0105 }
0106
0107 class BasicOutputModule : public edm::global::OutputModule<> {
0108 public:
0109 using edm::global::OutputModuleBase::doPreallocate;
0110 BasicOutputModule(edm::ParameterSet const& iPSet)
0111 : edm::global::OutputModuleBase(iPSet), edm::global::OutputModule<>(iPSet) {}
0112 unsigned int m_count = 0;
0113
0114 void write(edm::EventForOutput const&) override { ++m_count; }
0115 void writeRun(edm::RunForOutput const&) override { ++m_count; }
0116 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0117 };
0118
0119 class FileOutputModule : public edm::global::OutputModule<edm::WatchInputFiles> {
0120 public:
0121 using edm::global::OutputModuleBase::doPreallocate;
0122 FileOutputModule(edm::ParameterSet const& iPSet)
0123 : edm::global::OutputModuleBase(iPSet), edm::global::OutputModule<edm::WatchInputFiles>(iPSet) {}
0124 unsigned int m_count = 0;
0125 void write(edm::EventForOutput const&) override { ++m_count; }
0126 void writeRun(edm::RunForOutput const&) override { ++m_count; }
0127 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override { ++m_count; }
0128
0129 void respondToOpenInputFile(edm::FileBlock const&) override { ++m_count; }
0130
0131 void respondToCloseInputFile(edm::FileBlock const&) override { ++m_count; }
0132 };
0133 };
0134
0135 namespace {
0136
0137 edm::ActivityRegistry activityRegistry;
0138
0139 struct ShadowStreamID {
0140 constexpr ShadowStreamID() : value(0) {}
0141 unsigned int value;
0142 };
0143
0144 union IDUnion {
0145 IDUnion() : m_shadow() {}
0146 ShadowStreamID m_shadow;
0147 edm::StreamID m_id;
0148 };
0149 }
0150 static edm::StreamID makeID() {
0151 IDUnion u;
0152 assert(u.m_id.value() == 0);
0153 return u.m_id;
0154 }
0155 static const edm::StreamID s_streamID0 = makeID();
0156
0157
0158 CPPUNIT_TEST_SUITE_REGISTRATION(testGlobalOutputModule);
0159
0160 testGlobalOutputModule::testGlobalOutputModule()
0161 : m_prodReg(new edm::ProductRegistry{}),
0162 m_idHelper(new edm::BranchIDListHelper{}),
0163 m_associationsHelper(new edm::ThinnedAssociationsHelper{}),
0164 m_ep() {
0165
0166 m_prodReg->setFrozen();
0167 m_idHelper->updateFromRegistry(*m_prodReg);
0168 edm::EventID eventID = edm::EventID::firstValidEvent();
0169
0170 std::string uuid = edm::createGlobalIdentifier();
0171 edm::Timestamp now(1234567UL);
0172 m_rp.reset(new edm::RunPrincipal(m_prodReg, m_procConfig, &historyAppender_, 0));
0173 m_rp->setAux(edm::RunAuxiliary(eventID.run(), now, now));
0174 edm::LuminosityBlockAuxiliary lumiAux(m_rp->run(), 1, now, now);
0175 m_lbp.reset(new edm::LuminosityBlockPrincipal(m_prodReg, m_procConfig, &historyAppender_, 0));
0176 m_lbp->setAux(lumiAux);
0177 m_lbp->setRunPrincipal(m_rp);
0178 edm::EventAuxiliary eventAux(eventID, uuid, now, true);
0179
0180 m_ep.reset(new edm::EventPrincipal(m_prodReg, m_idHelper, m_associationsHelper, m_procConfig, nullptr));
0181 m_ep->fillEventPrincipal(eventAux, nullptr);
0182 m_ep->setLuminosityBlockPrincipal(m_lbp.get());
0183 m_actReg.reset(new edm::ActivityRegistry);
0184
0185
0186 m_transToFunc[Trans::kGlobalOpenInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0187 edm::FileBlock fb;
0188 iBase->respondToOpenInputFile(fb);
0189 };
0190
0191 m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0192 typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
0193 edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginRun, nullptr);
0194 edm::ParentContext parentContext(&gc);
0195 iBase->setActivityRegistry(m_actReg);
0196 edm::RunTransitionInfo info(*m_rp, *m_es);
0197 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0198 };
0199
0200 m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0201 typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
0202 edm::GlobalContext gc(edm::GlobalContext::Transition::kBeginLuminosityBlock, nullptr);
0203 edm::ParentContext parentContext(&gc);
0204 iBase->setActivityRegistry(m_actReg);
0205 edm::LumiTransitionInfo info(*m_lbp, *m_es);
0206 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0207 };
0208
0209 m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0210 typedef edm::OccurrenceTraits<edm::EventPrincipal, edm::BranchActionStreamBegin> Traits;
0211 edm::StreamContext streamContext(s_streamID0, nullptr);
0212 edm::ParentContext parentContext(&streamContext);
0213 iBase->setActivityRegistry(m_actReg);
0214 edm::EventTransitionInfo info(*m_ep, *m_es);
0215 doWork<Traits>(iBase, info, s_streamID0, parentContext);
0216 };
0217
0218 m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0219 typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
0220 edm::GlobalContext gc(edm::GlobalContext::Transition::kEndLuminosityBlock, nullptr);
0221 edm::ParentContext parentContext(&gc);
0222 iBase->setActivityRegistry(m_actReg);
0223 edm::LumiTransitionInfo info(*m_lbp, *m_es);
0224 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0225 oneapi::tbb::task_group group;
0226 edm::FinalWaitingTask task{group};
0227 iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry);
0228 task.wait();
0229 };
0230
0231 m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
0232 typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
0233 edm::GlobalContext gc(edm::GlobalContext::Transition::kEndRun, nullptr);
0234 edm::ParentContext parentContext(&gc);
0235 iBase->setActivityRegistry(m_actReg);
0236 edm::RunTransitionInfo info(*m_rp, *m_es);
0237 doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
0238 oneapi::tbb::task_group group;
0239 edm::FinalWaitingTask task{group};
0240 iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr);
0241 task.wait();
0242 };
0243
0244 m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
0245 edm::FileBlock fb;
0246 iBase->respondToCloseInputFile(fb);
0247 };
0248
0249
0250
0251
0252
0253 edm::ParameterSet proc_pset;
0254
0255 std::string processName("HLT");
0256 proc_pset.addParameter<std::string>("@process_name", processName);
0257
0258 std::vector<std::string> paths;
0259 edm::ParameterSet trigPaths;
0260 trigPaths.addParameter<std::vector<std::string>>("@trigger_paths", paths);
0261 proc_pset.addParameter<edm::ParameterSet>("@trigger_paths", trigPaths);
0262
0263 std::vector<std::string> endPaths;
0264 proc_pset.addParameter<std::vector<std::string>>("@end_paths", endPaths);
0265
0266
0267 tnsptr_.reset(new w_TNS(std::make_unique<TNS>(proc_pset)));
0268
0269 serviceToken_ = edm::ServiceRegistry::createContaining(tnsptr_);
0270 }
0271
0272 namespace {
0273 template <typename T>
0274 void testTransition(std::shared_ptr<T> iMod,
0275 edm::Worker* iWorker,
0276 edm::OutputModuleCommunicator* iComm,
0277 testGlobalOutputModule::Trans iTrans,
0278 testGlobalOutputModule::Expectations const& iExpect,
0279 std::function<void(edm::Worker*, edm::OutputModuleCommunicator*)> iFunc) {
0280 assert(0 == iMod->m_count);
0281 iFunc(iWorker, iComm);
0282 auto count = std::count(iExpect.begin(), iExpect.end(), iTrans);
0283 if (count != iMod->m_count) {
0284 std::cout << "For trans " << static_cast<std::underlying_type<testGlobalOutputModule::Trans>::type>(iTrans)
0285 << " expected " << count << " and got " << iMod->m_count << std::endl;
0286 }
0287 CPPUNIT_ASSERT(iMod->m_count == count);
0288 iMod->m_count = 0;
0289 iWorker->reset();
0290 }
0291 }
0292
0293 template <typename T>
0294 void testGlobalOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
0295 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1);
0296
0297 oneapi::tbb::task_arena arena(1);
0298 arena.execute([&]() {
0299 iMod->doPreallocate(m_preallocConfig);
0300 edm::WorkerT<edm::global::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
0301 edm::OutputModuleCommunicatorT<edm::global::OutputModuleBase> comm(iMod.get());
0302 for (auto& keyVal : m_transToFunc) {
0303 testTransition(iMod, &w, &comm, keyVal.first, iExpect, keyVal.second);
0304 }
0305 });
0306 }
0307
0308 void testGlobalOutputModule::basicTest() {
0309
0310 edm::ServiceRegistry::Operate operate(serviceToken_);
0311
0312 edm::ParameterSet pset;
0313 auto testProd = std::make_shared<BasicOutputModule>(pset);
0314
0315 CPPUNIT_ASSERT(0 == testProd->m_count);
0316 testTransitions(testProd, {Trans::kEvent, Trans::kGlobalEndLuminosityBlock, Trans::kGlobalEndRun});
0317 }
0318
0319 void testGlobalOutputModule::fileTest() {
0320
0321 edm::ServiceRegistry::Operate operate(serviceToken_);
0322
0323 edm::ParameterSet pset;
0324 auto testProd = std::make_shared<FileOutputModule>(pset);
0325
0326 CPPUNIT_ASSERT(0 == testProd->m_count);
0327 testTransitions(testProd,
0328 {Trans::kGlobalOpenInputFile,
0329 Trans::kEvent,
0330 Trans::kGlobalEndLuminosityBlock,
0331 Trans::kGlobalEndRun,
0332 Trans::kGlobalCloseInputFile});
0333 }