File indexing completed on 2024-07-24 04:44:53
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Framework/interface/ScheduleItems.h"
0021 #include "FWCore/Framework/interface/EventPrincipal.h"
0022 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0023 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0024 #include "FWCore/Framework/interface/ExceptionActions.h"
0025 #include "FWCore/Framework/interface/HistoryAppender.h"
0026 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0029 #include "FWCore/Framework/interface/EventSetupsController.h"
0030 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0031 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 #include "FWCore/Framework/interface/FileBlock.h"
0038 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0039
0040 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0041 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0042
0043 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0044 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0045 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0046
0047 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0048
0049 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0050
0051 #include "oneTimeInitialization.h"
0052
0053 #define xstr(s) str(s)
0054 #define str(s) #s
0055
0056 namespace edm {
0057 namespace test {
0058
0059
0060
0061
0062 TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
0063 : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0064 arena_(1),
0065 historyAppender_(std::make_unique<HistoryAppender>()),
0066 moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0067
0068 (void)testprocessor::oneTimeInitialization();
0069
0070 ProcessDescImpl desc(iConfig.pythonConfiguration(), false);
0071
0072 auto psetPtr = desc.parameterSet();
0073 moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
0074 espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
0075
0076 validateTopLevelParameterSets(psetPtr.get());
0077
0078 ensureAvailableAccelerators(*psetPtr);
0079
0080 labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0081
0082 auto procDesc = desc.processDesc();
0083
0084 ScheduleItems items;
0085
0086
0087 auto& serviceSets = procDesc->getServicesPSets();
0088 ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0089 serviceToken_ = items.addCPRandTNS(*psetPtr, token);
0090
0091
0092 ServiceRegistry::Operate operate(serviceToken_);
0093
0094
0095 std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0096
0097
0098 esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0099
0100 auto nThreads = 1U;
0101 auto nStreams = 1U;
0102 auto nConcurrentLumis = 1U;
0103 auto nConcurrentRuns = 1U;
0104 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0105
0106 if (not iConfig.esProduceEntries().empty()) {
0107 esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0108 esp_->add(std::dynamic_pointer_cast<eventsetup::ESProductResolverProvider>(esHelper_));
0109 esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0110 }
0111
0112 preg_ = items.preg();
0113 processConfiguration_ = items.processConfiguration();
0114
0115 edm::ParameterSet emptyPSet;
0116 emptyPSet.registerIt();
0117 auto psetid = emptyPSet.id();
0118
0119 for (auto const& p : iConfig.extraProcesses()) {
0120 processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
0121 processHistoryRegistry_.registerProcessHistory(processHistory_);
0122 }
0123
0124
0125 for (auto const& produce : iConfig.produceEntries()) {
0126 auto processName = produce.processName_;
0127 if (processName.empty()) {
0128 processName = processConfiguration_->processName();
0129 }
0130 edm::TypeWithDict twd(produce.type_.typeInfo());
0131 edm::BranchDescription product(edm::InEvent,
0132 produce.moduleLabel_,
0133 processName,
0134 twd.userClassName(),
0135 twd.friendlyClassName(),
0136 produce.instanceLabel_,
0137 "",
0138 psetid,
0139 twd,
0140 true
0141 );
0142 product.init();
0143 dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0144 preg_->addProduct(product);
0145 }
0146
0147 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0148
0149 schedule_ = items.initSchedule(
0150 *psetPtr, false, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
0151
0152 act_table_ = std::move(items.act_table_);
0153 actReg_ = items.actReg_;
0154 branchIDListHelper_ = items.branchIDListHelper();
0155 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0156 processContext_.setProcessConfiguration(processConfiguration_.get());
0157
0158 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0159
0160 preg_->setFrozen();
0161 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0162
0163 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0164
0165 auto ep = std::make_shared<EventPrincipal>(preg_,
0166 branchIDListHelper_,
0167 thinnedAssociationsHelper_,
0168 *processConfiguration_,
0169 historyAppender_.get(),
0170 index);
0171 principalCache_.insert(std::move(ep));
0172 }
0173 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0174 auto rp = std::make_unique<RunPrincipal>(
0175 preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0176 principalCache_.insert(std::move(rp));
0177 }
0178 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0179 auto lp =
0180 std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
0181 principalCache_.insert(std::move(lp));
0182 }
0183 {
0184 auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
0185 principalCache_.insert(std::move(pb));
0186 }
0187 }
0188
0189 TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0190
0191
0192
0193
0194 void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0195 if (index >= dataProducts_.size()) {
0196 throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0197 "with a call to the function \'produces\' BEFORE passing the\n"
0198 "TestProcessor::Config object to the TestProcessor constructor";
0199 }
0200 dataProducts_[index].second = std::move(iWrapper);
0201 }
0202
0203 edm::test::Event TestProcessor::testImpl() {
0204 bool result = arena_.execute([this]() {
0205 setupProcessing();
0206 event();
0207
0208 return schedule_->totalEventsPassed() > 0;
0209 });
0210 schedule_->clearCounters();
0211 if (esHelper_) {
0212
0213 esHelper_->resetAllProxies();
0214 }
0215 return edm::test::Event(
0216 principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0217 }
0218
0219 edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0220 arena_.execute([this, iNum]() {
0221 if (not beginJobCalled_) {
0222 beginJob();
0223 }
0224 if (not respondToOpenInputFileCalled_) {
0225 respondToOpenInputFile();
0226 }
0227 if (not beginProcessBlockCalled_) {
0228 beginProcessBlock();
0229 }
0230 if (not openOutputFilesCalled_) {
0231 openOutputFiles();
0232 }
0233
0234 if (not beginRunCalled_) {
0235 beginRun();
0236 }
0237 if (beginLumiCalled_) {
0238 endLuminosityBlock();
0239 assert(lumiNumber_ != iNum);
0240 }
0241 lumiNumber_ = iNum;
0242 beginLuminosityBlock();
0243 });
0244
0245 if (esHelper_) {
0246
0247 esHelper_->resetAllProxies();
0248 }
0249 return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0250 }
0251
0252 edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0253
0254
0255 std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0256 arena_.execute([this, &lumi]() {
0257 if (not beginJobCalled_) {
0258 beginJob();
0259 }
0260 if (not respondToOpenInputFileCalled_) {
0261 respondToOpenInputFile();
0262 }
0263 if (not beginProcessBlockCalled_) {
0264 beginProcessBlock();
0265 }
0266 if (not openOutputFilesCalled_) {
0267 openOutputFiles();
0268 }
0269 if (not beginRunCalled_) {
0270 beginRun();
0271 }
0272 if (not beginLumiCalled_) {
0273 beginLuminosityBlock();
0274 }
0275 lumi = endLuminosityBlock();
0276 });
0277 if (esHelper_) {
0278
0279 esHelper_->resetAllProxies();
0280 }
0281
0282 return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0283 }
0284
0285 edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0286 arena_.execute([this, iNum]() {
0287 if (not beginJobCalled_) {
0288 beginJob();
0289 }
0290 if (not respondToOpenInputFileCalled_) {
0291 respondToOpenInputFile();
0292 }
0293 if (not beginProcessBlockCalled_) {
0294 beginProcessBlock();
0295 }
0296 if (not openOutputFilesCalled_) {
0297 openOutputFiles();
0298 }
0299 if (beginRunCalled_) {
0300 assert(runNumber_ != iNum);
0301 endRun();
0302 }
0303 runNumber_ = iNum;
0304 beginRun();
0305 });
0306 if (esHelper_) {
0307
0308 esHelper_->resetAllProxies();
0309 }
0310 return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0311 }
0312 edm::test::Run TestProcessor::testEndRunImpl() {
0313
0314
0315 std::shared_ptr<edm::RunPrincipal> rp;
0316 arena_.execute([this, &rp]() {
0317 if (not beginJobCalled_) {
0318 beginJob();
0319 }
0320 if (not respondToOpenInputFileCalled_) {
0321 respondToOpenInputFile();
0322 }
0323 if (not beginProcessBlockCalled_) {
0324 beginProcessBlock();
0325 }
0326 if (not openOutputFilesCalled_) {
0327 openOutputFiles();
0328 }
0329 if (not beginRunCalled_) {
0330 beginRun();
0331 }
0332 rp = endRun();
0333 });
0334 if (esHelper_) {
0335
0336 esHelper_->resetAllProxies();
0337 }
0338
0339 return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0340 }
0341
0342 edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0343 arena_.execute([this]() {
0344 if (not beginJobCalled_) {
0345 beginJob();
0346 }
0347 beginProcessBlock();
0348 });
0349 return edm::test::ProcessBlock(
0350 &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0351 }
0352 edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0353 auto pbp = arena_.execute([this]() {
0354 if (not beginJobCalled_) {
0355 beginJob();
0356 }
0357 if (not beginProcessBlockCalled_) {
0358 beginProcessBlock();
0359 }
0360 return endProcessBlock();
0361 });
0362 return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0363 }
0364
0365 void TestProcessor::setupProcessing() {
0366 if (not beginJobCalled_) {
0367 beginJob();
0368 }
0369 if (not respondToOpenInputFileCalled_) {
0370 respondToOpenInputFile();
0371 }
0372 if (not beginProcessBlockCalled_) {
0373 beginProcessBlock();
0374 }
0375 if (not openOutputFilesCalled_) {
0376 openOutputFiles();
0377 }
0378 if (not beginRunCalled_) {
0379 beginRun();
0380 }
0381 if (not beginLumiCalled_) {
0382 beginLuminosityBlock();
0383 }
0384 }
0385
0386 void TestProcessor::teardownProcessing() {
0387 arena_.execute([this]() {
0388 if (beginLumiCalled_) {
0389 endLuminosityBlock();
0390 beginLumiCalled_ = false;
0391 }
0392 if (beginRunCalled_) {
0393 endRun();
0394 beginRunCalled_ = false;
0395 }
0396 if (respondToOpenInputFileCalled_) {
0397 respondToCloseInputFile();
0398 }
0399 if (beginProcessBlockCalled_) {
0400 endProcessBlock();
0401 beginProcessBlockCalled_ = false;
0402 }
0403 if (openOutputFilesCalled_) {
0404 closeOutputFiles();
0405 openOutputFilesCalled_ = false;
0406 }
0407 if (beginJobCalled_) {
0408 endJob();
0409 }
0410 edm::FinalWaitingTask task{taskGroup_};
0411 espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0412 task.waitNoThrow();
0413 });
0414 }
0415
0416 void TestProcessor::beginJob() {
0417 ServiceRegistry::Operate operate(serviceToken_);
0418
0419 service::SystemBounds bounds(preallocations_.numberOfStreams(),
0420 preallocations_.numberOfLuminosityBlocks(),
0421 preallocations_.numberOfRuns(),
0422 preallocations_.numberOfThreads());
0423 actReg_->preallocateSignal_(bounds);
0424 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0425 PathsAndConsumesOfModules pathsAndConsumesOfModules;
0426
0427
0428
0429
0430
0431 espController_->finishConfiguration();
0432 actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0433
0434
0435 actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
0436
0437 schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
0438 actReg_->postBeginJobSignal_();
0439
0440 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0441 schedule_->beginStream(i);
0442 }
0443 beginJobCalled_ = true;
0444 }
0445
0446 void TestProcessor::beginProcessBlock() {
0447 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0448 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0449
0450 std::vector<edm::SubProcess> emptyList;
0451 {
0452 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0453 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0454 FinalWaitingTask globalWaitTask{taskGroup_};
0455 beginGlobalTransitionAsync<Traits>(
0456 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0457 globalWaitTask.wait();
0458 }
0459 beginProcessBlockCalled_ = true;
0460 }
0461
0462 void TestProcessor::openOutputFiles() {
0463
0464 ServiceRegistry::Operate operate(serviceToken_);
0465
0466 edm::FileBlock fb;
0467 schedule_->openOutputFiles(fb);
0468 openOutputFilesCalled_ = true;
0469 }
0470
0471 void TestProcessor::closeOutputFiles() {
0472 if (openOutputFilesCalled_) {
0473
0474 ServiceRegistry::Operate operate(serviceToken_);
0475 schedule_->closeOutputFiles();
0476
0477 openOutputFilesCalled_ = false;
0478 }
0479 }
0480
0481 void TestProcessor::respondToOpenInputFile() {
0482 respondToOpenInputFileCalled_ = true;
0483 edm::FileBlock fb;
0484
0485 ServiceRegistry::Operate operate(serviceToken_);
0486 schedule_->respondToOpenInputFile(fb);
0487 }
0488
0489 void TestProcessor::respondToCloseInputFile() {
0490 if (respondToOpenInputFileCalled_) {
0491 edm::FileBlock fb;
0492
0493 ServiceRegistry::Operate operate(serviceToken_);
0494
0495 schedule_->respondToCloseInputFile(fb);
0496 respondToOpenInputFileCalled_ = false;
0497 }
0498 }
0499
0500 void TestProcessor::beginRun() {
0501 runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0502 runPrincipal_->clearPrincipal();
0503 assert(runPrincipal_);
0504 edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
0505 aux.setProcessHistoryID(processHistory_.id());
0506 runPrincipal_->setAux(aux);
0507
0508 runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
0509
0510 IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0511 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0512
0513 auto const& es = esp_->eventSetupImpl();
0514
0515 RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0516
0517 std::vector<edm::SubProcess> emptyList;
0518 {
0519 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0520 FinalWaitingTask globalWaitTask{taskGroup_};
0521 beginGlobalTransitionAsync<Traits>(
0522 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0523 globalWaitTask.wait();
0524 }
0525 {
0526
0527 FinalWaitingTask streamLoopWaitTask{taskGroup_};
0528
0529 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0530 beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0531 *schedule_,
0532 preallocations_.numberOfStreams(),
0533 transitionInfo,
0534 serviceToken_,
0535 emptyList);
0536 streamLoopWaitTask.wait();
0537 }
0538 beginRunCalled_ = true;
0539 }
0540
0541 void TestProcessor::beginLuminosityBlock() {
0542 LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0543 aux.setProcessHistoryID(processHistory_.id());
0544 lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0545 lumiPrincipal_->clearPrincipal();
0546 assert(lumiPrincipal_);
0547 lumiPrincipal_->setAux(aux);
0548
0549 lumiPrincipal_->setRunPrincipal(runPrincipal_);
0550 lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
0551
0552 IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0553 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0554
0555 auto const& es = esp_->eventSetupImpl();
0556
0557 LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0558
0559 std::vector<edm::SubProcess> emptyList;
0560 {
0561 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0562 FinalWaitingTask globalWaitTask{taskGroup_};
0563 beginGlobalTransitionAsync<Traits>(
0564 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0565 globalWaitTask.wait();
0566 }
0567 {
0568
0569 FinalWaitingTask streamLoopWaitTask{taskGroup_};
0570
0571 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0572
0573 beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0574 *schedule_,
0575 preallocations_.numberOfStreams(),
0576 transitionInfo,
0577 serviceToken_,
0578 emptyList);
0579
0580 streamLoopWaitTask.wait();
0581 }
0582 beginLumiCalled_ = true;
0583 }
0584
0585 void TestProcessor::event() {
0586 auto pep = &(principalCache_.eventPrincipal(0));
0587
0588
0589 pep->clearEventPrincipal();
0590 edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
0591 aux.setProcessHistoryID(processHistory_.id());
0592 pep->fillEventPrincipal(aux, nullptr, nullptr);
0593 assert(lumiPrincipal_.get() != nullptr);
0594 pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0595
0596 for (auto& p : dataProducts_) {
0597 if (p.second) {
0598 pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0599 } else {
0600
0601
0602 auto r = pep->getProductResolver(p.first.branchID());
0603 dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0604 }
0605 }
0606
0607 ServiceRegistry::Operate operate(serviceToken_);
0608
0609 FinalWaitingTask waitTask{taskGroup_};
0610
0611 EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0612 schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0613
0614 waitTask.wait();
0615 ++eventNumber_;
0616 }
0617
0618 std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0619 auto lumiPrincipal = lumiPrincipal_;
0620 if (beginLumiCalled_) {
0621
0622 ServiceRegistry::Operate operate(serviceToken_);
0623
0624 beginLumiCalled_ = false;
0625 lumiPrincipal_.reset();
0626
0627 IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0628 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0629
0630 auto const& es = esp_->eventSetupImpl();
0631
0632 LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0633
0634 std::vector<edm::SubProcess> emptyList;
0635
0636
0637 {
0638 FinalWaitingTask streamLoopWaitTask{taskGroup_};
0639
0640 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0641
0642 endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0643 *schedule_,
0644 preallocations_.numberOfStreams(),
0645 transitionInfo,
0646 serviceToken_,
0647 emptyList,
0648 false);
0649
0650 streamLoopWaitTask.wait();
0651 }
0652 {
0653 FinalWaitingTask globalWaitTask{taskGroup_};
0654
0655 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0656 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0657 *schedule_,
0658 transitionInfo,
0659 serviceToken_,
0660 emptyList,
0661 false);
0662 globalWaitTask.wait();
0663 }
0664 {
0665 FinalWaitingTask globalWaitTask{taskGroup_};
0666 schedule_->writeLumiAsync(
0667 WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
0668 globalWaitTask.wait();
0669 }
0670 }
0671 lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0672 return lumiPrincipal;
0673 }
0674
0675 std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0676 auto runPrincipal = runPrincipal_;
0677 runPrincipal_.reset();
0678 if (beginRunCalled_) {
0679 beginRunCalled_ = false;
0680
0681
0682 ServiceRegistry::Operate operate(serviceToken_);
0683
0684 IOVSyncValue ts(
0685 EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0686 runPrincipal->endTime());
0687 eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0688
0689 auto const& es = esp_->eventSetupImpl();
0690
0691 RunTransitionInfo transitionInfo(*runPrincipal, es);
0692
0693 std::vector<edm::SubProcess> emptyList;
0694
0695
0696 {
0697 FinalWaitingTask streamLoopWaitTask{taskGroup_};
0698
0699 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0700
0701 endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0702 *schedule_,
0703 preallocations_.numberOfStreams(),
0704 transitionInfo,
0705 serviceToken_,
0706 emptyList,
0707 false);
0708
0709 streamLoopWaitTask.wait();
0710 }
0711 {
0712 FinalWaitingTask globalWaitTask{taskGroup_};
0713
0714 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0715 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0716 *schedule_,
0717 transitionInfo,
0718 serviceToken_,
0719 emptyList,
0720 false);
0721 globalWaitTask.wait();
0722 }
0723 {
0724 FinalWaitingTask globalWaitTask{taskGroup_};
0725 schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0726 *runPrincipal,
0727 &processContext_,
0728 actReg_.get(),
0729 runPrincipal->mergeableRunProductMetadata());
0730 globalWaitTask.wait();
0731 }
0732 }
0733 return runPrincipal;
0734 }
0735
0736 ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0737 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0738 if (beginProcessBlockCalled_) {
0739 beginProcessBlockCalled_ = false;
0740
0741 std::vector<edm::SubProcess> emptyList;
0742 {
0743 FinalWaitingTask globalWaitTask{taskGroup_};
0744
0745 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0746 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0747 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0748 *schedule_,
0749 transitionInfo,
0750 serviceToken_,
0751 emptyList,
0752 false);
0753 globalWaitTask.wait();
0754 }
0755 }
0756 return &processBlockPrincipal;
0757 }
0758
0759 void TestProcessor::endJob() {
0760 if (!beginJobCalled_) {
0761 return;
0762 }
0763 beginJobCalled_ = false;
0764
0765
0766 ExceptionCollector c(
0767 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0768
0769
0770 ServiceRegistry::Operate operate(serviceToken_);
0771
0772
0773 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0774 c.call([this, i]() { this->schedule_->endStream(i); });
0775 }
0776 auto actReg = actReg_.get();
0777 c.call([actReg]() { actReg->preEndJobSignal_(); });
0778 schedule_->endJob(c);
0779 c.call([actReg]() { actReg->postEndJobSignal_(); });
0780 if (c.hasThrown()) {
0781 c.rethrow();
0782 }
0783 }
0784
0785 void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0786 if (beginRunCalled_) {
0787 endLuminosityBlock();
0788 endRun();
0789 }
0790 runNumber_ = iRun;
0791 }
0792 void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0793 endLuminosityBlock();
0794 lumiNumber_ = iLumi;
0795 }
0796
0797 void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0798
0799 }
0800 }