File indexing completed on 2025-06-29 22:58:03
0001 #include "FWCore/Framework/interface/EventProcessor.h"
0002 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0003 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0004 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0005 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007
0008 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0009 #include "FWCore/Framework/src/CommonParams.h"
0010 #include "FWCore/Framework/interface/EDLooperBase.h"
0011 #include "FWCore/Framework/interface/EventPrincipal.h"
0012 #include "FWCore/Framework/interface/EventSetupProvider.h"
0013 #include "FWCore/Framework/interface/EventSetupRecord.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/Framework/interface/HistoryAppender.h"
0016 #include "FWCore/Framework/interface/InputSourceDescription.h"
0017 #include "FWCore/Framework/interface/IOVSyncValue.h"
0018 #include "FWCore/Framework/interface/LooperFactory.h"
0019 #include "FWCore/Framework/interface/LuminosityBlock.h"
0020 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0021 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0022 #include "FWCore/Framework/interface/ModuleChanger.h"
0023 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0024 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0025 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ProcessingController.h"
0028 #include "FWCore/Framework/interface/RunPrincipal.h"
0029 #include "FWCore/Framework/interface/Schedule.h"
0030 #include "FWCore/Framework/interface/ScheduleInfo.h"
0031 #include "FWCore/Framework/interface/ScheduleItems.h"
0032 #include "FWCore/Framework/interface/Event.h"
0033 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0034 #include "FWCore/Framework/src/Breakpoints.h"
0035 #include "FWCore/Framework/interface/EventSetupsController.h"
0036 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0037 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0038 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0039 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0040 #include "FWCore/Framework/interface/TriggerNamesService.h"
0041 #include "FWCore/Framework/src/SendSourceTerminationSignalIfException.h"
0042 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0043
0044 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0045
0046 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0047 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0048 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0049 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0050 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0051 #include "FWCore/ParameterSet/interface/Registry.h"
0052 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0053
0054 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0055 #include "FWCore/AbstractServices/interface/RootHandlers.h"
0056
0057 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0058 #include "FWCore/ServiceRegistry/interface/Service.h"
0059 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0060 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0061
0062 #include "FWCore/Concurrency/interface/WaitingTask.h"
0063 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0064 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0065 #include "FWCore/Concurrency/interface/chain_first.h"
0066
0067 #include "FWCore/Utilities/interface/Algorithms.h"
0068 #include "FWCore/Utilities/interface/DebugMacros.h"
0069 #include "FWCore/Utilities/interface/EDMException.h"
0070 #include "FWCore/Utilities/interface/Exception.h"
0071 #include "FWCore/Utilities/interface/ConvertException.h"
0072 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0073 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0074 #include "FWCore/Utilities/interface/StreamID.h"
0075 #include "FWCore/Utilities/interface/propagate_const.h"
0076 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0077
0078 #include "MessageForSource.h"
0079 #include "MessageForParent.h"
0080 #include "LuminosityBlockProcessingStatus.h"
0081 #include "RunProcessingStatus.h"
0082
0083 #include "boost/range/adaptor/reversed.hpp"
0084
0085 #include <cassert>
0086 #include <exception>
0087 #include <iomanip>
0088 #include <iostream>
0089 #include <utility>
0090 #include <sstream>
0091
0092 #include <sys/ipc.h>
0093 #include <sys/msg.h>
0094
0095 #include "oneapi/tbb/task.h"
0096 #include "oneapi/tbb/task_arena.h"
0097
0098
0099 #ifndef __APPLE__
0100 #include <sched.h>
0101 #endif
0102
0103 namespace {
0104 class PauseQueueSentry {
0105 public:
0106 PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
0107 ~PauseQueueSentry() { queue_.resume(); }
0108
0109 private:
0110 edm::SerialTaskQueue& queue_;
0111 };
0112 }
0113
0114 namespace edm {
0115
0116 namespace chain = waiting_task::chain;
0117
0118
0119 std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0120 ParameterSet& params,
0121 CommonParams const& common,
0122 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0123 std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0124 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0125 std::shared_ptr<ActivityRegistry> areg,
0126 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0127 PreallocationConfiguration const& allocations) {
0128 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0129 if (main_input == nullptr) {
0130 throw Exception(errors::Configuration)
0131 << "There must be exactly one source in the configuration.\n"
0132 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0133 }
0134
0135 std::string modtype(main_input->getParameter<std::string>("@module_type"));
0136
0137 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0138 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0139 ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0140 filler->fill(descriptions);
0141
0142 try {
0143 convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
0144 } catch (cms::Exception& iException) {
0145 std::ostringstream ost;
0146 ost << "Validating configuration of input source of type " << modtype;
0147 iException.addContext(ost.str());
0148 throw;
0149 }
0150
0151 main_input->registerIt();
0152
0153
0154
0155
0156
0157
0158
0159 ModuleDescription md(main_input->id(),
0160 main_input->getParameter<std::string>("@module_type"),
0161 "source",
0162 processConfiguration.get(),
0163 moduleIndex);
0164
0165 InputSourceDescription isdesc(md,
0166 branchIDListHelper,
0167 processBlockHelper,
0168 thinnedAssociationsHelper,
0169 areg,
0170 common.maxEventsInput_,
0171 common.maxLumisInput_,
0172 common.maxSecondsUntilRampdown_,
0173 allocations);
0174
0175 areg->preSourceConstructionSignal_(md);
0176 std::unique_ptr<InputSource> input;
0177 try {
0178
0179 std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
0180 convertException::wrap([&]() {
0181 input = InputSourceFactory::get()->makeInputSource(*main_input, isdesc);
0182 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
0183 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
0184 });
0185 } catch (cms::Exception& iException) {
0186 std::ostringstream ost;
0187 ost << "Constructing input source of type " << modtype;
0188 iException.addContext(ost.str());
0189 throw;
0190 }
0191 return input;
0192 }
0193
0194
0195 std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
0196 eventsetup::EventSetupProvider& cp,
0197 ParameterSet& params,
0198 std::vector<std::string> const& loopers) {
0199 std::shared_ptr<EDLooperBase> vLooper;
0200
0201 assert(1 == loopers.size());
0202
0203 for (auto const& looperName : loopers) {
0204 ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
0205
0206 vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
0207 }
0208 return vLooper;
0209 }
0210
0211
0212 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0213 ServiceToken const& iToken,
0214 serviceregistry::ServiceLegacy iLegacy,
0215 std::vector<std::string> const& defaultServices,
0216 std::vector<std::string> const& forcedServices)
0217 : actReg_(),
0218 preg_(),
0219 branchIDListHelper_(),
0220 serviceToken_(),
0221 input_(),
0222 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0223 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0224 esp_(),
0225 act_table_(),
0226 processConfiguration_(),
0227 schedule_(),
0228 historyAppender_(new HistoryAppender),
0229 fb_(),
0230 looper_(),
0231 deferredExceptionPtrIsSet_(false),
0232 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0233 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0234 principalCache_(),
0235 beginJobCalled_(false),
0236 shouldWeStop_(false),
0237 fileModeNoMerge_(false),
0238 exceptionMessageFiles_(),
0239 exceptionMessageRuns_(false),
0240 exceptionMessageLumis_(false),
0241 forceLooperToEnd_(false),
0242 looperBeginJobRun_(false),
0243 forceESCacheClearOnNewRun_(false),
0244 eventSetupDataToExcludeFromPrefetching_() {
0245 auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0246 processDesc->addServices(defaultServices, forcedServices);
0247 init(processDesc, iToken, iLegacy);
0248 }
0249
0250 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0251 std::vector<std::string> const& defaultServices,
0252 std::vector<std::string> const& forcedServices)
0253 : actReg_(),
0254 preg_(),
0255 branchIDListHelper_(),
0256 serviceToken_(),
0257 input_(),
0258 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0259 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0260 esp_(),
0261 act_table_(),
0262 processConfiguration_(),
0263 schedule_(),
0264 historyAppender_(new HistoryAppender),
0265 fb_(),
0266 looper_(),
0267 deferredExceptionPtrIsSet_(false),
0268 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0269 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0270 principalCache_(),
0271 beginJobCalled_(false),
0272 shouldWeStop_(false),
0273 fileModeNoMerge_(false),
0274 exceptionMessageFiles_(),
0275 exceptionMessageRuns_(false),
0276 exceptionMessageLumis_(false),
0277 forceLooperToEnd_(false),
0278 looperBeginJobRun_(false),
0279 forceESCacheClearOnNewRun_(false),
0280 eventSetupDataToExcludeFromPrefetching_() {
0281 auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0282 processDesc->addServices(defaultServices, forcedServices);
0283 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
0284 }
0285
0286 EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0287 ServiceToken const& token,
0288 serviceregistry::ServiceLegacy legacy)
0289 : actReg_(),
0290 preg_(),
0291 branchIDListHelper_(),
0292 serviceToken_(),
0293 input_(),
0294 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
0295 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0296 esp_(),
0297 act_table_(),
0298 processConfiguration_(),
0299 schedule_(),
0300 historyAppender_(new HistoryAppender),
0301 fb_(),
0302 looper_(),
0303 deferredExceptionPtrIsSet_(false),
0304 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0305 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0306 principalCache_(),
0307 beginJobCalled_(false),
0308 shouldWeStop_(false),
0309 fileModeNoMerge_(false),
0310 exceptionMessageFiles_(),
0311 exceptionMessageRuns_(false),
0312 exceptionMessageLumis_(false),
0313 forceLooperToEnd_(false),
0314 looperBeginJobRun_(false),
0315 forceESCacheClearOnNewRun_(false),
0316 eventSetupDataToExcludeFromPrefetching_() {
0317 init(processDesc, token, legacy);
0318 }
0319
0320 void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
0321 ServiceToken const& iToken,
0322 serviceregistry::ServiceLegacy iLegacy) {
0323
0324
0325
0326 ParentageRegistry::instance()->insertMapped(Parentage());
0327
0328
0329 ParameterSet().registerIt();
0330
0331 std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
0332
0333
0334
0335
0336 validateTopLevelParameterSets(parameterSet.get());
0337
0338
0339 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
0340 auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
0341 if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
0342 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
0343 << fileMode << ".\n"
0344 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
0345 } else {
0346 fileModeNoMerge_ = (fileMode == "NOMERGE");
0347 }
0348 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
0349 ensureAvailableAccelerators(*parameterSet);
0350
0351
0352 unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
0353
0354
0355
0356 assert(nThreads != 0);
0357
0358 unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
0359 if (nStreams == 0) {
0360 nStreams = nThreads;
0361 }
0362 unsigned int nConcurrentLumis =
0363 optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
0364 if (nConcurrentLumis == 0) {
0365 nConcurrentLumis = 2;
0366 }
0367 if (nConcurrentLumis > nStreams) {
0368 nConcurrentLumis = nStreams;
0369 }
0370 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
0371 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
0372 nConcurrentRuns = nConcurrentLumis;
0373 }
0374 std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
0375 if (!loopers.empty()) {
0376
0377 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
0378 edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
0379 "of concurrent runs, and the number of concurrent lumis "
0380 "are all being reset to 1. Loopers cannot currently support "
0381 "values greater than 1.";
0382 nStreams = 1;
0383 nConcurrentLumis = 1;
0384 nConcurrentRuns = 1;
0385 }
0386 }
0387 bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
0388 if (dumpOptions) {
0389 dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
0390 } else {
0391 if (nThreads > 1 or nStreams > 1) {
0392 edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
0393 }
0394 }
0395
0396
0397
0398
0399
0400
0401 unsigned int maxConcurrentIOVs =
0402 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
0403
0404 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
0405
0406 printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
0407 deleteNonConsumedUnscheduledModules_ =
0408 optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
0409
0410 branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
0411
0412 if (not branchesToDeleteEarly_.empty()) {
0413 auto referencePSets =
0414 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
0415 for (auto const& pset : referencePSets) {
0416 auto product = pset.getParameter<std::string>("product");
0417 auto references = pset.getParameter<std::vector<std::string>>("references");
0418 for (auto const& ref : references) {
0419 referencesToBranches_.emplace(product, ref);
0420 }
0421 }
0422 modulesToIgnoreForDeleteEarly_ =
0423 optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
0424 }
0425
0426
0427 ScheduleItems items;
0428
0429
0430 auto& serviceSets = processDesc->getServicesPSets();
0431 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy);
0432 serviceToken_ = items.addTNS(*parameterSet, token);
0433
0434
0435 ServiceRegistry::Operate operate(serviceToken_);
0436
0437 CMS_SA_ALLOW try {
0438 if (nThreads > 1) {
0439 edm::Service<RootHandlers> handler;
0440 handler->willBeUsingThreads();
0441 }
0442
0443
0444 std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
0445
0446
0447 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
0448 esp_ = espController_->makeProvider(
0449 *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
0450
0451
0452 if (!loopers.empty()) {
0453 looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
0454 looper_->setActionTable(items.act_table_.get());
0455 looper_->attachTo(*items.actReg_);
0456
0457
0458 deleteNonConsumedUnscheduledModules_ = false;
0459 }
0460
0461 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0462
0463 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
0464 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
0465 streamQueues_.resize(nStreams);
0466 streamRunStatus_.resize(nStreams);
0467 streamLumiStatus_.resize(nStreams);
0468
0469 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0470
0471 {
0472 std::optional<ScheduleItems::MadeModules> madeModules;
0473
0474
0475 tbb::task_group group;
0476
0477
0478 auto sourceID = ModuleDescription::getUniqueID();
0479
0480 group.run([&, this]() {
0481
0482 ServiceRegistry::Operate operate(serviceToken_);
0483 auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0484 madeModules =
0485 items.initModules(*parameterSet, tns, preallocations_, &processContext_, moduleTypeResolverMaker_.get());
0486 });
0487
0488 group.run([&, this]() {
0489 ServiceRegistry::Operate operate(serviceToken_);
0490 input_ = makeInput(sourceID,
0491 *parameterSet,
0492 *common,
0493 items.branchIDListHelper(),
0494 get_underlying_safe(processBlockHelper_),
0495 items.thinnedAssociationsHelper(),
0496 items.actReg_,
0497 items.processConfiguration(),
0498 preallocations_);
0499 });
0500
0501 group.wait();
0502 items.preg()->addFromInput(input_->productRegistry());
0503 {
0504 auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0505 schedule_ = items.finishSchedule(
0506 std::move(*madeModules), *parameterSet, tns, preallocations_, &processContext_, *processBlockHelper_);
0507 }
0508 }
0509
0510
0511 act_table_ = std::move(items.act_table_);
0512 actReg_ = items.actReg_;
0513 preg_ = std::make_shared<ProductRegistry>(items.preg()->moveTo());
0514 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0515 branchIDListHelper_ = items.branchIDListHelper();
0516 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0517 processConfiguration_ = items.processConfiguration();
0518 processContext_.setProcessConfiguration(processConfiguration_.get());
0519
0520 {
0521 edm::Service<edm::JobReport> jr;
0522 if (jr.isAvailable()) {
0523 ProcessConfiguration reduced = *processConfiguration_;
0524 reduced.reduce();
0525 jr->reportProcess(reduced.processName(), reduced.id(), reduced.parameterSetID());
0526 }
0527 }
0528
0529 FDEBUG(2) << parameterSet << std::endl;
0530
0531 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0532 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0533
0534 auto ep = std::make_shared<EventPrincipal>(preg(),
0535 productResolversFactory::makePrimary,
0536 branchIDListHelper(),
0537 thinnedAssociationsHelper(),
0538 *processConfiguration_,
0539 historyAppender_.get(),
0540 index,
0541 &*processBlockHelper_);
0542 principalCache_.insert(std::move(ep));
0543 }
0544
0545 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0546 auto rp = std::make_unique<RunPrincipal>(preg(),
0547 productResolversFactory::makePrimary,
0548 *processConfiguration_,
0549 historyAppender_.get(),
0550 index,
0551 &mergeableRunProductProcesses_);
0552 principalCache_.insert(std::move(rp));
0553 }
0554
0555 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0556 auto lp = std::make_unique<LuminosityBlockPrincipal>(
0557 preg(), productResolversFactory::makePrimary, *processConfiguration_, historyAppender_.get(), index);
0558 principalCache_.insert(std::move(lp));
0559 }
0560
0561 {
0562 auto pb = std::make_unique<ProcessBlockPrincipal>(
0563 preg(), productResolversFactory::makePrimary, *processConfiguration_);
0564 principalCache_.insert(std::move(pb));
0565
0566 auto pbForInput = std::make_unique<ProcessBlockPrincipal>(
0567 preg(), productResolversFactory::makePrimary, *processConfiguration_);
0568 principalCache_.insertForInput(std::move(pbForInput));
0569 }
0570 } catch (...) {
0571
0572
0573 espController_ = nullptr;
0574 esp_ = nullptr;
0575 schedule_ = nullptr;
0576 input_ = nullptr;
0577 looper_ = nullptr;
0578 actReg_ = nullptr;
0579 throw;
0580 }
0581 }
0582
0583 EventProcessor::~EventProcessor() {
0584
0585 ServiceToken token = getToken();
0586 ServiceRegistry::Operate op(token);
0587
0588
0589
0590 espController_ = nullptr;
0591 esp_ = nullptr;
0592 schedule_ = nullptr;
0593 input_ = nullptr;
0594 looper_ = nullptr;
0595 actReg_ = nullptr;
0596
0597 pset::Registry::instance()->clear();
0598 ParentageRegistry::instance()->clear();
0599 }
0600
0601 void EventProcessor::taskCleanup() {
0602 edm::FinalWaitingTask task{taskGroup_};
0603 espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
0604 task.waitNoThrow();
0605 assert(task.done());
0606 }
0607
0608 void EventProcessor::beginJob() {
0609 if (beginJobCalled_)
0610 return;
0611 beginJobCalled_ = true;
0612 bk::beginJob();
0613
0614 ServiceRegistry::Operate operate(serviceToken_);
0615
0616 service::SystemBounds bounds(preallocations_.numberOfStreams(),
0617 preallocations_.numberOfLuminosityBlocks(),
0618 preallocations_.numberOfRuns(),
0619 preallocations_.numberOfThreads());
0620 actReg_->preallocateSignal_(bounds);
0621 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0622
0623 PathsAndConsumesOfModules pathsAndConsumesOfModules;
0624 pathsAndConsumesOfModules.initialize(schedule_.get(), preg());
0625
0626
0627 checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, printDependencies_);
0628 if (deleteNonConsumedUnscheduledModules_) {
0629 if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules);
0630 not unusedModules.empty()) {
0631 pathsAndConsumesOfModules.removeModules(unusedModules);
0632
0633 edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
0634 l << "The following modules are not in any Path or EndPath, nor is their output consumed by any other "
0635 "module, "
0636 "and therefore they are deleted before the beginJob transition.";
0637 for (auto const& description : unusedModules) {
0638 l << "\n " << description->moduleLabel();
0639 }
0640 });
0641 for (auto const& description : unusedModules) {
0642 schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0643 }
0644 }
0645 }
0646
0647
0648
0649 if (not branchesToDeleteEarly_.empty()) {
0650 auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
0651 auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
0652 auto referencesToBranches = std::move(referencesToBranches_);
0653 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
0654 }
0655
0656 if (preallocations_.numberOfLuminosityBlocks() > 1) {
0657 throwAboutModulesRequiringLuminosityBlockSynchronization();
0658 }
0659 if (preallocations_.numberOfRuns() > 1) {
0660 warnAboutModulesRequiringRunSynchronization();
0661 }
0662
0663
0664
0665
0666
0667
0668
0669
0670
0671
0672
0673
0674
0675
0676
0677 espController_->finishConfiguration();
0678
0679 eventsetup::ESRecordsToProductResolverIndices esRecordsToProductResolverIndices = esp_->recordsToResolverIndices();
0680
0681 actReg_->eventSetupConfigurationSignal_(esRecordsToProductResolverIndices, processContext_);
0682 try {
0683 convertException::wrap([&]() { input_->doBeginJob(*preg_); });
0684 } catch (cms::Exception& ex) {
0685 ex.addContext("Calling beginJob for the source");
0686 throw;
0687 }
0688
0689 beginJobStartedModules_ = true;
0690
0691
0692
0693
0694 std::exception_ptr firstException;
0695 CMS_SA_ALLOW try {
0696 schedule_->beginJob(
0697 *preg_, esRecordsToProductResolverIndices, *processBlockHelper_, processContext_.processName());
0698 } catch (...) {
0699 firstException = std::current_exception();
0700 }
0701 if (looper_ && !firstException) {
0702 CMS_SA_ALLOW try {
0703 constexpr bool mustPrefetchMayGet = true;
0704 auto const processBlockLookup = preg_->productLookup(InProcess);
0705 auto const runLookup = preg_->productLookup(InRun);
0706 auto const lumiLookup = preg_->productLookup(InLumi);
0707 auto const eventLookup = preg_->productLookup(InEvent);
0708 looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
0709 looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
0710 looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
0711 looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
0712 looper_->updateLookup(esRecordsToProductResolverIndices);
0713 } catch (...) {
0714 firstException = std::current_exception();
0715 }
0716 }
0717 if (firstException) {
0718 std::rethrow_exception(firstException);
0719 }
0720 pathsAndConsumesOfModules.initializeForEventSetup(*esp_);
0721 actReg_->lookupInitializationCompleteSignal_(pathsAndConsumesOfModules, processContext_);
0722 schedule_->releaseMemoryPostLookupSignal();
0723
0724 beginJobSucceeded_ = true;
0725 beginStreams();
0726 }
0727
0728 void EventProcessor::beginStreams() {
0729
0730
0731 oneapi::tbb::task_group group;
0732 FinalWaitingTask finalWaitingTask{group};
0733 using namespace edm::waiting_task::chain;
0734 {
0735 WaitingTaskHolder taskHolder(group, &finalWaitingTask);
0736 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0737 first([this, i](auto nextTask) {
0738 std::exception_ptr exceptionPtr;
0739 {
0740 ServiceRegistry::Operate operate(serviceToken_);
0741 CMS_SA_ALLOW try { schedule_->beginStream(i); } catch (...) {
0742 exceptionPtr = std::current_exception();
0743 }
0744 }
0745 nextTask.doneWaiting(exceptionPtr);
0746 }) | lastTask(taskHolder);
0747 }
0748 }
0749 finalWaitingTask.wait();
0750 }
0751
0752 void EventProcessor::endStreams(ExceptionCollector& collector) noexcept {
0753 std::mutex collectorMutex;
0754
0755
0756
0757 oneapi::tbb::task_group group;
0758 FinalWaitingTask finalWaitingTask{group};
0759 using namespace edm::waiting_task::chain;
0760 {
0761 WaitingTaskHolder taskHolder(group, &finalWaitingTask);
0762 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0763 first([this, i, &collector, &collectorMutex](auto nextTask) {
0764 {
0765 ServiceRegistry::Operate operate(serviceToken_);
0766 schedule_->endStream(i, collector, collectorMutex);
0767 }
0768 }) | lastTask(taskHolder);
0769 }
0770 }
0771 finalWaitingTask.waitNoThrow();
0772 }
0773
0774 void EventProcessor::endJob() {
0775
0776 ExceptionCollector c(
0777 "Multiple exceptions were thrown while executing endStream and endJob. An exception message follows for "
0778 "each.\n");
0779
0780
0781 ServiceRegistry::Operate operate(serviceToken_);
0782
0783 if (beginJobSucceeded_) {
0784 endStreams(c);
0785 }
0786
0787 if (beginJobStartedModules_) {
0788 schedule_->endJob(c);
0789 c.call(std::bind(&InputSource::doEndJob, input_.get()));
0790 if (looper_) {
0791 c.call(std::bind(&EDLooperBase::endOfJob, looper()));
0792 }
0793 if (c.hasThrown()) {
0794 c.rethrow();
0795 }
0796 }
0797 }
0798
0799 ServiceToken EventProcessor::getToken() { return serviceToken_; }
0800
0801 std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
0802 return schedule_->getAllModuleDescriptions();
0803 }
0804
0805 int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
0806
0807 int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
0808
0809 int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
0810
0811 void EventProcessor::clearCounters() { schedule_->clearCounters(); }
0812
0813 namespace {
0814 #include "TransitionProcessors.icc"
0815 }
0816
0817 bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
0818 bool returnValue = false;
0819
0820
0821 if (shutdown_flag.load(std::memory_order_acquire)) {
0822 returnValue = true;
0823 edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
0824 edm::Service<edm::JobReport> jr;
0825 jr->reportShutdownSignal();
0826 returnCode = epSignal;
0827 }
0828 return returnValue;
0829 }
0830
0831 namespace {
0832 struct SourceNextGuard {
0833 SourceNextGuard(edm::ActivityRegistry& iReg) : act_(iReg) { iReg.preSourceNextTransitionSignal_.emit(); }
0834 ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
0835 edm::ActivityRegistry& act_;
0836 };
0837 }
0838
0839 InputSource::ItemTypeInfo EventProcessor::nextTransitionType() {
0840 SendSourceTerminationSignalIfException sentry(actReg_.get());
0841 InputSource::ItemTypeInfo itemTypeInfo;
0842 {
0843 SourceNextGuard guard(*actReg_.get());
0844
0845 do {
0846 itemTypeInfo = input_->nextItemType();
0847 } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
0848 }
0849 lastSourceTransition_ = itemTypeInfo;
0850 sentry.completedSuccessfully();
0851
0852 StatusCode returnCode = epSuccess;
0853
0854 if (checkForAsyncStopRequest(returnCode)) {
0855 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
0856 lastSourceTransition_ = InputSource::ItemType::IsStop;
0857 }
0858
0859 return lastSourceTransition_;
0860 }
0861
0862 void EventProcessor::nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
0863 WaitingTaskHolder nextTask) {
0864 auto group = nextTask.group();
0865 sourceResourcesAcquirer_.serialQueueChain().push(
0866 *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
0867 CMS_SA_ALLOW try {
0868 ServiceRegistry::Operate operate(serviceToken_);
0869 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
0870 nextTransitionType();
0871 if (lastTransitionType() == InputSource::ItemType::IsRun &&
0872 runStatus->runPrincipal()->run() == input_->run() &&
0873 runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
0874 throw Exception(errors::LogicError)
0875 << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
0876 << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
0877 << "This is probably a bug in the InputSource. Please report to the Core group.\n";
0878 }
0879 } catch (...) {
0880 nextHolder.doneWaiting(std::current_exception());
0881 }
0882 });
0883 }
0884
0885 EventProcessor::StatusCode EventProcessor::runToCompletion() {
0886 beginJob();
0887
0888
0889 ServiceRegistry::Operate operate(serviceToken_);
0890 actReg_->beginProcessingSignal_();
0891 auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
0892 std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
0893 try {
0894 FilesProcessor fp(fileModeNoMerge_);
0895
0896 convertException::wrap([&]() {
0897 bool firstTime = true;
0898 do {
0899 if (not firstTime) {
0900 prepareForNextLoop();
0901 rewindInput();
0902 } else {
0903 firstTime = false;
0904 }
0905 startingNewLoop();
0906
0907 auto trans = fp.processFiles(*this);
0908
0909 fp.normalEnd();
0910
0911 if (deferredExceptionPtrIsSet_.load()) {
0912 std::rethrow_exception(deferredExceptionPtr_);
0913 }
0914 if (trans != InputSource::ItemType::IsStop) {
0915
0916 doErrorStuff();
0917
0918 throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
0919 }
0920 } while (not endOfLoop());
0921 });
0922
0923 }
0924 catch (cms::Exception& e) {
0925 if (exceptionMessageLumis_) {
0926 std::string message(
0927 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
0928 e.addAdditionalInfo(message);
0929 if (e.alreadyPrinted()) {
0930 LogAbsolute("Additional Exceptions") << message;
0931 }
0932 }
0933 if (exceptionMessageRuns_) {
0934 std::string message(
0935 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
0936 e.addAdditionalInfo(message);
0937 if (e.alreadyPrinted()) {
0938 LogAbsolute("Additional Exceptions") << message;
0939 }
0940 }
0941 if (!exceptionMessageFiles_.empty()) {
0942 e.addAdditionalInfo(exceptionMessageFiles_);
0943 if (e.alreadyPrinted()) {
0944 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
0945 }
0946 }
0947 throw;
0948 }
0949 return epSuccess;
0950 }
0951
0952 void EventProcessor::readFile() {
0953 FDEBUG(1) << " \treadFile\n";
0954 SendSourceTerminationSignalIfException sentry(actReg_.get());
0955
0956 if (streamRunActive_ > 0) {
0957
0958
0959 streamRunStatus_[0]->runPrincipal()->preReadFile();
0960 }
0961
0962 auto oldCacheID = input_->productRegistry().cacheIdentifier();
0963 fb_ = input_->readFile();
0964
0965 if (input_->productRegistry().cacheIdentifier() != oldCacheID) {
0966 auto temp = std::make_shared<edm::ProductRegistry>(*preg_);
0967 temp->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
0968 preg_ = std::move(temp);
0969 }
0970 if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
0971 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
0972 }
0973 sentry.completedSuccessfully();
0974 }
0975
0976 void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
0977 if (fileBlockValid()) {
0978 SendSourceTerminationSignalIfException sentry(actReg_.get());
0979 input_->closeFile(fb_.get(), cleaningUpAfterException);
0980 sentry.completedSuccessfully();
0981 }
0982 FDEBUG(1) << "\tcloseInputFile\n";
0983 }
0984
0985 void EventProcessor::openOutputFiles() {
0986 if (fileBlockValid()) {
0987 schedule_->openOutputFiles(*fb_);
0988 }
0989 FDEBUG(1) << "\topenOutputFiles\n";
0990 }
0991
0992 void EventProcessor::closeOutputFiles() {
0993 schedule_->closeOutputFiles();
0994 processBlockHelper_->clearAfterOutputFilesClose();
0995 FDEBUG(1) << "\tcloseOutputFiles\n";
0996 }
0997
0998 void EventProcessor::respondToOpenInputFile() {
0999 if (fileBlockValid()) {
1000 schedule_->respondToOpenInputFile(*fb_);
1001 }
1002 FDEBUG(1) << "\trespondToOpenInputFile\n";
1003 }
1004
1005 void EventProcessor::respondToCloseInputFile() {
1006 if (fileBlockValid()) {
1007 schedule_->respondToCloseInputFile(*fb_);
1008 }
1009 FDEBUG(1) << "\trespondToCloseInputFile\n";
1010 }
1011
1012 void EventProcessor::startingNewLoop() {
1013 shouldWeStop_ = false;
1014
1015
1016 if (looper_ && looperBeginJobRun_) {
1017 looper_->doStartingNewLoop();
1018 }
1019 FDEBUG(1) << "\tstartingNewLoop\n";
1020 }
1021
1022 bool EventProcessor::endOfLoop() {
1023 if (looper_) {
1024 SignallingProductRegistryFiller sReg(*preg());
1025 ModuleChanger changer(schedule_.get(), &sReg, esp_->recordsToResolverIndices());
1026 looper_->setModuleChanger(&changer);
1027 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1028 looper_->setModuleChanger(nullptr);
1029 if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1030 return true;
1031 else
1032 return false;
1033 }
1034 FDEBUG(1) << "\tendOfLoop\n";
1035 return true;
1036 }
1037
1038 void EventProcessor::rewindInput() {
1039 input_->repeat();
1040 input_->rewind();
1041 FDEBUG(1) << "\trewind\n";
1042 }
1043
1044 void EventProcessor::prepareForNextLoop() {
1045 looper_->prepareForNextLoop(esp_.get());
1046 FDEBUG(1) << "\tprepareForNextLoop\n";
1047 }
1048
1049 bool EventProcessor::shouldWeCloseOutput() const {
1050 FDEBUG(1) << "\tshouldWeCloseOutput\n";
1051 return schedule_->shouldWeCloseOutput();
1052 }
1053
1054 void EventProcessor::doErrorStuff() {
1055 FDEBUG(1) << "\tdoErrorStuff\n";
1056 LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1057 << "and went to the error state\n"
1058 << "Will attempt to terminate processing normally\n"
1059 << "(IF using the looper the next loop will be attempted)\n"
1060 << "This likely indicates a bug in an input module or corrupted input or both\n";
1061 }
1062
1063 void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1064 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1065 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1066
1067 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1068 FinalWaitingTask globalWaitTask{taskGroup_};
1069
1070 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1071 schedule_->processOneGlobalAsync<Traits>(
1072 WaitingTaskHolder(taskGroup_, &globalWaitTask), transitionInfo, serviceToken_);
1073
1074 globalWaitTask.wait();
1075 beginProcessBlockSucceeded = true;
1076 }
1077
1078 void EventProcessor::inputProcessBlocks() {
1079 input_->fillProcessBlockHelper();
1080 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1081 while (input_->nextProcessBlock(processBlockPrincipal)) {
1082 readProcessBlock(processBlockPrincipal);
1083
1084 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1085 FinalWaitingTask globalWaitTask{taskGroup_};
1086
1087 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1088 schedule_->processOneGlobalAsync<Traits>(
1089 WaitingTaskHolder(taskGroup_, &globalWaitTask), transitionInfo, serviceToken_);
1090
1091 globalWaitTask.wait();
1092
1093 FinalWaitingTask writeWaitTask{taskGroup_};
1094 writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input);
1095 writeWaitTask.wait();
1096
1097 processBlockPrincipal.clearPrincipal();
1098 }
1099 }
1100
1101 void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1102 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1103
1104 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1105 FinalWaitingTask globalWaitTask{taskGroup_};
1106
1107 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1108 schedule_->processOneGlobalAsync<Traits>(
1109 WaitingTaskHolder(taskGroup_, &globalWaitTask), transitionInfo, serviceToken_, cleaningUpAfterException);
1110 globalWaitTask.wait();
1111
1112 if (beginProcessBlockSucceeded) {
1113 FinalWaitingTask writeWaitTask{taskGroup_};
1114 writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New);
1115 writeWaitTask.wait();
1116 }
1117
1118 processBlockPrincipal.clearPrincipal();
1119 }
1120
1121 InputSource::ItemType EventProcessor::processRuns() {
1122 FinalWaitingTask waitTask{taskGroup_};
1123 assert(lastTransitionType() == InputSource::ItemType::IsRun);
1124 if (streamRunActive_ == 0) {
1125 assert(streamLumiActive_ == 0);
1126
1127 beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1128 WaitingTaskHolder{taskGroup_, &waitTask});
1129 } else {
1130 assert(streamRunActive_ == preallocations_.numberOfStreams());
1131
1132 auto runStatus = streamRunStatus_[0];
1133
1134 while (lastTransitionType() == InputSource::ItemType::IsRun and
1135 runStatus->runPrincipal()->run() == input_->run() and
1136 runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1137 readAndMergeRun(*runStatus);
1138 nextTransitionType();
1139 }
1140
1141 setNeedToCallNext(false);
1142
1143 WaitingTaskHolder holder{taskGroup_, &waitTask};
1144 runStatus->setHolderOfTaskInProcessRuns(holder);
1145 if (streamLumiActive_ > 0) {
1146 assert(streamLumiActive_ == preallocations_.numberOfStreams());
1147 continueLumiAsync(std::move(holder));
1148 } else {
1149 handleNextItemAfterMergingRunEntries(std::move(runStatus), std::move(holder));
1150 }
1151 }
1152 waitTask.wait();
1153 return lastTransitionType();
1154 }
1155
1156 void EventProcessor::beginRunAsync(IOVSyncValue const& iSync, WaitingTaskHolder iHolder) {
1157 if (iHolder.taskHasFailed()) {
1158 return;
1159 }
1160
1161 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1162
1163 auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1164
1165 chain::first([this, &status, iSync](auto nextTask) {
1166 espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1167 nextTask,
1168 status->endIOVWaitingTasks(),
1169 status->eventSetupImpls(),
1170 queueWhichWaitsForIOVsToFinish_,
1171 actReg_.get(),
1172 serviceToken_,
1173 forceESCacheClearOnNewRun_);
1174 }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1175 CMS_SA_ALLOW try {
1176 if (iException) {
1177 WaitingTaskHolder copyHolder(nextTask);
1178 copyHolder.doneWaiting(*iException);
1179
1180 }
1181 ServiceRegistry::Operate operate(serviceToken_);
1182
1183 runQueue_->pushAndPause(
1184 *nextTask.group(),
1185 [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1186 CMS_SA_ALLOW try {
1187 if (postRunQueueTask.taskHasFailed()) {
1188 status->resetBeginResources();
1189 queueWhichWaitsForIOVsToFinish_.resume();
1190 return;
1191 }
1192
1193 status->setResumer(std::move(iResumer));
1194
1195 sourceResourcesAcquirer_.serialQueueChain().push(
1196 *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1197 CMS_SA_ALLOW try {
1198 ServiceRegistry::Operate operate(serviceToken_);
1199
1200 if (postSourceTask.taskHasFailed()) {
1201 status->resetBeginResources();
1202 queueWhichWaitsForIOVsToFinish_.resume();
1203 status->resumeGlobalRunQueue();
1204 return;
1205 }
1206
1207 {
1208 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1209 status->setRunPrincipal(readRun());
1210
1211 RunPrincipal& runPrincipal = *status->runPrincipal();
1212 {
1213 SendSourceTerminationSignalIfException sentry(actReg_.get());
1214 input_->doBeginRun(runPrincipal, &processContext_);
1215 sentry.completedSuccessfully();
1216 }
1217 }
1218
1219 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1220 if (looper_ && looperBeginJobRun_ == false) {
1221 looper_->copyInfo(ScheduleInfo(schedule_.get()));
1222
1223 oneapi::tbb::task_group group;
1224 FinalWaitingTask waitTask{group};
1225 using namespace edm::waiting_task::chain;
1226 chain::first([this, &es](auto nextTask) {
1227 looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1228 }) | then([this, &es](auto nextTask) mutable {
1229 looper_->beginOfJob(es);
1230 looperBeginJobRun_ = true;
1231 looper_->doStartingNewLoop();
1232 }) | runLast(WaitingTaskHolder(group, &waitTask));
1233 waitTask.wait();
1234 }
1235
1236 using namespace edm::waiting_task::chain;
1237 chain::first([this, status](auto nextTask) mutable {
1238 CMS_SA_ALLOW try {
1239 if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1240 readAndMergeRunEntriesAsync(status, nextTask);
1241 } else {
1242 setNeedToCallNext(true);
1243 }
1244 } catch (...) {
1245 status->setStopBeforeProcessingRun(true);
1246 nextTask.doneWaiting(std::current_exception());
1247 }
1248 }) | then([this, status, &es](auto nextTask) {
1249 if (status->stopBeforeProcessingRun()) {
1250 return;
1251 }
1252 RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1253 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1254 schedule_->processOneGlobalAsync<Traits>(nextTask, transitionInfo, serviceToken_);
1255 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1256 if (status->stopBeforeProcessingRun()) {
1257 return;
1258 }
1259 looper_->prefetchAsync(
1260 nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1261 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1262 if (status->stopBeforeProcessingRun()) {
1263 return;
1264 }
1265 ServiceRegistry::Operate operateLooper(serviceToken_);
1266 looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1267 }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1268 if (iException) {
1269 WaitingTaskHolder copyHolder(holder);
1270 copyHolder.doneWaiting(*iException);
1271 } else {
1272 status->globalBeginDidSucceed();
1273 }
1274
1275 if (status->stopBeforeProcessingRun()) {
1276
1277 status->resetBeginResources();
1278 queueWhichWaitsForIOVsToFinish_.resume();
1279 status->resumeGlobalRunQueue();
1280 return;
1281 }
1282 CMS_SA_ALLOW try {
1283
1284
1285 auto globalEndRunTask =
1286 edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1287 WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1288 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1289 globalEndRunAsync(std::move(taskHolder), std::move(status));
1290 });
1291 status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1292 } catch (...) {
1293 status->resetBeginResources();
1294 queueWhichWaitsForIOVsToFinish_.resume();
1295 status->resumeGlobalRunQueue();
1296 holder.doneWaiting(std::current_exception());
1297 return;
1298 }
1299
1300
1301
1302 ServiceRegistry::Operate operate(serviceToken_);
1303
1304
1305
1306
1307 PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1308
1309 CMS_SA_ALLOW try {
1310 streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
1311 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1312 CMS_SA_ALLOW try {
1313 streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
1314 streamBeginRunAsync(i, std::move(status), std::move(holder));
1315 });
1316 } catch (...) {
1317 if (status->streamFinishedBeginRun()) {
1318 WaitingTaskHolder copyHolder(holder);
1319 copyHolder.doneWaiting(std::current_exception());
1320 status->resetBeginResources();
1321 queueWhichWaitsForIOVsToFinish_.resume();
1322 exceptionRunStatus_ = status;
1323 }
1324 }
1325 }
1326 });
1327 } catch (...) {
1328 WaitingTaskHolder copyHolder(holder);
1329 copyHolder.doneWaiting(std::current_exception());
1330 status->resetBeginResources();
1331 queueWhichWaitsForIOVsToFinish_.resume();
1332 exceptionRunStatus_ = status;
1333 }
1334 handleNextItemAfterMergingRunEntries(status, holder);
1335 }) | runLast(postSourceTask);
1336 } catch (...) {
1337 status->resetBeginResources();
1338 queueWhichWaitsForIOVsToFinish_.resume();
1339 status->resumeGlobalRunQueue();
1340 postSourceTask.doneWaiting(std::current_exception());
1341 }
1342 });
1343 } catch (...) {
1344 status->resetBeginResources();
1345 queueWhichWaitsForIOVsToFinish_.resume();
1346 status->resumeGlobalRunQueue();
1347 postRunQueueTask.doneWaiting(std::current_exception());
1348 }
1349 });
1350 } catch (...) {
1351 status->resetBeginResources();
1352 queueWhichWaitsForIOVsToFinish_.resume();
1353 nextTask.doneWaiting(std::current_exception());
1354 }
1355 }) | chain::runLast(std::move(iHolder));
1356 }
1357
1358 void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1359 std::shared_ptr<RunProcessingStatus> status,
1360 WaitingTaskHolder iHolder) noexcept {
1361
1362 streamQueues_[iStream].pause();
1363 ++streamRunActive_;
1364 streamRunStatus_[iStream] = std::move(status);
1365
1366 CMS_SA_ALLOW try {
1367 using namespace edm::waiting_task::chain;
1368 chain::first([this, iStream](auto nextTask) {
1369 RunProcessingStatus& rs = *streamRunStatus_[iStream];
1370 if (rs.didGlobalBeginSucceed()) {
1371 RunTransitionInfo transitionInfo(
1372 *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1373 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1374 schedule_->processOneStreamAsync<Traits>(std::move(nextTask), iStream, transitionInfo, serviceToken_);
1375 }
1376 }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1377 if (exceptionFromBeginStreamRun) {
1378 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1379 }
1380 releaseBeginRunResources(iStream);
1381 }) | runLast(iHolder);
1382 } catch (...) {
1383 releaseBeginRunResources(iStream);
1384 iHolder.doneWaiting(std::current_exception());
1385 }
1386 }
1387
1388 void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1389 auto& status = streamRunStatus_[iStream];
1390 if (status->streamFinishedBeginRun()) {
1391 status->resetBeginResources();
1392 queueWhichWaitsForIOVsToFinish_.resume();
1393 }
1394 streamQueues_[iStream].resume();
1395 }
1396
1397 void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1398 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1399 iRunStatus->setEndTime();
1400 IOVSyncValue ts(
1401 EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1402 runPrincipal.endTime());
1403 CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1404 WaitingTaskHolder copyHolder(iHolder);
1405 copyHolder.doneWaiting(std::current_exception());
1406 }
1407
1408 chain::first([this, &iRunStatus, &ts](auto nextTask) {
1409 espController_->runOrQueueEventSetupForInstanceAsync(ts,
1410 nextTask,
1411 iRunStatus->endIOVWaitingTasksEndRun(),
1412 iRunStatus->eventSetupImplsEndRun(),
1413 queueWhichWaitsForIOVsToFinish_,
1414 actReg_.get(),
1415 serviceToken_);
1416 }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1417 if (iException) {
1418 iRunStatus->setEndingEventSetupSucceeded(false);
1419 handleEndRunExceptions(*iException, nextTask);
1420 }
1421 ServiceRegistry::Operate operate(serviceToken_);
1422 streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1423 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1424 CMS_SA_ALLOW try {
1425 streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1426 streamQueues_[i].pause();
1427 streamEndRunAsync(std::move(nextTask), i);
1428 });
1429 } catch (...) {
1430 WaitingTaskHolder copyHolder(nextTask);
1431 copyHolder.doneWaiting(std::current_exception());
1432 }
1433 }
1434 });
1435
1436 if (lastTransitionType() == InputSource::ItemType::IsRun) {
1437 CMS_SA_ALLOW try {
1438 beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1439 } catch (...) {
1440 WaitingTaskHolder copyHolder(nextTask);
1441 copyHolder.doneWaiting(std::current_exception());
1442 }
1443 }
1444 }) | chain::runLast(std::move(iHolder));
1445 }
1446
1447 void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1448 if (holder.taskHasFailed()) {
1449 setExceptionMessageRuns();
1450 } else {
1451 WaitingTaskHolder tmp(holder);
1452 tmp.doneWaiting(iException);
1453 }
1454 }
1455
1456 void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1457 auto& runPrincipal = *(iRunStatus->runPrincipal());
1458 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1459 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1460 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1461 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1462 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1463
1464 MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1465 using namespace edm::waiting_task::chain;
1466 chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1467 auto nextTask) {
1468 if (endingEventSetupSucceeded) {
1469 RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1470 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1471 schedule_->processOneGlobalAsync<Traits>(
1472 std::move(nextTask), transitionInfo, serviceToken_, cleaningUpAfterException);
1473 }
1474 }) |
1475 ifThen(looper_ && endingEventSetupSucceeded,
1476 [this, &runPrincipal, &es](auto nextTask) {
1477 looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1478 }) |
1479 ifThen(looper_ && endingEventSetupSucceeded,
1480 [this, &runPrincipal, &es](auto nextTask) {
1481 ServiceRegistry::Operate operate(serviceToken_);
1482 looper_->doEndRun(runPrincipal, es, &processContext_);
1483 }) |
1484 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1485 [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1486 mergeableRunProductMetadata->preWriteRun();
1487 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1488 }) |
1489 then([status = std::move(iRunStatus),
1490 this,
1491 didGlobalBeginSucceed,
1492 mergeableRunProductMetadata,
1493 endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1494 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1495 mergeableRunProductMetadata->postWriteRun();
1496 }
1497 if (iException) {
1498 handleEndRunExceptions(*iException, nextTask);
1499 }
1500 ServiceRegistry::Operate operate(serviceToken_);
1501
1502 std::exception_ptr ptr;
1503
1504
1505
1506
1507 CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1508 if (not ptr) {
1509 ptr = std::current_exception();
1510 }
1511 }
1512 CMS_SA_ALLOW try {
1513 status->resumeGlobalRunQueue();
1514 queueWhichWaitsForIOVsToFinish_.resume();
1515 } catch (...) {
1516 if (not ptr) {
1517 ptr = std::current_exception();
1518 }
1519 }
1520 CMS_SA_ALLOW try {
1521 status->resetEndResources();
1522 status.reset();
1523 } catch (...) {
1524 if (not ptr) {
1525 ptr = std::current_exception();
1526 }
1527 }
1528
1529 if (ptr && !iException) {
1530 handleEndRunExceptions(ptr, nextTask);
1531 }
1532 }) |
1533 runLast(std::move(iTask));
1534 }
1535
1536 void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1537 CMS_SA_ALLOW try {
1538 if (!streamRunStatus_[iStreamIndex]) {
1539 if (exceptionRunStatus_->streamFinishedRun()) {
1540 exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1541 exceptionRunStatus_.reset();
1542 }
1543 return;
1544 }
1545
1546 auto runDoneTask =
1547 edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1548 if (iException) {
1549 handleEndRunExceptions(*iException, iTask);
1550 }
1551
1552 auto runStatus = streamRunStatus_[iStreamIndex];
1553
1554
1555 if (runStatus->streamFinishedRun()) {
1556 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1557 }
1558 streamRunStatus_[iStreamIndex].reset();
1559 --streamRunActive_;
1560 streamQueues_[iStreamIndex].resume();
1561 });
1562
1563 WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1564
1565 auto runStatus = streamRunStatus_[iStreamIndex].get();
1566
1567 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1568 EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1569 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1570 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1571
1572 auto& runPrincipal = *runStatus->runPrincipal();
1573 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1574 RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1575 schedule_->processOneStreamAsync<Traits>(
1576 std::move(runDoneTaskHolder), iStreamIndex, transitionInfo, serviceToken_, cleaningUpAfterException);
1577 }
1578 } catch (...) {
1579 handleEndRunExceptions(std::current_exception(), iTask);
1580 }
1581 }
1582
1583 void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1584 if (streamRunActive_ > 0) {
1585 FinalWaitingTask waitTask{taskGroup_};
1586
1587 auto runStatus = streamRunStatus_[0].get();
1588 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1589 WaitingTaskHolder holder{taskGroup_, &waitTask};
1590 runStatus->setHolderOfTaskInProcessRuns(holder);
1591 lastSourceTransition_ = InputSource::ItemType::IsStop;
1592 endRunAsync(streamRunStatus_[0], std::move(holder));
1593 waitTask.wait();
1594 }
1595 }
1596
1597 void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1598 std::shared_ptr<RunProcessingStatus> iRunStatus,
1599 edm::WaitingTaskHolder iHolder) {
1600 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1601
1602 auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1603 chain::first([this, &iSync, &status](auto nextTask) {
1604 espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1605 nextTask,
1606 status->endIOVWaitingTasks(),
1607 status->eventSetupImpls(),
1608 queueWhichWaitsForIOVsToFinish_,
1609 actReg_.get(),
1610 serviceToken_);
1611 }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1612 CMS_SA_ALLOW try {
1613
1614 if (iException) {
1615 WaitingTaskHolder copyHolder(nextTask);
1616 copyHolder.doneWaiting(*iException);
1617 }
1618
1619 lumiQueue_->pushAndPause(
1620 *nextTask.group(),
1621 [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1622 CMS_SA_ALLOW try {
1623 if (postLumiQueueTask.taskHasFailed()) {
1624 status->resetResources();
1625 queueWhichWaitsForIOVsToFinish_.resume();
1626 endRunAsync(iRunStatus, postLumiQueueTask);
1627 return;
1628 }
1629
1630 status->setResumer(std::move(iResumer));
1631
1632 sourceResourcesAcquirer_.serialQueueChain().push(
1633 *postLumiQueueTask.group(),
1634 [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1635 CMS_SA_ALLOW try {
1636 ServiceRegistry::Operate operate(serviceToken_);
1637
1638 if (postSourceTask.taskHasFailed()) {
1639 status->resetResources();
1640 queueWhichWaitsForIOVsToFinish_.resume();
1641 endRunAsync(iRunStatus, postSourceTask);
1642 return;
1643 }
1644
1645 {
1646 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1647 status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1648
1649 LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1650 {
1651 SendSourceTerminationSignalIfException sentry(actReg_.get());
1652 input_->doBeginLumi(lumiPrincipal, &processContext_);
1653 sentry.completedSuccessfully();
1654 }
1655 }
1656 LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1657 Service<RandomNumberGenerator> rng;
1658 if (rng.isAvailable()) {
1659 LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1660 rng->preBeginLumi(lb);
1661 }
1662
1663 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1664
1665 using namespace edm::waiting_task::chain;
1666 chain::first([this, status](auto nextTask) mutable {
1667 if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1668 readAndMergeLumiEntriesAsync(std::move(status), std::move(nextTask));
1669 } else {
1670 setNeedToCallNext(true);
1671 }
1672 }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1673 LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1674 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1675 schedule_->processOneGlobalAsync<Traits>(nextTask, transitionInfo, serviceToken_);
1676 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1677 looper_->prefetchAsync(
1678 nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1679 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1680 ServiceRegistry::Operate operateLooper(serviceToken_);
1681 looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1682 }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1683 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1684
1685 if (iException) {
1686 WaitingTaskHolder copyHolder(holder);
1687 copyHolder.doneWaiting(*iException);
1688 globalEndLumiAsync(holder, status);
1689 endRunAsync(iRunStatus, holder);
1690 } else {
1691 status->globalBeginDidSucceed();
1692
1693 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1694 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1695
1696 streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1697 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1698 streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1699 if (!status->shouldStreamStartLumi()) {
1700 return;
1701 }
1702 streamQueues_[i].pause();
1703
1704 auto& event = principalCache_.eventPrincipal(i);
1705 auto eventSetupImpls = &status->eventSetupImpls();
1706 auto lp = status->lumiPrincipal().get();
1707 streamLumiStatus_[i] = std::move(status);
1708 ++streamLumiActive_;
1709 event.setLuminosityBlockPrincipal(lp);
1710 LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1711 using namespace edm::waiting_task::chain;
1712 chain::first([this, i, &transitionInfo](auto nextTask) {
1713 schedule_->processOneStreamAsync<Traits>(
1714 std::move(nextTask), i, transitionInfo, serviceToken_);
1715 }) |
1716 then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1717 auto nextTask) {
1718 if (exceptionFromBeginStreamLumi) {
1719 WaitingTaskHolder copyHolder(nextTask);
1720 copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1721 }
1722 handleNextEventForStreamAsync(std::move(nextTask), i);
1723 }) |
1724 runLast(std::move(holder));
1725 });
1726 }
1727 });
1728 }
1729 }) | runLast(postSourceTask);
1730 } catch (...) {
1731 status->resetResources();
1732 queueWhichWaitsForIOVsToFinish_.resume();
1733 WaitingTaskHolder copyHolder(postSourceTask);
1734 copyHolder.doneWaiting(std::current_exception());
1735 endRunAsync(iRunStatus, postSourceTask);
1736 }
1737 });
1738 } catch (...) {
1739 status->resetResources();
1740 queueWhichWaitsForIOVsToFinish_.resume();
1741 WaitingTaskHolder copyHolder(postLumiQueueTask);
1742 copyHolder.doneWaiting(std::current_exception());
1743 endRunAsync(iRunStatus, postLumiQueueTask);
1744 }
1745 });
1746 } catch (...) {
1747 status->resetResources();
1748 queueWhichWaitsForIOVsToFinish_.resume();
1749 WaitingTaskHolder copyHolder(nextTask);
1750 copyHolder.doneWaiting(std::current_exception());
1751 endRunAsync(iRunStatus, nextTask);
1752 }
1753 }) | chain::runLast(std::move(iHolder));
1754 }
1755
1756 void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1757 chain::first([this](auto nextTask) {
1758
1759 auto status = streamLumiStatus_[0];
1760 status->setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kProcessing);
1761
1762 while (lastTransitionType() == InputSource::ItemType::IsLumi and
1763 status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1764 readAndMergeLumi(*status);
1765 nextTransitionType();
1766 }
1767 }) | chain::then([this](auto nextTask) mutable {
1768 unsigned int streamIndex = 0;
1769 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1770 for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1771 arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1772 }
1773 nextTask.group()->run(
1774 [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1775 }) | chain::runLast(std::move(iHolder));
1776 }
1777
1778 void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1779 if (holder.taskHasFailed()) {
1780 setExceptionMessageLumis();
1781 } else {
1782 WaitingTaskHolder tmp(holder);
1783 tmp.doneWaiting(iException);
1784 }
1785 }
1786
1787 void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1788 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1789
1790
1791 auto& lp = *(iLumiStatus->lumiPrincipal());
1792 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1793 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1794 EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1795 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1796
1797 using namespace edm::waiting_task::chain;
1798 chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1799 IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1800
1801 LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1802 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1803 schedule_->processOneGlobalAsync<Traits>(
1804 std::move(nextTask), transitionInfo, serviceToken_, cleaningUpAfterException);
1805 }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1806
1807 if (didGlobalBeginSucceed) {
1808 writeLumiAsync(std::move(nextTask), lumiPrincipal);
1809 }
1810 }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1811 looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1812 }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1813
1814 ServiceRegistry::Operate operate(serviceToken_);
1815 looper_->doEndLuminosityBlock(lp, es, &processContext_);
1816 }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1817 if (iException) {
1818 handleEndLumiExceptions(*iException, nextTask);
1819 }
1820 ServiceRegistry::Operate operate(serviceToken_);
1821
1822 std::exception_ptr ptr;
1823
1824
1825
1826
1827
1828 CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1829 if (not ptr) {
1830 ptr = std::current_exception();
1831 }
1832 }
1833
1834 CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1835 if (not ptr) {
1836 ptr = std::current_exception();
1837 }
1838 }
1839
1840 CMS_SA_ALLOW try {
1841 status->resetResources();
1842 status->globalEndRunHolderDoneWaiting();
1843 status.reset();
1844 } catch (...) {
1845 if (not ptr) {
1846 ptr = std::current_exception();
1847 }
1848 }
1849
1850 if (ptr && !iException) {
1851 handleEndLumiExceptions(ptr, nextTask);
1852 }
1853 }) | runLast(std::move(iTask));
1854 }
1855
1856 void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1857 auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1858 auto status = streamLumiStatus_[iStreamIndex];
1859 if (iException) {
1860 handleEndLumiExceptions(*iException, iTask);
1861 }
1862
1863
1864 streamLumiStatus_[iStreamIndex].reset();
1865 --streamLumiActive_;
1866 streamQueues_[iStreamIndex].resume();
1867
1868
1869 if (status->streamFinishedLumi()) {
1870 globalEndLumiAsync(iTask, std::move(status));
1871 }
1872 });
1873
1874 edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1875
1876
1877
1878 auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1879 lumiStatus->setEndTime();
1880
1881 EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1882 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1883 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1884
1885 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1886 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1887 LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1888 schedule_->processOneStreamAsync<Traits>(
1889 std::move(lumiDoneTask), iStreamIndex, transitionInfo, serviceToken_, cleaningUpAfterException);
1890 }
1891
1892 void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1893 if (streamRunActive_ == 0) {
1894 assert(streamLumiActive_ == 0);
1895 } else {
1896 assert(streamRunActive_ == preallocations_.numberOfStreams());
1897 if (streamLumiActive_ > 0) {
1898 FinalWaitingTask globalWaitTask{taskGroup_};
1899 assert(streamLumiActive_ == preallocations_.numberOfStreams());
1900 streamLumiStatus_[0]->noMoreEventsInLumi();
1901 streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1902 {
1903 WaitingTaskHolder holder{taskGroup_, &globalWaitTask};
1904 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1905 streamEndLumiAsync(holder, i);
1906 }
1907 }
1908 globalWaitTask.wait();
1909 }
1910 }
1911 }
1912
1913 void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1914 SendSourceTerminationSignalIfException sentry(actReg_.get());
1915 input_->readProcessBlock(processBlockPrincipal);
1916 sentry.completedSuccessfully();
1917 }
1918
1919 std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1920 auto rp = principalCache_.getAvailableRunPrincipalPtr();
1921
1922 rp->possiblyUpdateAfterAddition(preg());
1923 assert(rp);
1924 rp->setAux(*input_->runAuxiliary());
1925 {
1926 SendSourceTerminationSignalIfException sentry(actReg_.get());
1927 input_->readRun(*rp, *historyAppender_);
1928 sentry.completedSuccessfully();
1929 }
1930 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1931 return rp;
1932 }
1933
1934 void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
1935 RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1936
1937
1938 runPrincipal.possiblyUpdateAfterAddition(preg());
1939
1940 runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1941 {
1942 SendSourceTerminationSignalIfException sentry(actReg_.get());
1943 input_->readAndMergeRun(runPrincipal);
1944 sentry.completedSuccessfully();
1945 }
1946 }
1947
1948 std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
1949 auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
1950
1951 lbp->possiblyUpdateAfterAddition(preg());
1952 assert(lbp);
1953 lbp->setAux(*input_->luminosityBlockAuxiliary());
1954 {
1955 SendSourceTerminationSignalIfException sentry(actReg_.get());
1956 input_->readLuminosityBlock(*lbp, *historyAppender_);
1957 sentry.completedSuccessfully();
1958 }
1959 lbp->setRunPrincipal(std::move(rp));
1960 return lbp;
1961 }
1962
1963 void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
1964 auto& lumiPrincipal = *iStatus.lumiPrincipal();
1965 assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1966 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1967 input_->processHistoryRegistry().reducedProcessHistoryID(
1968 input_->luminosityBlockAuxiliary()->processHistoryID()));
1969
1970
1971 lumiPrincipal.possiblyUpdateAfterAddition(preg());
1972 lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1973 {
1974 SendSourceTerminationSignalIfException sentry(actReg_.get());
1975 input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1976 sentry.completedSuccessfully();
1977 }
1978 }
1979
1980 void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
1981 ServiceRegistry::Operate op(serviceToken_);
1982
1983 schedule_->writeProcessBlockAsync(
1984 task, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1985 }
1986
1987 void EventProcessor::writeRunAsync(WaitingTaskHolder task,
1988 RunPrincipal const& runPrincipal,
1989 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1990 if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
1991 ServiceRegistry::Operate op(serviceToken_);
1992
1993 schedule_->writeRunAsync(task, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
1994 }
1995 }
1996
1997 void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
1998 iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
1999 iStatus.runPrincipal()->clearPrincipal();
2000 }
2001
2002 void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
2003 using namespace edm::waiting_task;
2004 if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2005 chain::first([&](auto nextTask) {
2006 ServiceRegistry::Operate op(serviceToken_);
2007
2008 lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2009 schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2010 }) | chain::lastTask(std::move(task));
2011 }
2012 }
2013
2014 void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
2015 iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2016 iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2017 iStatus.lumiPrincipal()->clearPrincipal();
2018 }
2019
2020 void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2021 WaitingTaskHolder iHolder) {
2022 auto group = iHolder.group();
2023 sourceResourcesAcquirer_.serialQueueChain().push(
2024 *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2025 CMS_SA_ALLOW try {
2026 ServiceRegistry::Operate operate(serviceToken_);
2027
2028 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2029
2030 nextTransitionType();
2031 setNeedToCallNext(false);
2032
2033 while (lastTransitionType() == InputSource::ItemType::IsRun and
2034 status->runPrincipal()->run() == input_->run() and
2035 status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2036 if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2037 status->setStopBeforeProcessingRun(true);
2038 return;
2039 }
2040 readAndMergeRun(*status);
2041 if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2042 setNeedToCallNext(true);
2043 return;
2044 }
2045 nextTransitionType();
2046 }
2047 } catch (...) {
2048 status->setStopBeforeProcessingRun(true);
2049 holder.doneWaiting(std::current_exception());
2050 }
2051 });
2052 }
2053
2054 void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2055 WaitingTaskHolder iHolder) {
2056 auto group = iHolder.group();
2057 sourceResourcesAcquirer_.serialQueueChain().push(
2058 *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2059 CMS_SA_ALLOW try {
2060 ServiceRegistry::Operate operate(serviceToken_);
2061
2062 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2063
2064 nextTransitionType();
2065 setNeedToCallNext(false);
2066
2067 while (lastTransitionType() == InputSource::ItemType::IsLumi and
2068 iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2069 readAndMergeLumi(*iLumiStatus);
2070 if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2071 setNeedToCallNext(true);
2072 return;
2073 }
2074 nextTransitionType();
2075 }
2076 } catch (...) {
2077 holder.doneWaiting(std::current_exception());
2078 }
2079 });
2080 }
2081
2082 void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2083 WaitingTaskHolder iHolder) {
2084 chain::first([this, iRunStatus](auto nextTask) mutable {
2085 if (needToCallNext()) {
2086 nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2087 }
2088 }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2089 ServiceRegistry::Operate operate(serviceToken_);
2090 if (iException) {
2091 WaitingTaskHolder copyHolder(nextTask);
2092 copyHolder.doneWaiting(*iException);
2093 }
2094 if (lastTransitionType() == InputSource::ItemType::IsFile) {
2095 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2096 return;
2097 }
2098 if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2099 CMS_SA_ALLOW try {
2100 beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2101 input_->luminosityBlockAuxiliary()->beginTime()),
2102 iRunStatus,
2103 nextTask);
2104 return;
2105 } catch (...) {
2106 WaitingTaskHolder copyHolder(nextTask);
2107 copyHolder.doneWaiting(std::current_exception());
2108 }
2109 }
2110
2111
2112 endRunAsync(iRunStatus, std::move(nextTask));
2113 }) | chain::runLast(std::move(iHolder));
2114 }
2115
2116 bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2117 unsigned int iStreamIndex,
2118 LuminosityBlockProcessingStatus& iStatus) {
2119
2120
2121
2122 if (iTask.taskHasFailed()) {
2123
2124
2125
2126 if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2127 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2128 }
2129 return false;
2130 }
2131
2132
2133 if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2134 return false;
2135 }
2136
2137
2138 if (shouldWeStop()) {
2139 lastSourceTransition_ = InputSource::ItemType::IsStop;
2140 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2141 return false;
2142 }
2143
2144 ServiceRegistry::Operate operate(serviceToken_);
2145
2146
2147
2148
2149 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2150
2151
2152
2153
2154 InputSource::ItemType itemType = needToCallNext() ? nextTransitionType() : lastTransitionType();
2155 setNeedToCallNext(true);
2156
2157 if (InputSource::ItemType::IsEvent != itemType) {
2158
2159
2160
2161 if (InputSource::ItemType::IsStop == itemType or InputSource::ItemType::IsLumi == itemType or
2162 (InputSource::ItemType::IsRun == itemType and
2163 (iStatus.lumiPrincipal()->run() != input_->run() or
2164 iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2165 if (itemType == InputSource::ItemType::IsLumi &&
2166 iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2167 throw Exception(errors::LogicError)
2168 << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2169 << "but the next lumi entry has the same lumi number.\n"
2170 << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2171 }
2172 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2173 } else {
2174 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2175 }
2176 return false;
2177 }
2178 readEvent(iStreamIndex);
2179 return true;
2180 }
2181
2182 void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2183 if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2184 streamEndLumiAsync(iTask, iStreamIndex);
2185 return;
2186 }
2187 auto group = iTask.group();
2188 sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2189 CMS_SA_ALLOW try {
2190 auto status = streamLumiStatus_[iStreamIndex].get();
2191 ServiceRegistry::Operate operate(serviceToken_);
2192
2193 if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2194 auto recursionTask =
2195 make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2196 if (iEventException) {
2197 WaitingTaskHolder copyHolder(iTask);
2198 copyHolder.doneWaiting(*iEventException);
2199
2200
2201
2202 }
2203 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2204 });
2205
2206 processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2207 } else {
2208
2209 if (status->eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi) {
2210 if (not status->haveStartedNextLumiOrEndedRun()) {
2211 status->noMoreEventsInLumi();
2212 status->startNextLumiOrEndRun();
2213 if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
2214 CMS_SA_ALLOW try {
2215 beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2216 input_->luminosityBlockAuxiliary()->beginTime()),
2217 streamRunStatus_[iStreamIndex],
2218 iTask);
2219 } catch (...) {
2220 WaitingTaskHolder copyHolder(iTask);
2221 copyHolder.doneWaiting(std::current_exception());
2222 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2223 }
2224 } else {
2225
2226 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2227 }
2228 }
2229 streamEndLumiAsync(iTask, iStreamIndex);
2230 } else {
2231 assert(status->eventProcessingState() ==
2232 LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2233 auto runStatus = streamRunStatus_[iStreamIndex].get();
2234
2235 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2236 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2237 }
2238 }
2239 }
2240 } catch (...) {
2241 WaitingTaskHolder copyHolder(iTask);
2242 copyHolder.doneWaiting(std::current_exception());
2243 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2244 }
2245 });
2246 }
2247
2248 void EventProcessor::readEvent(unsigned int iStreamIndex) {
2249
2250 auto& event = principalCache_.eventPrincipal(iStreamIndex);
2251 StreamContext streamContext(event.streamID(), &processContext_);
2252
2253 event.possiblyUpdateAfterAddition(preg());
2254
2255 SendSourceTerminationSignalIfException sentry(actReg_.get());
2256 input_->readEvent(event, streamContext);
2257
2258 streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2259 streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2260 sentry.completedSuccessfully();
2261
2262 FDEBUG(1) << "\treadEvent\n";
2263 }
2264
2265 void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2266 iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2267 }
2268
2269 namespace {
2270 struct ClearEventGuard {
2271 ClearEventGuard(edm::ActivityRegistry& iReg, edm::StreamContext const& iContext)
2272 : act_(iReg), context_(iContext) {
2273 iReg.preClearEventSignal_.emit(iContext);
2274 }
2275 ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2276 edm::ActivityRegistry& act_;
2277 edm::StreamContext const& context_;
2278 };
2279 }
2280
2281 void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2282 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2283
2284 ServiceRegistry::Operate operate(serviceToken_);
2285 Service<RandomNumberGenerator> rng;
2286 if (rng.isAvailable()) {
2287 Event ev(*pep, ModuleDescription(), nullptr);
2288 rng->postEventRead(ev);
2289 }
2290
2291 EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2292 using namespace edm::waiting_task::chain;
2293 chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2294 EventTransitionInfo info(*pep, es);
2295 schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2296 }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2297
2298 ServiceRegistry::Operate operateLooper(serviceToken_);
2299 processEventWithLooper(*pep, iStreamIndex);
2300 }) | then([this, pep](auto nextTask) {
2301 FDEBUG(1) << "\tprocessEvent\n";
2302 StreamContext streamContext(pep->streamID(),
2303 StreamContext::Transition::kEvent,
2304 pep->id(),
2305 pep->runPrincipal().index(),
2306 pep->luminosityBlockPrincipal().index(),
2307 pep->time(),
2308 &processContext_);
2309 ClearEventGuard guard(*this->actReg_.get(), streamContext);
2310 pep->clearEventPrincipal();
2311 }) | runLast(iHolder);
2312 }
2313
2314 void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2315 bool randomAccess = input_->randomAccess();
2316 ProcessingController::ForwardState forwardState = input_->forwardState();
2317 ProcessingController::ReverseState reverseState = input_->reverseState();
2318 ProcessingController pc(forwardState, reverseState, randomAccess);
2319
2320 EDLooperBase::Status status = EDLooperBase::kContinue;
2321 do {
2322 StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2323 EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2324 status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2325
2326 bool succeeded = true;
2327 if (randomAccess) {
2328 if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2329 input_->skipEvents(-2);
2330 } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2331 succeeded = input_->goToEvent(pc.specifiedEventTransition());
2332 }
2333 }
2334 pc.setLastOperationSucceeded(succeeded);
2335 } while (!pc.lastOperationSucceeded());
2336 if (status != EDLooperBase::kContinue) {
2337 shouldWeStop_ = true;
2338 }
2339 }
2340
2341 bool EventProcessor::shouldWeStop() const {
2342 FDEBUG(1) << "\tshouldWeStop\n";
2343 if (shouldWeStop_)
2344 return true;
2345 return schedule_->terminate();
2346 }
2347
2348 void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2349
2350 void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2351
2352 void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2353
2354 bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2355 bool expected = false;
2356 if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2357 deferredExceptionPtr_ = iException;
2358 return true;
2359 }
2360 return false;
2361 }
2362
2363 void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2364 cms::Exception ex("ModulesSynchingOnLumis");
2365 ex << "The framework is configured to use at least two streams, but the following modules\n"
2366 << "require synchronizing on LuminosityBlock boundaries:";
2367 bool found = false;
2368 for (auto worker : schedule_->allWorkers()) {
2369 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2370 found = true;
2371 ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2372 }
2373 }
2374 if (found) {
2375 ex << "\n\nThe situation can be fixed by either\n"
2376 << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2377 << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2378 throw ex;
2379 }
2380 }
2381
2382 void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
2383 std::unique_ptr<LogSystem> s;
2384 for (auto worker : schedule_->allWorkers()) {
2385 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2386 if (not s) {
2387 s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2388 (*s) << "The following modules require synchronizing on Run boundaries:";
2389 }
2390 (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2391 }
2392 }
2393 }
2394 }