File indexing completed on 2025-04-22 06:27:24
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0021 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0022 #include "FWCore/Framework/interface/ScheduleItems.h"
0023 #include "FWCore/Framework/interface/EventPrincipal.h"
0024 #include "FWCore/Framework/interface/EventSetupProvider.h"
0025 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ExceptionActions.h"
0028 #include "FWCore/Framework/interface/HistoryAppender.h"
0029 #include "FWCore/Framework/interface/RunPrincipal.h"
0030 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0031 #include "FWCore/Framework/interface/EventSetupsController.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 #include "FWCore/Framework/interface/FileBlock.h"
0038 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0039 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0040
0041 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0042 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0043
0044 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0045 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0046 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0047
0048 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0049
0050 #include "oneTimeInitialization.h"
0051
0052 #include <mutex>
0053
// Two-level stringification helpers: `str` turns its argument into a string
// literal verbatim, while `xstr` lets the argument be macro-expanded first.
#define str(text) #text
#define xstr(text) str(text)
0056
0057 namespace edm {
0058 namespace test {
0059
0060
0061
0062
0063 TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
0064 : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0065 arena_(1),
0066 historyAppender_(std::make_unique<HistoryAppender>()),
0067 moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0068
0069 (void)testprocessor::oneTimeInitialization();
0070
0071 ProcessDescImpl desc(iConfig.pythonConfiguration(), false);
0072
0073 auto psetPtr = desc.parameterSet();
0074 moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
0075 espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
0076
0077 validateTopLevelParameterSets(psetPtr.get());
0078
0079 ensureAvailableAccelerators(*psetPtr);
0080
0081 labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0082
0083 auto procDesc = desc.processDesc();
0084
0085 ScheduleItems items;
0086
0087
0088 auto& serviceSets = procDesc->getServicesPSets();
0089 ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0090 serviceToken_ = items.addTNS(*psetPtr, token);
0091
0092
0093 ServiceRegistry::Operate operate(serviceToken_);
0094
0095
0096 std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0097
0098
0099 esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0100
0101 auto nThreads = 1U;
0102 auto nStreams = 1U;
0103 auto nConcurrentLumis = 1U;
0104 auto nConcurrentRuns = 1U;
0105 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0106
0107 if (not iConfig.esProduceEntries().empty()) {
0108 esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0109 esp_->add(std::dynamic_pointer_cast<eventsetup::ESProductResolverProvider>(esHelper_));
0110 esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0111 }
0112
0113 auto tempReg = items.preg();
0114 processConfiguration_ = items.processConfiguration();
0115
0116 edm::ParameterSet emptyPSet;
0117 emptyPSet.registerIt();
0118 auto psetid = emptyPSet.id();
0119
0120 for (auto const& p : iConfig.extraProcesses()) {
0121 processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), HardwareResourcesDescription());
0122 processHistoryRegistry_.registerProcessHistory(processHistory_);
0123 }
0124
0125
0126 for (auto const& produce : iConfig.produceEntries()) {
0127 auto processName = produce.processName_;
0128 if (processName.empty()) {
0129 processName = processConfiguration_->processName();
0130 }
0131 edm::TypeWithDict twd(produce.type_.typeInfo());
0132 edm::ProductDescription product(edm::InEvent,
0133 produce.moduleLabel_,
0134 processName,
0135 twd.userClassName(),
0136 twd.friendlyClassName(),
0137 produce.instanceLabel_,
0138 twd,
0139 true
0140 );
0141 product.init();
0142 dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0143 tempReg->addProduct(product);
0144 }
0145
0146 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0147
0148 schedule_ = items.initSchedule(
0149 *psetPtr, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
0150
0151 act_table_ = std::move(items.act_table_);
0152 actReg_ = items.actReg_;
0153 branchIDListHelper_ = items.branchIDListHelper();
0154 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0155 processContext_.setProcessConfiguration(processConfiguration_.get());
0156
0157 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0158
0159 tempReg->setFrozen();
0160 preg_ = std::make_shared<edm::ProductRegistry>(tempReg->moveTo());
0161 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0162
0163 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0164
0165 auto ep = std::make_shared<EventPrincipal>(preg_,
0166 edm::productResolversFactory::makePrimary,
0167 branchIDListHelper_,
0168 thinnedAssociationsHelper_,
0169 *processConfiguration_,
0170 historyAppender_.get(),
0171 index);
0172 principalCache_.insert(std::move(ep));
0173 }
0174 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0175 auto rp = std::make_unique<RunPrincipal>(preg_,
0176 edm::productResolversFactory::makePrimary,
0177 *processConfiguration_,
0178 historyAppender_.get(),
0179 index,
0180 &mergeableRunProductProcesses_);
0181 principalCache_.insert(std::move(rp));
0182 }
0183 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0184 auto lp = std::make_unique<LuminosityBlockPrincipal>(
0185 preg_, edm::productResolversFactory::makePrimary, *processConfiguration_, historyAppender_.get(), index);
0186 principalCache_.insert(std::move(lp));
0187 }
0188 {
0189 auto pb = std::make_unique<ProcessBlockPrincipal>(
0190 preg_, edm::productResolversFactory::makePrimary, *processConfiguration_);
0191 principalCache_.insert(std::move(pb));
0192 }
0193 }
0194
0195 TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0196
0197
0198
0199
0200 void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0201 if (index >= dataProducts_.size()) {
0202 throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0203 "with a call to the function \'produces\' BEFORE passing the\n"
0204 "TestProcessor::Config object to the TestProcessor constructor";
0205 }
0206 dataProducts_[index].second = std::move(iWrapper);
0207 }
0208
0209 edm::test::Event TestProcessor::testImpl() {
0210 bool result = arena_.execute([this]() {
0211 setupProcessing();
0212 event();
0213
0214 return schedule_->totalEventsPassed() > 0;
0215 });
0216 schedule_->clearCounters();
0217 if (esHelper_) {
0218
0219 esHelper_->resetAllResolvers();
0220 }
0221 return edm::test::Event(
0222 principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0223 }
0224
0225 edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0226 arena_.execute([this, iNum]() {
0227 if (not beginJobCalled_) {
0228 beginJob();
0229 }
0230 if (not respondToOpenInputFileCalled_) {
0231 respondToOpenInputFile();
0232 }
0233 if (not beginProcessBlockCalled_) {
0234 beginProcessBlock();
0235 }
0236 if (not openOutputFilesCalled_) {
0237 openOutputFiles();
0238 }
0239
0240 if (not beginRunCalled_) {
0241 beginRun();
0242 }
0243 if (beginLumiCalled_) {
0244 endLuminosityBlock();
0245 assert(lumiNumber_ != iNum);
0246 }
0247 lumiNumber_ = iNum;
0248 beginLuminosityBlock();
0249 });
0250
0251 if (esHelper_) {
0252
0253 esHelper_->resetAllResolvers();
0254 }
0255 return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0256 }
0257
0258 edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0259
0260
0261 std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0262 arena_.execute([this, &lumi]() {
0263 if (not beginJobCalled_) {
0264 beginJob();
0265 }
0266 if (not respondToOpenInputFileCalled_) {
0267 respondToOpenInputFile();
0268 }
0269 if (not beginProcessBlockCalled_) {
0270 beginProcessBlock();
0271 }
0272 if (not openOutputFilesCalled_) {
0273 openOutputFiles();
0274 }
0275 if (not beginRunCalled_) {
0276 beginRun();
0277 }
0278 if (not beginLumiCalled_) {
0279 beginLuminosityBlock();
0280 }
0281 lumi = endLuminosityBlock();
0282 });
0283 if (esHelper_) {
0284
0285 esHelper_->resetAllResolvers();
0286 }
0287
0288 return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0289 }
0290
0291 edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0292 arena_.execute([this, iNum]() {
0293 if (not beginJobCalled_) {
0294 beginJob();
0295 }
0296 if (not respondToOpenInputFileCalled_) {
0297 respondToOpenInputFile();
0298 }
0299 if (not beginProcessBlockCalled_) {
0300 beginProcessBlock();
0301 }
0302 if (not openOutputFilesCalled_) {
0303 openOutputFiles();
0304 }
0305 if (beginRunCalled_) {
0306 assert(runNumber_ != iNum);
0307 endRun();
0308 }
0309 runNumber_ = iNum;
0310 beginRun();
0311 });
0312 if (esHelper_) {
0313
0314 esHelper_->resetAllResolvers();
0315 }
0316 return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0317 }
0318 edm::test::Run TestProcessor::testEndRunImpl() {
0319
0320
0321 std::shared_ptr<edm::RunPrincipal> rp;
0322 arena_.execute([this, &rp]() {
0323 if (not beginJobCalled_) {
0324 beginJob();
0325 }
0326 if (not respondToOpenInputFileCalled_) {
0327 respondToOpenInputFile();
0328 }
0329 if (not beginProcessBlockCalled_) {
0330 beginProcessBlock();
0331 }
0332 if (not openOutputFilesCalled_) {
0333 openOutputFiles();
0334 }
0335 if (not beginRunCalled_) {
0336 beginRun();
0337 }
0338 rp = endRun();
0339 });
0340 if (esHelper_) {
0341
0342 esHelper_->resetAllResolvers();
0343 }
0344
0345 return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0346 }
0347
0348 edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0349 arena_.execute([this]() {
0350 if (not beginJobCalled_) {
0351 beginJob();
0352 }
0353 beginProcessBlock();
0354 });
0355 return edm::test::ProcessBlock(
0356 &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0357 }
0358 edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0359 auto pbp = arena_.execute([this]() {
0360 if (not beginJobCalled_) {
0361 beginJob();
0362 }
0363 if (not beginProcessBlockCalled_) {
0364 beginProcessBlock();
0365 }
0366 return endProcessBlock();
0367 });
0368 return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0369 }
0370
0371 void TestProcessor::setupProcessing() {
0372 if (not beginJobCalled_) {
0373 beginJob();
0374 }
0375 if (not respondToOpenInputFileCalled_) {
0376 respondToOpenInputFile();
0377 }
0378 if (not beginProcessBlockCalled_) {
0379 beginProcessBlock();
0380 }
0381 if (not openOutputFilesCalled_) {
0382 openOutputFiles();
0383 }
0384 if (not beginRunCalled_) {
0385 beginRun();
0386 }
0387 if (not beginLumiCalled_) {
0388 beginLuminosityBlock();
0389 }
0390 }
0391
0392 void TestProcessor::teardownProcessing() {
0393 arena_.execute([this]() {
0394 if (beginLumiCalled_) {
0395 endLuminosityBlock();
0396 beginLumiCalled_ = false;
0397 }
0398 if (beginRunCalled_) {
0399 endRun();
0400 beginRunCalled_ = false;
0401 }
0402 if (respondToOpenInputFileCalled_) {
0403 respondToCloseInputFile();
0404 }
0405 if (beginProcessBlockCalled_) {
0406 endProcessBlock();
0407 beginProcessBlockCalled_ = false;
0408 }
0409 if (openOutputFilesCalled_) {
0410 closeOutputFiles();
0411 openOutputFilesCalled_ = false;
0412 }
0413 if (beginJobCalled_) {
0414 endJob();
0415 }
0416 edm::FinalWaitingTask task{taskGroup_};
0417 espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0418 task.waitNoThrow();
0419 });
0420 }
0421
0422 void TestProcessor::beginJob() {
0423 ServiceRegistry::Operate operate(serviceToken_);
0424
0425 service::SystemBounds bounds(preallocations_.numberOfStreams(),
0426 preallocations_.numberOfLuminosityBlocks(),
0427 preallocations_.numberOfRuns(),
0428 preallocations_.numberOfThreads());
0429 actReg_->preallocateSignal_(bounds);
0430 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0431
0432 espController_->finishConfiguration();
0433 actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0434
0435 schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_, processContext_);
0436
0437 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0438 schedule_->beginStream(i);
0439 }
0440 beginJobCalled_ = true;
0441 }
0442
0443 void TestProcessor::beginProcessBlock() {
0444 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0445 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0446
0447 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0448 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0449 processGlobalTransition<Traits>(transitionInfo);
0450
0451 beginProcessBlockCalled_ = true;
0452 }
0453
0454 void TestProcessor::openOutputFiles() {
0455
0456 ServiceRegistry::Operate operate(serviceToken_);
0457
0458 edm::FileBlock fb;
0459 schedule_->openOutputFiles(fb);
0460 openOutputFilesCalled_ = true;
0461 }
0462
0463 void TestProcessor::closeOutputFiles() {
0464 if (openOutputFilesCalled_) {
0465
0466 ServiceRegistry::Operate operate(serviceToken_);
0467 schedule_->closeOutputFiles();
0468
0469 openOutputFilesCalled_ = false;
0470 }
0471 }
0472
0473 void TestProcessor::respondToOpenInputFile() {
0474 respondToOpenInputFileCalled_ = true;
0475 edm::FileBlock fb;
0476
0477 ServiceRegistry::Operate operate(serviceToken_);
0478 schedule_->respondToOpenInputFile(fb);
0479 }
0480
0481 void TestProcessor::respondToCloseInputFile() {
0482 if (respondToOpenInputFileCalled_) {
0483 edm::FileBlock fb;
0484
0485 ServiceRegistry::Operate operate(serviceToken_);
0486
0487 schedule_->respondToCloseInputFile(fb);
0488 respondToOpenInputFileCalled_ = false;
0489 }
0490 }
0491
0492 void TestProcessor::beginRun() {
0493 runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0494 runPrincipal_->clearPrincipal();
0495 assert(runPrincipal_);
0496 edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
0497 aux.setProcessHistoryID(processHistory_.id());
0498 runPrincipal_->setAux(aux);
0499
0500 runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
0501
0502 IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0503 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0504
0505 auto const& es = esp_->eventSetupImpl();
0506
0507 RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0508 {
0509 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0510 processGlobalTransition<Traits>(transitionInfo);
0511 }
0512 {
0513 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0514 processTransitionForAllStreams<Traits>(transitionInfo);
0515 }
0516 beginRunCalled_ = true;
0517 }
0518
0519 void TestProcessor::beginLuminosityBlock() {
0520 LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0521 aux.setProcessHistoryID(processHistory_.id());
0522 lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0523 lumiPrincipal_->clearPrincipal();
0524 assert(lumiPrincipal_);
0525 lumiPrincipal_->setAux(aux);
0526
0527 lumiPrincipal_->setRunPrincipal(runPrincipal_);
0528 lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
0529
0530 IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0531 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0532
0533 auto const& es = esp_->eventSetupImpl();
0534
0535 LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0536
0537 {
0538 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0539 processGlobalTransition<Traits>(transitionInfo);
0540 }
0541 {
0542 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0543 processTransitionForAllStreams<Traits>(transitionInfo);
0544 }
0545 beginLumiCalled_ = true;
0546 }
0547
0548 void TestProcessor::event() {
0549 auto pep = &(principalCache_.eventPrincipal(0));
0550
0551
0552 pep->clearEventPrincipal();
0553 edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
0554 aux.setProcessHistoryID(processHistory_.id());
0555 pep->fillEventPrincipal(aux, nullptr, nullptr);
0556 assert(lumiPrincipal_.get() != nullptr);
0557 pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0558
0559 for (auto& p : dataProducts_) {
0560 if (p.second) {
0561 pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0562 } else {
0563
0564
0565 auto r = pep->getProductResolver(p.first.branchID());
0566 dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0567 }
0568 }
0569
0570 ServiceRegistry::Operate operate(serviceToken_);
0571
0572 FinalWaitingTask waitTask{taskGroup_};
0573
0574 EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0575 schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0576
0577 waitTask.wait();
0578 ++eventNumber_;
0579 }
0580
0581 std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0582 auto lumiPrincipal = lumiPrincipal_;
0583 if (beginLumiCalled_) {
0584
0585 ServiceRegistry::Operate operate(serviceToken_);
0586
0587 beginLumiCalled_ = false;
0588 lumiPrincipal_.reset();
0589
0590 IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0591 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0592
0593 auto const& es = esp_->eventSetupImpl();
0594
0595 LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0596
0597 {
0598 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0599 processTransitionForAllStreams<Traits>(transitionInfo);
0600 }
0601 {
0602 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0603 processGlobalTransition<Traits>(transitionInfo);
0604 }
0605 {
0606 FinalWaitingTask globalWaitTask{taskGroup_};
0607 schedule_->writeLumiAsync(
0608 WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
0609 globalWaitTask.wait();
0610 }
0611 }
0612 lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0613 return lumiPrincipal;
0614 }
0615
0616 std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0617 auto runPrincipal = runPrincipal_;
0618 runPrincipal_.reset();
0619 if (beginRunCalled_) {
0620 beginRunCalled_ = false;
0621
0622
0623 ServiceRegistry::Operate operate(serviceToken_);
0624
0625 IOVSyncValue ts(
0626 EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0627 runPrincipal->endTime());
0628 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0629
0630 auto const& es = esp_->eventSetupImpl();
0631
0632 RunTransitionInfo transitionInfo(*runPrincipal, es);
0633
0634 {
0635 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0636 processTransitionForAllStreams<Traits>(transitionInfo);
0637 }
0638 {
0639 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0640 processGlobalTransition<Traits>(transitionInfo);
0641 }
0642 {
0643 FinalWaitingTask globalWaitTask{taskGroup_};
0644 schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0645 *runPrincipal,
0646 &processContext_,
0647 actReg_.get(),
0648 runPrincipal->mergeableRunProductMetadata());
0649 globalWaitTask.wait();
0650 }
0651 }
0652 return runPrincipal;
0653 }
0654
0655 ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0656 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0657 if (beginProcessBlockCalled_) {
0658 beginProcessBlockCalled_ = false;
0659
0660 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0661 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0662 processGlobalTransition<Traits>(transitionInfo);
0663 }
0664 return &processBlockPrincipal;
0665 }
0666
0667 void TestProcessor::endJob() {
0668 if (!beginJobCalled_) {
0669 return;
0670 }
0671 beginJobCalled_ = false;
0672
0673
0674 ExceptionCollector c(
0675 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0676 std::mutex collectorMutex;
0677
0678
0679 ServiceRegistry::Operate operate(serviceToken_);
0680
0681
0682 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0683 schedule_->endStream(i, c, collectorMutex);
0684 }
0685 auto actReg = actReg_.get();
0686 c.call([actReg]() { actReg->preEndJobSignal_(); });
0687 schedule_->endJob(c);
0688 c.call([actReg]() { actReg->postEndJobSignal_(); });
0689 if (c.hasThrown()) {
0690 c.rethrow();
0691 }
0692 }
0693
0694 void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0695 if (beginRunCalled_) {
0696 endLuminosityBlock();
0697 endRun();
0698 }
0699 runNumber_ = iRun;
0700 }
0701 void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0702 endLuminosityBlock();
0703 lumiNumber_ = iLumi;
0704 }
0705
0706 void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0707
0708 template <typename Traits>
0709 void TestProcessor::processTransitionForAllStreams(typename Traits::TransitionInfoType& transitionInfo) {
0710 FinalWaitingTask finalWaitTask{taskGroup_};
0711 {
0712 WaitingTaskHolder holder(taskGroup_, &finalWaitTask);
0713
0714 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0715 schedule_->processOneStreamAsync<Traits>(holder, i, transitionInfo, serviceToken_);
0716 }
0717 }
0718 finalWaitTask.wait();
0719 }
0720
0721 template <typename Traits>
0722 void TestProcessor::processGlobalTransition(typename Traits::TransitionInfoType& transitionInfo) {
0723 FinalWaitingTask finalWaitTask{taskGroup_};
0724 schedule_->processOneGlobalAsync<Traits>(
0725 WaitingTaskHolder(taskGroup_, &finalWaitTask), transitionInfo, serviceToken_);
0726 finalWaitTask.wait();
0727 }
0728
0729 }
0730 }