File indexing completed on 2025-06-29 22:58:09
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0021 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0022 #include "FWCore/Framework/interface/ScheduleItems.h"
0023 #include "FWCore/Framework/interface/EventPrincipal.h"
0024 #include "FWCore/Framework/interface/EventSetupProvider.h"
0025 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ExceptionActions.h"
0028 #include "FWCore/Framework/interface/HistoryAppender.h"
0029 #include "FWCore/Framework/interface/RunPrincipal.h"
0030 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0031 #include "FWCore/Framework/interface/EventSetupsController.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 #include "FWCore/Framework/interface/FileBlock.h"
0038 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0039 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0040
0041 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0042 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0043
0044 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0045 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0046 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0047
0048 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0049
0050 #include "oneTimeInitialization.h"
0051
0052 #include <mutex>
0053
// Two-step stringification helpers: xstr(s) forwards to str(s) so that a macro argument is
// expanded by the preprocessor before '#' turns it into a string literal.
// Used in this file as xstr(PROJECT_VERSION).
0054 #define xstr(s) str(s)
0055 #define str(s) #s
0056
0057 namespace edm {
0058 namespace test {
0059
0060
0061
0062
// Builds the test harness described by iConfig.
//
// In order, the constructor: wraps the Python configuration, creates the EventSetup controller,
// starts the services, creates the EventSetup provider, registers the products declared on
// iConfig, builds the schedule, freezes the product registry and finally creates one principal
// per stream / run / luminosity block plus a single ProcessBlock principal.
//
// iConfig : supplies the Python configuration text, the extra process names, the declared
//           event products (produceEntries) and the EventSetup products (esProduceEntries).
// iToken  : service token handed to ScheduleItems::initServices together with the service
//           parameter sets found in the configuration.
0063 TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
// TBB is capped at a parallelism of 1; arena_ is constructed with 1 as well
// (presumably its maximum concurrency -- confirm against the member's type).
0064 : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0065 arena_(1),
0066 historyAppender_(std::make_unique<HistoryAppender>()),
0067 moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0068
// The return value is deliberately discarded.
0069 (void)testprocessor::oneTimeInitialization();
0070
0071 ProcessDescImpl desc(iConfig.pythonConfiguration(), false);
0072
// Top-level parameter set; everything below is configured from it.
0073 auto psetPtr = desc.parameterSet();
0074 moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
0075 espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
0076
0077 validateTopLevelParameterSets(psetPtr.get());
0078
0079 ensureAvailableAccelerators(*psetPtr);
0080
// The label of the module under test is stored in the "@moduleToTest" parameter.
0081 labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0082
0083 auto procDesc = desc.processDesc();
0084
0085 ScheduleItems items;
0086
0087
// Create the services from the configuration together with iToken; kOverlapIsError is the
// policy passed for the combination. addTNS() then yields the token kept in serviceToken_.
0088 auto& serviceSets = procDesc->getServicesPSets();
0089 ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError);
0090 serviceToken_ = items.addTNS(*psetPtr, token);
0091
0092
// 'operate' stays alive until the end of the constructor
// (presumably making serviceToken_ the active service context for everything below -- confirm).
0093 ServiceRegistry::Operate operate(serviceToken_);
0094
0095
// The returned CommonParams object is not used again in this constructor.
0096 std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0097
0098
0099 esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0100
// The harness always runs with one thread, one stream, one concurrent lumi and one concurrent run.
0101 auto nThreads = 1U;
0102 auto nStreams = 1U;
0103 auto nConcurrentLumis = 1U;
0104 auto nConcurrentRuns = 1U;
0105 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0106
// When EventSetup products were declared, one helper object is added to the provider twice:
// once cast to ESProductResolverProvider and once cast to EventSetupRecordIntervalFinder.
0107 if (not iConfig.esProduceEntries().empty()) {
0108 esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0109 esp_->add(std::dynamic_pointer_cast<eventsetup::ESProductResolverProvider>(esHelper_));
0110 esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0111 }
0112
0113 auto tempReg = items.preg();
0114 processConfiguration_ = items.processConfiguration();
0115
// An empty, registered ParameterSet provides the id used for every extra process below.
0116 edm::ParameterSet emptyPSet;
0117 emptyPSet.registerIt();
0118 auto psetid = emptyPSet.id();
0119
// Append each extra process to processHistory_ and register the history as it stands
// after each addition.
0120 for (auto const& p : iConfig.extraProcesses()) {
0121 processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), HardwareResourcesDescription());
0122 processHistoryRegistry_.registerProcessHistory(processHistory_);
0123 }
0124
0125
// Describe every declared event product. An entry without a process name uses the name of the
// current process. Each description is stored in dataProducts_ next to an empty wrapper
// (filled later through put()) and added to the product registry.
0126 for (auto const& produce : iConfig.produceEntries()) {
0127 auto processName = produce.processName_;
0128 if (processName.empty()) {
0129 processName = processConfiguration_->processName();
0130 }
0131 edm::TypeWithDict twd(produce.type_.typeInfo());
0132 edm::ProductDescription product(edm::InEvent,
0133 produce.moduleLabel_,
0134 processName,
0135 twd.userClassName(),
0136 twd.friendlyClassName(),
0137 produce.instanceLabel_,
0138 twd,
0139 true
0140 );
0141 product.init();
0142 dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0143 tempReg->addProduct(product);
0144 }
0145
0146 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0147
0148 schedule_ = items.initSchedule(
0149 *psetPtr, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
0150
// Take over the pieces of ScheduleItems that are needed after construction.
0151 act_table_ = std::move(items.act_table_);
0152 actReg_ = items.actReg_;
0153 branchIDListHelper_ = items.branchIDListHelper();
0154 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0155 processContext_.setProcessConfiguration(processConfiguration_.get());
0156
0157 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0158
// Freeze the registry, then move its contents into the shared registry handed to the principals.
0159 tempReg->setFrozen();
0160 preg_ = std::make_shared<edm::ProductRegistry>(tempReg->moveTo());
0161 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0162
// One EventPrincipal per stream.
0163 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0164
0165 auto ep = std::make_shared<EventPrincipal>(preg_,
0166 edm::productResolversFactory::makePrimary,
0167 branchIDListHelper_,
0168 thinnedAssociationsHelper_,
0169 *processConfiguration_,
0170 historyAppender_.get(),
0171 index);
0172 principalCache_.insert(std::move(ep));
0173 }
// One RunPrincipal per concurrent run.
0174 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0175 auto rp = std::make_unique<RunPrincipal>(preg_,
0176 edm::productResolversFactory::makePrimary,
0177 *processConfiguration_,
0178 historyAppender_.get(),
0179 index,
0180 &mergeableRunProductProcesses_);
0181 principalCache_.insert(std::move(rp));
0182 }
// One LuminosityBlockPrincipal per concurrent luminosity block.
0183 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0184 auto lp = std::make_unique<LuminosityBlockPrincipal>(
0185 preg_, edm::productResolversFactory::makePrimary, *processConfiguration_, historyAppender_.get(), index);
0186 principalCache_.insert(std::move(lp));
0187 }
// A single ProcessBlockPrincipal.
0188 {
0189 auto pb = std::make_unique<ProcessBlockPrincipal>(
0190 preg_, edm::productResolversFactory::makePrimary, *processConfiguration_);
0191 principalCache_.insert(std::move(pb));
0192 }
0193 }
0194
0195 TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0196
0197
0198
0199
0200 void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0201 if (index >= dataProducts_.size()) {
0202 throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0203 "with a call to the function \'produces\' BEFORE passing the\n"
0204 "TestProcessor::Config object to the TestProcessor constructor";
0205 }
0206 dataProducts_[index].second = std::move(iWrapper);
0207 }
0208
0209 edm::test::Event TestProcessor::testImpl() {
0210 bool result = arena_.execute([this]() {
0211 setupProcessing();
0212 event();
0213
0214 return schedule_->totalEventsPassed() > 0;
0215 });
0216 schedule_->clearCounters();
0217 if (esHelper_) {
0218
0219 esHelper_->resetAllResolvers();
0220 }
0221 return edm::test::Event(
0222 principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0223 }
0224
0225 edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0226 arena_.execute([this, iNum]() {
0227 if (not beginJobCalled_) {
0228 beginJob();
0229 }
0230 if (not respondToOpenInputFileCalled_) {
0231 respondToOpenInputFile();
0232 }
0233 if (not beginProcessBlockCalled_) {
0234 beginProcessBlock();
0235 }
0236 if (not openOutputFilesCalled_) {
0237 openOutputFiles();
0238 }
0239
0240 if (not beginRunCalled_) {
0241 beginRun();
0242 }
0243 if (beginLumiCalled_) {
0244 endLuminosityBlock();
0245 assert(lumiNumber_ != iNum);
0246 }
0247 lumiNumber_ = iNum;
0248 beginLuminosityBlock();
0249 });
0250
0251 if (esHelper_) {
0252
0253 esHelper_->resetAllResolvers();
0254 }
0255 return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0256 }
0257
0258 edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0259
0260
0261 std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0262 arena_.execute([this, &lumi]() {
0263 if (not beginJobCalled_) {
0264 beginJob();
0265 }
0266 if (not respondToOpenInputFileCalled_) {
0267 respondToOpenInputFile();
0268 }
0269 if (not beginProcessBlockCalled_) {
0270 beginProcessBlock();
0271 }
0272 if (not openOutputFilesCalled_) {
0273 openOutputFiles();
0274 }
0275 if (not beginRunCalled_) {
0276 beginRun();
0277 }
0278 if (not beginLumiCalled_) {
0279 beginLuminosityBlock();
0280 }
0281 lumi = endLuminosityBlock();
0282 });
0283 if (esHelper_) {
0284
0285 esHelper_->resetAllResolvers();
0286 }
0287
0288 return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0289 }
0290
0291 edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0292 arena_.execute([this, iNum]() {
0293 if (not beginJobCalled_) {
0294 beginJob();
0295 }
0296 if (not respondToOpenInputFileCalled_) {
0297 respondToOpenInputFile();
0298 }
0299 if (not beginProcessBlockCalled_) {
0300 beginProcessBlock();
0301 }
0302 if (not openOutputFilesCalled_) {
0303 openOutputFiles();
0304 }
0305 if (beginRunCalled_) {
0306 assert(runNumber_ != iNum);
0307 endRun();
0308 }
0309 runNumber_ = iNum;
0310 beginRun();
0311 });
0312 if (esHelper_) {
0313
0314 esHelper_->resetAllResolvers();
0315 }
0316 return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0317 }
0318 edm::test::Run TestProcessor::testEndRunImpl() {
0319
0320
0321 std::shared_ptr<edm::RunPrincipal> rp;
0322 arena_.execute([this, &rp]() {
0323 if (not beginJobCalled_) {
0324 beginJob();
0325 }
0326 if (not respondToOpenInputFileCalled_) {
0327 respondToOpenInputFile();
0328 }
0329 if (not beginProcessBlockCalled_) {
0330 beginProcessBlock();
0331 }
0332 if (not openOutputFilesCalled_) {
0333 openOutputFiles();
0334 }
0335 if (not beginRunCalled_) {
0336 beginRun();
0337 }
0338 rp = endRun();
0339 });
0340 if (esHelper_) {
0341
0342 esHelper_->resetAllResolvers();
0343 }
0344
0345 return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0346 }
0347
0348 edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0349 arena_.execute([this]() {
0350 if (not beginJobCalled_) {
0351 beginJob();
0352 }
0353 beginProcessBlock();
0354 });
0355 return edm::test::ProcessBlock(
0356 &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0357 }
0358 edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0359 auto pbp = arena_.execute([this]() {
0360 if (not beginJobCalled_) {
0361 beginJob();
0362 }
0363 if (not beginProcessBlockCalled_) {
0364 beginProcessBlock();
0365 }
0366 return endProcessBlock();
0367 });
0368 return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0369 }
0370
0371 void TestProcessor::setupProcessing() {
0372 if (not beginJobCalled_) {
0373 beginJob();
0374 }
0375 if (not respondToOpenInputFileCalled_) {
0376 respondToOpenInputFile();
0377 }
0378 if (not beginProcessBlockCalled_) {
0379 beginProcessBlock();
0380 }
0381 if (not openOutputFilesCalled_) {
0382 openOutputFiles();
0383 }
0384 if (not beginRunCalled_) {
0385 beginRun();
0386 }
0387 if (not beginLumiCalled_) {
0388 beginLuminosityBlock();
0389 }
0390 }
0391
0392 void TestProcessor::teardownProcessing() {
0393 arena_.execute([this]() {
0394 if (beginLumiCalled_) {
0395 endLuminosityBlock();
0396 beginLumiCalled_ = false;
0397 }
0398 if (beginRunCalled_) {
0399 endRun();
0400 beginRunCalled_ = false;
0401 }
0402 if (respondToOpenInputFileCalled_) {
0403 respondToCloseInputFile();
0404 }
0405 if (beginProcessBlockCalled_) {
0406 endProcessBlock();
0407 beginProcessBlockCalled_ = false;
0408 }
0409 if (openOutputFilesCalled_) {
0410 closeOutputFiles();
0411 openOutputFilesCalled_ = false;
0412 }
0413 if (beginJobCalled_) {
0414 endJob();
0415 }
0416 edm::FinalWaitingTask task{taskGroup_};
0417 espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0418 task.waitNoThrow();
0419 });
0420 }
0421
0422 void TestProcessor::beginJob() {
0423 ServiceRegistry::Operate operate(serviceToken_);
0424
0425 service::SystemBounds bounds(preallocations_.numberOfStreams(),
0426 preallocations_.numberOfLuminosityBlocks(),
0427 preallocations_.numberOfRuns(),
0428 preallocations_.numberOfThreads());
0429 actReg_->preallocateSignal_(bounds);
0430 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0431
0432 espController_->finishConfiguration();
0433 actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0434
0435 schedule_->beginJob(
0436 *preg_, esp_->recordsToResolverIndices(), *processBlockHelper_, processContext_.processName());
0437
0438 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0439 schedule_->beginStream(i);
0440 }
0441 beginJobCalled_ = true;
0442 }
0443
0444 void TestProcessor::beginProcessBlock() {
0445 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0446 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0447
0448 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0449 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0450 processGlobalTransition<Traits>(transitionInfo);
0451
0452 beginProcessBlockCalled_ = true;
0453 }
0454
0455 void TestProcessor::openOutputFiles() {
0456
0457 ServiceRegistry::Operate operate(serviceToken_);
0458
0459 edm::FileBlock fb;
0460 schedule_->openOutputFiles(fb);
0461 openOutputFilesCalled_ = true;
0462 }
0463
0464 void TestProcessor::closeOutputFiles() {
0465 if (openOutputFilesCalled_) {
0466
0467 ServiceRegistry::Operate operate(serviceToken_);
0468 schedule_->closeOutputFiles();
0469
0470 openOutputFilesCalled_ = false;
0471 }
0472 }
0473
0474 void TestProcessor::respondToOpenInputFile() {
0475 respondToOpenInputFileCalled_ = true;
0476 edm::FileBlock fb;
0477
0478 ServiceRegistry::Operate operate(serviceToken_);
0479 schedule_->respondToOpenInputFile(fb);
0480 }
0481
0482 void TestProcessor::respondToCloseInputFile() {
0483 if (respondToOpenInputFileCalled_) {
0484 edm::FileBlock fb;
0485
0486 ServiceRegistry::Operate operate(serviceToken_);
0487
0488 schedule_->respondToCloseInputFile(fb);
0489 respondToOpenInputFileCalled_ = false;
0490 }
0491 }
0492
0493 void TestProcessor::beginRun() {
0494 runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0495 runPrincipal_->clearPrincipal();
0496 assert(runPrincipal_);
0497 edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
0498 aux.setProcessHistoryID(processHistory_.id());
0499 runPrincipal_->setAux(aux);
0500
0501 runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
0502
0503 IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0504 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0505
0506 auto const& es = esp_->eventSetupImpl();
0507
0508 RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0509 {
0510 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0511 processGlobalTransition<Traits>(transitionInfo);
0512 }
0513 {
0514 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0515 processTransitionForAllStreams<Traits>(transitionInfo);
0516 }
0517 beginRunCalled_ = true;
0518 }
0519
0520 void TestProcessor::beginLuminosityBlock() {
0521 LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0522 aux.setProcessHistoryID(processHistory_.id());
0523 lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0524 lumiPrincipal_->clearPrincipal();
0525 assert(lumiPrincipal_);
0526 lumiPrincipal_->setAux(aux);
0527
0528 lumiPrincipal_->setRunPrincipal(runPrincipal_);
0529 lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
0530
0531 IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0532 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0533
0534 auto const& es = esp_->eventSetupImpl();
0535
0536 LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0537
0538 {
0539 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0540 processGlobalTransition<Traits>(transitionInfo);
0541 }
0542 {
0543 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0544 processTransitionForAllStreams<Traits>(transitionInfo);
0545 }
0546 beginLumiCalled_ = true;
0547 }
0548
0549 void TestProcessor::event() {
0550 auto pep = &(principalCache_.eventPrincipal(0));
0551
0552
0553 pep->clearEventPrincipal();
0554 edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
0555 aux.setProcessHistoryID(processHistory_.id());
0556 pep->fillEventPrincipal(aux, nullptr, nullptr);
0557 assert(lumiPrincipal_.get() != nullptr);
0558 pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0559
0560 for (auto& p : dataProducts_) {
0561 if (p.second) {
0562 pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0563 } else {
0564
0565
0566 auto r = pep->getProductResolver(p.first.branchID());
0567 dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0568 }
0569 }
0570
0571 ServiceRegistry::Operate operate(serviceToken_);
0572
0573 FinalWaitingTask waitTask{taskGroup_};
0574
0575 EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0576 schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0577
0578 waitTask.wait();
0579 ++eventNumber_;
0580 }
0581
0582 std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0583 auto lumiPrincipal = lumiPrincipal_;
0584 if (beginLumiCalled_) {
0585
0586 ServiceRegistry::Operate operate(serviceToken_);
0587
0588 beginLumiCalled_ = false;
0589 lumiPrincipal_.reset();
0590
0591 IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0592 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0593
0594 auto const& es = esp_->eventSetupImpl();
0595
0596 LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0597
0598 {
0599 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0600 processTransitionForAllStreams<Traits>(transitionInfo);
0601 }
0602 {
0603 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0604 processGlobalTransition<Traits>(transitionInfo);
0605 }
0606 {
0607 FinalWaitingTask globalWaitTask{taskGroup_};
0608 schedule_->writeLumiAsync(
0609 WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
0610 globalWaitTask.wait();
0611 }
0612 }
0613 lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0614 return lumiPrincipal;
0615 }
0616
0617 std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0618 auto runPrincipal = runPrincipal_;
0619 runPrincipal_.reset();
0620 if (beginRunCalled_) {
0621 beginRunCalled_ = false;
0622
0623
0624 ServiceRegistry::Operate operate(serviceToken_);
0625
0626 IOVSyncValue ts(
0627 EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0628 runPrincipal->endTime());
0629 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0630
0631 auto const& es = esp_->eventSetupImpl();
0632
0633 RunTransitionInfo transitionInfo(*runPrincipal, es);
0634
0635 {
0636 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0637 processTransitionForAllStreams<Traits>(transitionInfo);
0638 }
0639 {
0640 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0641 processGlobalTransition<Traits>(transitionInfo);
0642 }
0643 {
0644 FinalWaitingTask globalWaitTask{taskGroup_};
0645 schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0646 *runPrincipal,
0647 &processContext_,
0648 actReg_.get(),
0649 runPrincipal->mergeableRunProductMetadata());
0650 globalWaitTask.wait();
0651 }
0652 }
0653 return runPrincipal;
0654 }
0655
0656 ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0657 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0658 if (beginProcessBlockCalled_) {
0659 beginProcessBlockCalled_ = false;
0660
0661 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0662 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0663 processGlobalTransition<Traits>(transitionInfo);
0664 }
0665 return &processBlockPrincipal;
0666 }
0667
0668 void TestProcessor::endJob() {
0669 if (!beginJobCalled_) {
0670 return;
0671 }
0672 beginJobCalled_ = false;
0673
0674
0675 ExceptionCollector c(
0676 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0677 std::mutex collectorMutex;
0678
0679
0680 ServiceRegistry::Operate operate(serviceToken_);
0681
0682
0683 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0684 schedule_->endStream(i, c, collectorMutex);
0685 }
0686 auto actReg = actReg_.get();
0687 c.call([actReg]() { actReg->preEndJobSignal_(); });
0688 schedule_->endJob(c);
0689 c.call([actReg]() { actReg->postEndJobSignal_(); });
0690 if (c.hasThrown()) {
0691 c.rethrow();
0692 }
0693 }
0694
0695 void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0696 if (beginRunCalled_) {
0697 endLuminosityBlock();
0698 endRun();
0699 }
0700 runNumber_ = iRun;
0701 }
0702 void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0703 endLuminosityBlock();
0704 lumiNumber_ = iLumi;
0705 }
0706
0707 void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0708
0709 template <typename Traits>
0710 void TestProcessor::processTransitionForAllStreams(typename Traits::TransitionInfoType& transitionInfo) {
0711 FinalWaitingTask finalWaitTask{taskGroup_};
0712 {
0713 WaitingTaskHolder holder(taskGroup_, &finalWaitTask);
0714
0715 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0716 schedule_->processOneStreamAsync<Traits>(holder, i, transitionInfo, serviceToken_);
0717 }
0718 }
0719 finalWaitTask.wait();
0720 }
0721
0722 template <typename Traits>
0723 void TestProcessor::processGlobalTransition(typename Traits::TransitionInfoType& transitionInfo) {
0724 FinalWaitingTask finalWaitTask{taskGroup_};
0725 schedule_->processOneGlobalAsync<Traits>(
0726 WaitingTaskHolder(taskGroup_, &finalWaitTask), transitionInfo, serviceToken_);
0727 finalWaitTask.wait();
0728 }
0729
0730 }
0731 }