File indexing completed on 2024-10-07 04:59:34
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Framework/interface/ScheduleItems.h"
0021 #include "FWCore/Framework/interface/EventPrincipal.h"
0022 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0023 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0024 #include "FWCore/Framework/interface/ExceptionActions.h"
0025 #include "FWCore/Framework/interface/HistoryAppender.h"
0026 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0029 #include "FWCore/Framework/interface/EventSetupsController.h"
0030 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0031 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 #include "FWCore/Framework/interface/FileBlock.h"
0038 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0039
0040 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0041 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0042
0043 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0044 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0045 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0046
0047 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0048
0049 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0050
0051 #include "oneTimeInitialization.h"
0052
0053 #include <mutex>
0054
// Two-step stringification helpers: xstr() lets its argument be macro-expanded
// first, then str() turns the resulting tokens into a string literal.
#define xstr(token) str(token)
#define str(token) #token
0057
0058 namespace edm {
0059 namespace test {
0060
0061
0062
0063
0064 TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
0065 : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0066 arena_(1),
0067 historyAppender_(std::make_unique<HistoryAppender>()),
0068 moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0069
0070 (void)testprocessor::oneTimeInitialization();
0071
0072 ProcessDescImpl desc(iConfig.pythonConfiguration(), false);
0073
0074 auto psetPtr = desc.parameterSet();
0075 moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
0076 espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
0077
0078 validateTopLevelParameterSets(psetPtr.get());
0079
0080 ensureAvailableAccelerators(*psetPtr);
0081
0082 labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0083
0084 auto procDesc = desc.processDesc();
0085
0086 ScheduleItems items;
0087
0088
0089 auto& serviceSets = procDesc->getServicesPSets();
0090 ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0091 serviceToken_ = items.addCPRandTNS(*psetPtr, token);
0092
0093
0094 ServiceRegistry::Operate operate(serviceToken_);
0095
0096
0097 std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0098
0099
0100 esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0101
0102 auto nThreads = 1U;
0103 auto nStreams = 1U;
0104 auto nConcurrentLumis = 1U;
0105 auto nConcurrentRuns = 1U;
0106 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0107
0108 if (not iConfig.esProduceEntries().empty()) {
0109 esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0110 esp_->add(std::dynamic_pointer_cast<eventsetup::ESProductResolverProvider>(esHelper_));
0111 esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0112 }
0113
0114 preg_ = items.preg();
0115 processConfiguration_ = items.processConfiguration();
0116
0117 edm::ParameterSet emptyPSet;
0118 emptyPSet.registerIt();
0119 auto psetid = emptyPSet.id();
0120
0121 for (auto const& p : iConfig.extraProcesses()) {
0122 processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
0123 processHistoryRegistry_.registerProcessHistory(processHistory_);
0124 }
0125
0126
0127 for (auto const& produce : iConfig.produceEntries()) {
0128 auto processName = produce.processName_;
0129 if (processName.empty()) {
0130 processName = processConfiguration_->processName();
0131 }
0132 edm::TypeWithDict twd(produce.type_.typeInfo());
0133 edm::BranchDescription product(edm::InEvent,
0134 produce.moduleLabel_,
0135 processName,
0136 twd.userClassName(),
0137 twd.friendlyClassName(),
0138 produce.instanceLabel_,
0139 "",
0140 psetid,
0141 twd,
0142 true
0143 );
0144 product.init();
0145 dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0146 preg_->addProduct(product);
0147 }
0148
0149 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0150
0151 schedule_ = items.initSchedule(
0152 *psetPtr, false, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
0153
0154 act_table_ = std::move(items.act_table_);
0155 actReg_ = items.actReg_;
0156 branchIDListHelper_ = items.branchIDListHelper();
0157 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0158 processContext_.setProcessConfiguration(processConfiguration_.get());
0159
0160 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0161
0162 preg_->setFrozen();
0163 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0164
0165 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0166
0167 auto ep = std::make_shared<EventPrincipal>(preg_,
0168 branchIDListHelper_,
0169 thinnedAssociationsHelper_,
0170 *processConfiguration_,
0171 historyAppender_.get(),
0172 index);
0173 principalCache_.insert(std::move(ep));
0174 }
0175 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0176 auto rp = std::make_unique<RunPrincipal>(
0177 preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0178 principalCache_.insert(std::move(rp));
0179 }
0180 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0181 auto lp =
0182 std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
0183 principalCache_.insert(std::move(lp));
0184 }
0185 {
0186 auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
0187 principalCache_.insert(std::move(pb));
0188 }
0189 }
0190
0191 TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0192
0193
0194
0195
0196 void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0197 if (index >= dataProducts_.size()) {
0198 throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0199 "with a call to the function \'produces\' BEFORE passing the\n"
0200 "TestProcessor::Config object to the TestProcessor constructor";
0201 }
0202 dataProducts_[index].second = std::move(iWrapper);
0203 }
0204
0205 edm::test::Event TestProcessor::testImpl() {
0206 bool result = arena_.execute([this]() {
0207 setupProcessing();
0208 event();
0209
0210 return schedule_->totalEventsPassed() > 0;
0211 });
0212 schedule_->clearCounters();
0213 if (esHelper_) {
0214
0215 esHelper_->resetAllResolvers();
0216 }
0217 return edm::test::Event(
0218 principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0219 }
0220
0221 edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0222 arena_.execute([this, iNum]() {
0223 if (not beginJobCalled_) {
0224 beginJob();
0225 }
0226 if (not respondToOpenInputFileCalled_) {
0227 respondToOpenInputFile();
0228 }
0229 if (not beginProcessBlockCalled_) {
0230 beginProcessBlock();
0231 }
0232 if (not openOutputFilesCalled_) {
0233 openOutputFiles();
0234 }
0235
0236 if (not beginRunCalled_) {
0237 beginRun();
0238 }
0239 if (beginLumiCalled_) {
0240 endLuminosityBlock();
0241 assert(lumiNumber_ != iNum);
0242 }
0243 lumiNumber_ = iNum;
0244 beginLuminosityBlock();
0245 });
0246
0247 if (esHelper_) {
0248
0249 esHelper_->resetAllResolvers();
0250 }
0251 return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0252 }
0253
0254 edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0255
0256
0257 std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0258 arena_.execute([this, &lumi]() {
0259 if (not beginJobCalled_) {
0260 beginJob();
0261 }
0262 if (not respondToOpenInputFileCalled_) {
0263 respondToOpenInputFile();
0264 }
0265 if (not beginProcessBlockCalled_) {
0266 beginProcessBlock();
0267 }
0268 if (not openOutputFilesCalled_) {
0269 openOutputFiles();
0270 }
0271 if (not beginRunCalled_) {
0272 beginRun();
0273 }
0274 if (not beginLumiCalled_) {
0275 beginLuminosityBlock();
0276 }
0277 lumi = endLuminosityBlock();
0278 });
0279 if (esHelper_) {
0280
0281 esHelper_->resetAllResolvers();
0282 }
0283
0284 return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0285 }
0286
0287 edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0288 arena_.execute([this, iNum]() {
0289 if (not beginJobCalled_) {
0290 beginJob();
0291 }
0292 if (not respondToOpenInputFileCalled_) {
0293 respondToOpenInputFile();
0294 }
0295 if (not beginProcessBlockCalled_) {
0296 beginProcessBlock();
0297 }
0298 if (not openOutputFilesCalled_) {
0299 openOutputFiles();
0300 }
0301 if (beginRunCalled_) {
0302 assert(runNumber_ != iNum);
0303 endRun();
0304 }
0305 runNumber_ = iNum;
0306 beginRun();
0307 });
0308 if (esHelper_) {
0309
0310 esHelper_->resetAllResolvers();
0311 }
0312 return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0313 }
0314 edm::test::Run TestProcessor::testEndRunImpl() {
0315
0316
0317 std::shared_ptr<edm::RunPrincipal> rp;
0318 arena_.execute([this, &rp]() {
0319 if (not beginJobCalled_) {
0320 beginJob();
0321 }
0322 if (not respondToOpenInputFileCalled_) {
0323 respondToOpenInputFile();
0324 }
0325 if (not beginProcessBlockCalled_) {
0326 beginProcessBlock();
0327 }
0328 if (not openOutputFilesCalled_) {
0329 openOutputFiles();
0330 }
0331 if (not beginRunCalled_) {
0332 beginRun();
0333 }
0334 rp = endRun();
0335 });
0336 if (esHelper_) {
0337
0338 esHelper_->resetAllResolvers();
0339 }
0340
0341 return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0342 }
0343
0344 edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0345 arena_.execute([this]() {
0346 if (not beginJobCalled_) {
0347 beginJob();
0348 }
0349 beginProcessBlock();
0350 });
0351 return edm::test::ProcessBlock(
0352 &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0353 }
0354 edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0355 auto pbp = arena_.execute([this]() {
0356 if (not beginJobCalled_) {
0357 beginJob();
0358 }
0359 if (not beginProcessBlockCalled_) {
0360 beginProcessBlock();
0361 }
0362 return endProcessBlock();
0363 });
0364 return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0365 }
0366
0367 void TestProcessor::setupProcessing() {
0368 if (not beginJobCalled_) {
0369 beginJob();
0370 }
0371 if (not respondToOpenInputFileCalled_) {
0372 respondToOpenInputFile();
0373 }
0374 if (not beginProcessBlockCalled_) {
0375 beginProcessBlock();
0376 }
0377 if (not openOutputFilesCalled_) {
0378 openOutputFiles();
0379 }
0380 if (not beginRunCalled_) {
0381 beginRun();
0382 }
0383 if (not beginLumiCalled_) {
0384 beginLuminosityBlock();
0385 }
0386 }
0387
0388 void TestProcessor::teardownProcessing() {
0389 arena_.execute([this]() {
0390 if (beginLumiCalled_) {
0391 endLuminosityBlock();
0392 beginLumiCalled_ = false;
0393 }
0394 if (beginRunCalled_) {
0395 endRun();
0396 beginRunCalled_ = false;
0397 }
0398 if (respondToOpenInputFileCalled_) {
0399 respondToCloseInputFile();
0400 }
0401 if (beginProcessBlockCalled_) {
0402 endProcessBlock();
0403 beginProcessBlockCalled_ = false;
0404 }
0405 if (openOutputFilesCalled_) {
0406 closeOutputFiles();
0407 openOutputFilesCalled_ = false;
0408 }
0409 if (beginJobCalled_) {
0410 endJob();
0411 }
0412 edm::FinalWaitingTask task{taskGroup_};
0413 espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0414 task.waitNoThrow();
0415 });
0416 }
0417
0418 void TestProcessor::beginJob() {
0419 ServiceRegistry::Operate operate(serviceToken_);
0420
0421 service::SystemBounds bounds(preallocations_.numberOfStreams(),
0422 preallocations_.numberOfLuminosityBlocks(),
0423 preallocations_.numberOfRuns(),
0424 preallocations_.numberOfThreads());
0425 actReg_->preallocateSignal_(bounds);
0426 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0427 PathsAndConsumesOfModules pathsAndConsumesOfModules;
0428
0429
0430
0431
0432
0433 espController_->finishConfiguration();
0434 actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0435
0436
0437
0438 schedule_->beginJob(
0439 *preg_, esp_->recordsToResolverIndices(), *processBlockHelper_, pathsAndConsumesOfModules, processContext_);
0440
0441 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0442 schedule_->beginStream(i);
0443 }
0444 beginJobCalled_ = true;
0445 }
0446
0447 void TestProcessor::beginProcessBlock() {
0448 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0449 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0450
0451 std::vector<edm::SubProcess> emptyList;
0452 {
0453 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0454 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0455 FinalWaitingTask globalWaitTask{taskGroup_};
0456 beginGlobalTransitionAsync<Traits>(
0457 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0458 globalWaitTask.wait();
0459 }
0460 beginProcessBlockCalled_ = true;
0461 }
0462
0463 void TestProcessor::openOutputFiles() {
0464
0465 ServiceRegistry::Operate operate(serviceToken_);
0466
0467 edm::FileBlock fb;
0468 schedule_->openOutputFiles(fb);
0469 openOutputFilesCalled_ = true;
0470 }
0471
0472 void TestProcessor::closeOutputFiles() {
0473 if (openOutputFilesCalled_) {
0474
0475 ServiceRegistry::Operate operate(serviceToken_);
0476 schedule_->closeOutputFiles();
0477
0478 openOutputFilesCalled_ = false;
0479 }
0480 }
0481
0482 void TestProcessor::respondToOpenInputFile() {
0483 respondToOpenInputFileCalled_ = true;
0484 edm::FileBlock fb;
0485
0486 ServiceRegistry::Operate operate(serviceToken_);
0487 schedule_->respondToOpenInputFile(fb);
0488 }
0489
0490 void TestProcessor::respondToCloseInputFile() {
0491 if (respondToOpenInputFileCalled_) {
0492 edm::FileBlock fb;
0493
0494 ServiceRegistry::Operate operate(serviceToken_);
0495
0496 schedule_->respondToCloseInputFile(fb);
0497 respondToOpenInputFileCalled_ = false;
0498 }
0499 }
0500
0501 void TestProcessor::beginRun() {
0502 runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0503 runPrincipal_->clearPrincipal();
0504 assert(runPrincipal_);
0505 edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
0506 aux.setProcessHistoryID(processHistory_.id());
0507 runPrincipal_->setAux(aux);
0508
0509 runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
0510
0511 IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0512 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0513
0514 auto const& es = esp_->eventSetupImpl();
0515
0516 RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0517
0518 std::vector<edm::SubProcess> emptyList;
0519 {
0520 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0521 FinalWaitingTask globalWaitTask{taskGroup_};
0522 beginGlobalTransitionAsync<Traits>(
0523 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0524 globalWaitTask.wait();
0525 }
0526 {
0527
0528 FinalWaitingTask streamLoopWaitTask{taskGroup_};
0529
0530 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0531 beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0532 *schedule_,
0533 preallocations_.numberOfStreams(),
0534 transitionInfo,
0535 serviceToken_,
0536 emptyList);
0537 streamLoopWaitTask.wait();
0538 }
0539 beginRunCalled_ = true;
0540 }
0541
0542 void TestProcessor::beginLuminosityBlock() {
0543 LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0544 aux.setProcessHistoryID(processHistory_.id());
0545 lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0546 lumiPrincipal_->clearPrincipal();
0547 assert(lumiPrincipal_);
0548 lumiPrincipal_->setAux(aux);
0549
0550 lumiPrincipal_->setRunPrincipal(runPrincipal_);
0551 lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
0552
0553 IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0554 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0555
0556 auto const& es = esp_->eventSetupImpl();
0557
0558 LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0559
0560 std::vector<edm::SubProcess> emptyList;
0561 {
0562 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0563 FinalWaitingTask globalWaitTask{taskGroup_};
0564 beginGlobalTransitionAsync<Traits>(
0565 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0566 globalWaitTask.wait();
0567 }
0568 {
0569
0570 FinalWaitingTask streamLoopWaitTask{taskGroup_};
0571
0572 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0573
0574 beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0575 *schedule_,
0576 preallocations_.numberOfStreams(),
0577 transitionInfo,
0578 serviceToken_,
0579 emptyList);
0580
0581 streamLoopWaitTask.wait();
0582 }
0583 beginLumiCalled_ = true;
0584 }
0585
0586 void TestProcessor::event() {
0587 auto pep = &(principalCache_.eventPrincipal(0));
0588
0589
0590 pep->clearEventPrincipal();
0591 edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
0592 aux.setProcessHistoryID(processHistory_.id());
0593 pep->fillEventPrincipal(aux, nullptr, nullptr);
0594 assert(lumiPrincipal_.get() != nullptr);
0595 pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0596
0597 for (auto& p : dataProducts_) {
0598 if (p.second) {
0599 pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0600 } else {
0601
0602
0603 auto r = pep->getProductResolver(p.first.branchID());
0604 dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0605 }
0606 }
0607
0608 ServiceRegistry::Operate operate(serviceToken_);
0609
0610 FinalWaitingTask waitTask{taskGroup_};
0611
0612 EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0613 schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0614
0615 waitTask.wait();
0616 ++eventNumber_;
0617 }
0618
0619 std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0620 auto lumiPrincipal = lumiPrincipal_;
0621 if (beginLumiCalled_) {
0622
0623 ServiceRegistry::Operate operate(serviceToken_);
0624
0625 beginLumiCalled_ = false;
0626 lumiPrincipal_.reset();
0627
0628 IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0629 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0630
0631 auto const& es = esp_->eventSetupImpl();
0632
0633 LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0634
0635 std::vector<edm::SubProcess> emptyList;
0636
0637
0638 {
0639 FinalWaitingTask streamLoopWaitTask{taskGroup_};
0640
0641 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0642
0643 endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0644 *schedule_,
0645 preallocations_.numberOfStreams(),
0646 transitionInfo,
0647 serviceToken_,
0648 emptyList,
0649 false);
0650
0651 streamLoopWaitTask.wait();
0652 }
0653 {
0654 FinalWaitingTask globalWaitTask{taskGroup_};
0655
0656 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0657 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0658 *schedule_,
0659 transitionInfo,
0660 serviceToken_,
0661 emptyList,
0662 false);
0663 globalWaitTask.wait();
0664 }
0665 {
0666 FinalWaitingTask globalWaitTask{taskGroup_};
0667 schedule_->writeLumiAsync(
0668 WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
0669 globalWaitTask.wait();
0670 }
0671 }
0672 lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0673 return lumiPrincipal;
0674 }
0675
0676 std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0677 auto runPrincipal = runPrincipal_;
0678 runPrincipal_.reset();
0679 if (beginRunCalled_) {
0680 beginRunCalled_ = false;
0681
0682
0683 ServiceRegistry::Operate operate(serviceToken_);
0684
0685 IOVSyncValue ts(
0686 EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0687 runPrincipal->endTime());
0688 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0689
0690 auto const& es = esp_->eventSetupImpl();
0691
0692 RunTransitionInfo transitionInfo(*runPrincipal, es);
0693
0694 std::vector<edm::SubProcess> emptyList;
0695
0696
0697 {
0698 FinalWaitingTask streamLoopWaitTask{taskGroup_};
0699
0700 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0701
0702 endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0703 *schedule_,
0704 preallocations_.numberOfStreams(),
0705 transitionInfo,
0706 serviceToken_,
0707 emptyList,
0708 false);
0709
0710 streamLoopWaitTask.wait();
0711 }
0712 {
0713 FinalWaitingTask globalWaitTask{taskGroup_};
0714
0715 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0716 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0717 *schedule_,
0718 transitionInfo,
0719 serviceToken_,
0720 emptyList,
0721 false);
0722 globalWaitTask.wait();
0723 }
0724 {
0725 FinalWaitingTask globalWaitTask{taskGroup_};
0726 schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0727 *runPrincipal,
0728 &processContext_,
0729 actReg_.get(),
0730 runPrincipal->mergeableRunProductMetadata());
0731 globalWaitTask.wait();
0732 }
0733 }
0734 return runPrincipal;
0735 }
0736
0737 ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0738 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0739 if (beginProcessBlockCalled_) {
0740 beginProcessBlockCalled_ = false;
0741
0742 std::vector<edm::SubProcess> emptyList;
0743 {
0744 FinalWaitingTask globalWaitTask{taskGroup_};
0745
0746 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0747 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0748 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0749 *schedule_,
0750 transitionInfo,
0751 serviceToken_,
0752 emptyList,
0753 false);
0754 globalWaitTask.wait();
0755 }
0756 }
0757 return &processBlockPrincipal;
0758 }
0759
0760 void TestProcessor::endJob() {
0761 if (!beginJobCalled_) {
0762 return;
0763 }
0764 beginJobCalled_ = false;
0765
0766
0767 ExceptionCollector c(
0768 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0769 std::mutex collectorMutex;
0770
0771
0772 ServiceRegistry::Operate operate(serviceToken_);
0773
0774
0775 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0776 schedule_->endStream(i, c, collectorMutex);
0777 }
0778 auto actReg = actReg_.get();
0779 c.call([actReg]() { actReg->preEndJobSignal_(); });
0780 schedule_->endJob(c);
0781 c.call([actReg]() { actReg->postEndJobSignal_(); });
0782 if (c.hasThrown()) {
0783 c.rethrow();
0784 }
0785 }
0786
0787 void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0788 if (beginRunCalled_) {
0789 endLuminosityBlock();
0790 endRun();
0791 }
0792 runNumber_ = iRun;
0793 }
0794 void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0795 endLuminosityBlock();
0796 lumiNumber_ = iLumi;
0797 }
0798
0799 void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0800
0801 }
0802 }