File indexing completed on 2023-01-11 16:27:14
0001 #include "FWCore/Framework/interface/EventProcessor.h"
0002 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0003 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0004 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0005 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0008
0009 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0010 #include "FWCore/Framework/src/CommonParams.h"
0011 #include "FWCore/Framework/interface/EDLooperBase.h"
0012 #include "FWCore/Framework/interface/EventPrincipal.h"
0013 #include "FWCore/Framework/interface/EventSetupProvider.h"
0014 #include "FWCore/Framework/interface/EventSetupRecord.h"
0015 #include "FWCore/Framework/interface/FileBlock.h"
0016 #include "FWCore/Framework/interface/HistoryAppender.h"
0017 #include "FWCore/Framework/interface/InputSourceDescription.h"
0018 #include "FWCore/Framework/interface/IOVSyncValue.h"
0019 #include "FWCore/Framework/interface/LooperFactory.h"
0020 #include "FWCore/Framework/interface/LuminosityBlock.h"
0021 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0022 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0023 #include "FWCore/Framework/interface/ModuleChanger.h"
0024 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0025 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ProcessingController.h"
0028 #include "FWCore/Framework/interface/RunPrincipal.h"
0029 #include "FWCore/Framework/interface/Schedule.h"
0030 #include "FWCore/Framework/interface/ScheduleInfo.h"
0031 #include "FWCore/Framework/interface/ScheduleItems.h"
0032 #include "FWCore/Framework/interface/SubProcess.h"
0033 #include "FWCore/Framework/interface/Event.h"
0034 #include "FWCore/Framework/interface/ESRecordsToProxyIndices.h"
0035 #include "FWCore/Framework/src/Breakpoints.h"
0036 #include "FWCore/Framework/interface/EventSetupsController.h"
0037 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0038 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0039 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0040 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0041 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0042 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0043 #include "FWCore/Framework/interface/TriggerNamesService.h"
0044 #include "FWCore/Framework/src/SendSourceTerminationSignalIfException.h"
0045
0046 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0047
0048 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0049 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0050 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0051 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0052 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0053 #include "FWCore/ParameterSet/interface/Registry.h"
0054 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0055
0056 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0057 #include "FWCore/ServiceRegistry/interface/Service.h"
0058 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0059 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0060
0061 #include "FWCore/Concurrency/interface/WaitingTask.h"
0062 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0063 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0064 #include "FWCore/Concurrency/interface/chain_first.h"
0065
0066 #include "FWCore/Utilities/interface/Algorithms.h"
0067 #include "FWCore/Utilities/interface/DebugMacros.h"
0068 #include "FWCore/Utilities/interface/EDMException.h"
0069 #include "FWCore/Utilities/interface/Exception.h"
0070 #include "FWCore/Utilities/interface/ConvertException.h"
0071 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0072 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0073 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0074 #include "FWCore/Utilities/interface/StreamID.h"
0075 #include "FWCore/Utilities/interface/RootHandlers.h"
0076 #include "FWCore/Utilities/interface/propagate_const.h"
0077 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0078
0079 #include "MessageForSource.h"
0080 #include "MessageForParent.h"
0081 #include "LuminosityBlockProcessingStatus.h"
0082 #include "RunProcessingStatus.h"
0083
0084 #include "boost/range/adaptor/reversed.hpp"
0085
0086 #include <cassert>
0087 #include <exception>
0088 #include <iomanip>
0089 #include <iostream>
0090 #include <utility>
0091 #include <sstream>
0092
0093 #include <sys/ipc.h>
0094 #include <sys/msg.h>
0095
0096 #include "oneapi/tbb/task.h"
0097
0098
0099 #ifndef __APPLE__
0100 #include <sched.h>
0101 #endif
0102
0103 namespace {
0104 class PauseQueueSentry {
0105 public:
0106 PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
0107 ~PauseQueueSentry() { queue_.resume(); }
0108
0109 private:
0110 edm::SerialTaskQueue& queue_;
0111 };
0112 }
0113
0114 namespace edm {
0115
0116 namespace chain = waiting_task::chain;
0117
0118
0119 std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0120 ParameterSet& params,
0121 CommonParams const& common,
0122 std::shared_ptr<ProductRegistry> preg,
0123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0124 std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0126 std::shared_ptr<ActivityRegistry> areg,
0127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0128 PreallocationConfiguration const& allocations) {
0129 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0130 if (main_input == nullptr) {
0131 throw Exception(errors::Configuration)
0132 << "There must be exactly one source in the configuration.\n"
0133 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0134 }
0135
0136 std::string modtype(main_input->getParameter<std::string>("@module_type"));
0137
0138 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0139 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0140 ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0141 filler->fill(descriptions);
0142
0143 try {
0144 convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
0145 } catch (cms::Exception& iException) {
0146 std::ostringstream ost;
0147 ost << "Validating configuration of input source of type " << modtype;
0148 iException.addContext(ost.str());
0149 throw;
0150 }
0151
0152 main_input->registerIt();
0153
0154
0155
0156
0157
0158
0159
0160 ModuleDescription md(main_input->id(),
0161 main_input->getParameter<std::string>("@module_type"),
0162 "source",
0163 processConfiguration.get(),
0164 moduleIndex);
0165
0166 InputSourceDescription isdesc(md,
0167 preg,
0168 branchIDListHelper,
0169 processBlockHelper,
0170 thinnedAssociationsHelper,
0171 areg,
0172 common.maxEventsInput_,
0173 common.maxLumisInput_,
0174 common.maxSecondsUntilRampdown_,
0175 allocations);
0176
0177 areg->preSourceConstructionSignal_(md);
0178 std::unique_ptr<InputSource> input;
0179 try {
0180
0181 std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
0182 convertException::wrap([&]() {
0183 input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
0185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
0186 });
0187 } catch (cms::Exception& iException) {
0188 std::ostringstream ost;
0189 ost << "Constructing input source of type " << modtype;
0190 iException.addContext(ost.str());
0191 throw;
0192 }
0193 return input;
0194 }
0195
0196
0197 std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
0198 eventsetup::EventSetupProvider& cp,
0199 ParameterSet& params,
0200 std::vector<std::string> const& loopers) {
0201 std::shared_ptr<EDLooperBase> vLooper;
0202
0203 assert(1 == loopers.size());
0204
0205 for (auto const& looperName : loopers) {
0206 ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
0207
0208 vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
0209 }
0210 return vLooper;
0211 }
0212
0213
0214 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0215 ServiceToken const& iToken,
0216 serviceregistry::ServiceLegacy iLegacy,
0217 std::vector<std::string> const& defaultServices,
0218 std::vector<std::string> const& forcedServices)
0219 : actReg_(),
0220 preg_(),
0221 branchIDListHelper_(),
0222 serviceToken_(),
0223 input_(),
0224 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0225 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0226 esp_(),
0227 act_table_(),
0228 processConfiguration_(),
0229 schedule_(),
0230 subProcesses_(),
0231 historyAppender_(new HistoryAppender),
0232 fb_(),
0233 looper_(),
0234 deferredExceptionPtrIsSet_(false),
0235 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0236 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0237 principalCache_(),
0238 beginJobCalled_(false),
0239 shouldWeStop_(false),
0240 fileModeNoMerge_(false),
0241 exceptionMessageFiles_(),
0242 exceptionMessageRuns_(false),
0243 exceptionMessageLumis_(false),
0244 forceLooperToEnd_(false),
0245 looperBeginJobRun_(false),
0246 forceESCacheClearOnNewRun_(false),
0247 eventSetupDataToExcludeFromPrefetching_() {
0248 auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0249 processDesc->addServices(defaultServices, forcedServices);
0250 init(processDesc, iToken, iLegacy);
0251 }
0252
0253 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0254 std::vector<std::string> const& defaultServices,
0255 std::vector<std::string> const& forcedServices)
0256 : actReg_(),
0257 preg_(),
0258 branchIDListHelper_(),
0259 serviceToken_(),
0260 input_(),
0261 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0262 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0263 esp_(),
0264 act_table_(),
0265 processConfiguration_(),
0266 schedule_(),
0267 subProcesses_(),
0268 historyAppender_(new HistoryAppender),
0269 fb_(),
0270 looper_(),
0271 deferredExceptionPtrIsSet_(false),
0272 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0273 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0274 principalCache_(),
0275 beginJobCalled_(false),
0276 shouldWeStop_(false),
0277 fileModeNoMerge_(false),
0278 exceptionMessageFiles_(),
0279 exceptionMessageRuns_(false),
0280 exceptionMessageLumis_(false),
0281 forceLooperToEnd_(false),
0282 looperBeginJobRun_(false),
0283 forceESCacheClearOnNewRun_(false),
0284 eventSetupDataToExcludeFromPrefetching_() {
0285 auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0286 processDesc->addServices(defaultServices, forcedServices);
0287 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
0288 }
0289
0290 EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0291 ServiceToken const& token,
0292 serviceregistry::ServiceLegacy legacy)
0293 : actReg_(),
0294 preg_(),
0295 branchIDListHelper_(),
0296 serviceToken_(),
0297 input_(),
0298 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
0299 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0300 esp_(),
0301 act_table_(),
0302 processConfiguration_(),
0303 schedule_(),
0304 subProcesses_(),
0305 historyAppender_(new HistoryAppender),
0306 fb_(),
0307 looper_(),
0308 deferredExceptionPtrIsSet_(false),
0309 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0310 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0311 principalCache_(),
0312 beginJobCalled_(false),
0313 shouldWeStop_(false),
0314 fileModeNoMerge_(false),
0315 exceptionMessageFiles_(),
0316 exceptionMessageRuns_(false),
0317 exceptionMessageLumis_(false),
0318 forceLooperToEnd_(false),
0319 looperBeginJobRun_(false),
0320 forceESCacheClearOnNewRun_(false),
0321 eventSetupDataToExcludeFromPrefetching_() {
0322 init(processDesc, token, legacy);
0323 }
0324
// Shared second-stage initialisation used by every constructor.
//
// Reads the "options" parameter set (file mode, thread/stream/lumi/run
// concurrency, early-delete settings), starts the services, creates the
// EventSetup provider, optional looper, input source and schedule, allocates
// the principals, and finally constructs any SubProcesses.
//
// processDesc : process description supplying the process-level ParameterSet
//               and the service parameter sets.
// iToken      : service token passed on to ScheduleItems::initServices.
// iLegacy     : legacy policy passed on to ScheduleItems::initServices.
//
// Throws Exception(errors::Configuration) for an illegal "fileMode". Any
// exception raised after the services are active causes the owning members to
// be released before it is rethrown (see the catch block at the end).
0325 void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
0326 ServiceToken const& iToken,
0327 serviceregistry::ServiceLegacy iLegacy) {
0328
0329
0330 // Insert a default-constructed Parentage into the ParentageRegistry.
0331 ParentageRegistry::instance()->insertMapped(Parentage());
0332
0333 // Register a default-constructed (empty) ParameterSet.
0334 ParameterSet().registerIt();
0335
0336 std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
0337
0338 // Pull the SubProcess parameter sets out of the process parameter set.
0339 auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
0340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
0341
0342
0343
0344 // Validation runs after the SubProcess parameter sets have been popped.
0345 validateTopLevelParameterSets(parameterSet.get());
0346
0347 // Only the two file modes below are accepted; anything else is a configuration error.
0348 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
0349 auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
0350 if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
0351 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
0352 << fileMode << ".\n"
0353 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
0354 } else {
0355 fileModeNoMerge_ = (fileMode == "NOMERGE");
0356 }
0357 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
0358 ensureAvailableAccelerators(*parameterSet);
0359
0360 // Concurrency configuration: threads, streams, concurrent lumis and concurrent runs.
0361 unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
0362
0363
0364 // A thread count of zero is not expected to reach this point.
0365 assert(nThreads != 0);
0366
0367 unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
0368 if (nStreams == 0) {
0369 nStreams = nThreads;
0370 }
0371 unsigned int nConcurrentLumis =
0372 optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
0373 if (nConcurrentLumis == 0) {
0374 nConcurrentLumis = 2;
0375 }
0376 if (nConcurrentLumis > nStreams) {
0377 nConcurrentLumis = nStreams;
0378 }
0379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
0380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
0381 nConcurrentRuns = nConcurrentLumis;
0382 }
0383 std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
0384 if (!loopers.empty()) {
0385 // With a looper configured, streams and concurrent lumis/runs are all forced to 1.
0386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
0387 edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
0388 "of concurrent runs, and the number of concurrent lumis "
0389 "are all being reset to 1. Loopers cannot currently support "
0390 "values greater than 1.";
0391 nStreams = 1;
0392 nConcurrentLumis = 1;
0393 nConcurrentRuns = 1;
0394 }
0395 }
0396 bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
0397 if (dumpOptions) {
0398 dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
0399 } else {
0400 if (nThreads > 1 or nStreams > 1) {
0401 edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
0402 }
0403 }
0404
0405
0406
0407
0408 // Number of concurrent IOVs handed to the EventSetup provider below, computed from the
0409 // run and lumi concurrency. NOTE(review): the rationale for the 3*runs-2 term is not visible here - confirm.
0410 unsigned int maxConcurrentIOVs =
0411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
0412
0413 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
0414
0415 printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
0416 deleteNonConsumedUnscheduledModules_ =
0417 optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
0418
0419 // "canDeleteEarly" is only read when there are no SubProcesses.
0420 if (not hasSubProcesses) {
0421 branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
0422 }
0423 if (not branchesToDeleteEarly_.empty()) {
0424 auto referencePSets =
0425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
0426 for (auto const& pset : referencePSets) {
0427 auto product = pset.getParameter<std::string>("product");
0428 auto references = pset.getParameter<std::vector<std::string>>("references");
0429 for (auto const& ref : references) {
0430 referencesToBranches_.emplace(product, ref);
0431 }
0432 }
0433 modulesToIgnoreForDeleteEarly_ =
0434 optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
0435 }
0436
0437 // ScheduleItems carries the pieces built below until they are handed over to the members.
0438 ScheduleItems items;
0439
0440 // Initialise the services and store the resulting token.
0441 auto& serviceSets = processDesc->getServicesPSets();
0442 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
0443 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
0444
0445 // Make the services available on this thread for the rest of the function.
0446 ServiceRegistry::Operate operate(serviceToken_);
0447
0448 CMS_SA_ALLOW try {
0449 if (nThreads > 1) {
0450 edm::Service<RootHandlers> handler;
0451 handler->willBeUsingThreads();
0452 }
0453
0454
0455 std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
0456
0457 // Create the EventSetup provider using the "eventSetup" options.
0458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
0459 esp_ = espController_->makeProvider(
0460 *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
0461
0462 // Create the looper, if one is configured.
0463 if (!loopers.empty()) {
0464 looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
0465 looper_->setActionTable(items.act_table_.get());
0466 looper_->attachTo(*items.actReg_);
0467
0468 // Deletion of non-consumed unscheduled modules is switched off whenever a looper is present.
0469 deleteNonConsumedUnscheduledModules_ = false;
0470 }
0471
0472 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0473
0474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
0475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
0476 streamQueues_.resize(nStreams);
0477 streamRunStatus_.resize(nStreams);
0478 streamLumiStatus_.resize(nStreams);
0479
0480 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0481
0482 {
0483 std::optional<ScheduleItems::MadeModules> madeModules;
0484
0485 // Module construction and source construction are submitted as two tasks to one task group.
0486 tbb::task_group group;
0487
0488 // The source fills a temporary registry; it is merged into items.preg() after both tasks finish.
0489 auto tempReg = std::make_shared<ProductRegistry>();
0490 auto sourceID = ModuleDescription::getUniqueID();
0491
0492 group.run([&, this]() {
0493 // Each task activates the services itself.
0494 ServiceRegistry::Operate operate(serviceToken_);
0495 auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0496 madeModules =
0497 items.initModules(*parameterSet, tns, preallocations_, &processContext_, moduleTypeResolverMaker_.get());
0498 });
0499
0500 group.run([&, this, tempReg]() {
0501 ServiceRegistry::Operate operate(serviceToken_);
0502 input_ = makeInput(sourceID,
0503 *parameterSet,
0504 *common,
0505 tempReg,
0506 items.branchIDListHelper(),
0507 get_underlying_safe(processBlockHelper_),
0508 items.thinnedAssociationsHelper(),
0509 items.actReg_,
0510 items.processConfiguration(),
0511 preallocations_);
0512 });
0513 // Block until both tasks are done before touching madeModules or input_.
0514 group.wait();
0515 items.preg()->addFromInput(*tempReg);
0516 input_->switchTo(items.preg());
0517
0518 {
0519 auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0520 schedule_ = items.finishSchedule(std::move(*madeModules),
0521 *parameterSet,
0522 tns,
0523 hasSubProcesses,
0524 preallocations_,
0525 &processContext_,
0526 *processBlockHelper_);
0527 }
0528 }
0529
0530 // Take over the objects built inside ScheduleItems.
0531 act_table_ = std::move(items.act_table_);
0532 actReg_ = items.actReg_;
0533 preg_ = items.preg();
0534 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0535 branchIDListHelper_ = items.branchIDListHelper();
0536 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0537 processConfiguration_ = items.processConfiguration();
0538 processContext_.setProcessConfiguration(processConfiguration_.get());
0539
0540 FDEBUG(2) << parameterSet << std::endl;
0541
0542 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0543 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0544 // One EventPrincipal per stream.
0545 auto ep = std::make_shared<EventPrincipal>(preg(),
0546 branchIDListHelper(),
0547 thinnedAssociationsHelper(),
0548 *processConfiguration_,
0549 historyAppender_.get(),
0550 index,
0551 true ,
0552 &*processBlockHelper_);
0553 principalCache_.insert(std::move(ep));
0554 }
0555 // One RunPrincipal per concurrent run.
0556 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0557 auto rp = std::make_unique<RunPrincipal>(
0558 preg(), *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0559 principalCache_.insert(std::move(rp));
0560 }
0561 // One LuminosityBlockPrincipal per concurrent luminosity block.
0562 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0563 auto lp =
0564 std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
0565 principalCache_.insert(std::move(lp));
0566 }
0567 // Two ProcessBlockPrincipals: one inserted normally and one inserted "for input".
0568 {
0569 auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0570 principalCache_.insert(std::move(pb));
0571
0572 auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0573 principalCache_.insertForInput(std::move(pbForInput));
0574 }
0575
0576 // Construct the SubProcesses from the parameter sets popped at the top of this function.
0577 subProcesses_.reserve(subProcessVParameterSet.size());
0578 for (auto& subProcessPSet : subProcessVParameterSet) {
0579 subProcesses_.emplace_back(subProcessPSet,
0580 *parameterSet,
0581 preg(),
0582 branchIDListHelper(),
0583 *processBlockHelper_,
0584 *thinnedAssociationsHelper_,
0585 SubProcessParentageHelper(),
0586 *espController_,
0587 *actReg_,
0588 token,
0589 serviceregistry::kConfigurationOverrides,
0590 preallocations_,
0591 &processContext_,
0592 moduleTypeResolverMaker_);
0593 }
0594 } catch (...) {
0595 // Release these members here, while the ServiceRegistry::Operate declared
0596 // before the try block is still in scope, then propagate the exception.
0597 espController_ = nullptr;
0598 esp_ = nullptr;
0599 schedule_ = nullptr;
0600 input_ = nullptr;
0601 looper_ = nullptr;
0602 actReg_ = nullptr;
0603 throw;
0604 }
0605 }
0606
0607 EventProcessor::~EventProcessor() {
0608
0609 ServiceToken token = getToken();
0610 ServiceRegistry::Operate op(token);
0611
0612
0613
0614 espController_ = nullptr;
0615 esp_ = nullptr;
0616 schedule_ = nullptr;
0617 input_ = nullptr;
0618 looper_ = nullptr;
0619 actReg_ = nullptr;
0620
0621 pset::Registry::instance()->clear();
0622 ParentageRegistry::instance()->clear();
0623 }
0624
0625 void EventProcessor::taskCleanup() {
0626 edm::FinalWaitingTask task{taskGroup_};
0627 espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
0628 task.waitNoThrow();
0629 assert(task.done());
0630 }
0631
// Runs the begin-job transition once; later calls return immediately.
//
// Sequence: emit the preallocate signal, initialise the paths/consumes
// information, optionally delete unscheduled modules nobody consumes, set up
// early deletion, emit preBeginJob, call beginJob on the source, finish the
// EventSetup configuration, call beginJob on the schedule, looper lookups and
// SubProcesses, emit postBeginJob, and finally run beginStream for every
// stream and wait for those tasks.
//
// A cms::Exception from the source's beginJob is rethrown with added context.
0632 void EventProcessor::beginJob() {
0633 if (beginJobCalled_)
0634 return;
0635 beginJobCalled_ = true;
0636 bk::beginJob();
0637
0638
0639 // Make the services available for the rest of the function.
0640 ServiceRegistry::Operate operate(serviceToken_);
0641
0642 service::SystemBounds bounds(preallocations_.numberOfStreams(),
0643 preallocations_.numberOfLuminosityBlocks(),
0644 preallocations_.numberOfRuns(),
0645 preallocations_.numberOfThreads());
0646 actReg_->preallocateSignal_(bounds);
0647 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0648 pathsAndConsumesOfModules_.initialize(schedule_.get(), preg());
0649
0650 std::vector<ModuleProcessName> consumedBySubProcesses;
0651 for_all(subProcesses_,
0652 [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
0653 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0654 if (consumedBySubProcesses.empty()) {
0655 consumedBySubProcesses = std::move(c);
0656 } else if (not c.empty()) {
0657 std::vector<ModuleProcessName> tmp;
0658 tmp.reserve(consumedBySubProcesses.size() + c.size());
0659 std::merge(consumedBySubProcesses.begin(),
0660 consumedBySubProcesses.end(),
0661 c.begin(),
0662 c.end(),
0663 std::back_inserter(tmp));
0664 std::swap(consumedBySubProcesses, tmp);
0665 }
0666 });
0667 // NOTE(review): std::merge above requires both ranges to be sorted; presumably
0668 // keepOnlyConsumedUnscheduledModules returns a sorted vector - confirm.
0669 checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, printDependencies_);
0670 if (deleteNonConsumedUnscheduledModules_) {
0671 if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
0672 not unusedModules.empty()) {
0673 pathsAndConsumesOfModules_.removeModules(unusedModules);
0674
0675 edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
0676 l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0677 "and "
0678 "therefore they are deleted before beginJob transition.";
0679 for (auto const& description : unusedModules) {
0680 l << "\n " << description->moduleLabel();
0681 }
0682 });
0683 for (auto const& description : unusedModules) {
0684 schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0685 }
0686 }
0687 }
0688
0689 // The early-delete settings are moved into locals, so the members are left
0690 // in a moved-from state once the schedule has been initialised from them.
0691 if (not branchesToDeleteEarly_.empty()) {
0692 auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
0693 auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
0694 auto referencesToBranches = std::move(referencesToBranches_);
0695 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
0696 }
0697
0698 actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
0699
0700 if (preallocations_.numberOfLuminosityBlocks() > 1) {
0701 throwAboutModulesRequiringLuminosityBlockSynchronization();
0702 }
0703 if (preallocations_.numberOfRuns() > 1) {
0704 warnAboutModulesRequiringRunSynchronization();
0705 }
0706 warnAboutLegacyModules();
0707
0708
0709
0710
0711
0712
0713
0714
0715
0716
0717
0718
0719
0720
0721 // The source's beginJob runs before the EventSetup configuration is finished and before the schedule's beginJob.
0722 try {
0723 convertException::wrap([&]() { input_->doBeginJob(); });
0724 } catch (cms::Exception& ex) {
0725 ex.addContext("Calling beginJob for the source");
0726 throw;
0727 }
0728 espController_->finishConfiguration();
0729 schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
0730 if (looper_) {
0731 constexpr bool mustPrefetchMayGet = true;
0732 auto const processBlockLookup = preg_->productLookup(InProcess);
0733 auto const runLookup = preg_->productLookup(InRun);
0734 auto const lumiLookup = preg_->productLookup(InLumi);
0735 auto const eventLookup = preg_->productLookup(InEvent);
0736 looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
0737 looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
0738 looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
0739 looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
0740 looper_->updateLookup(esp_->recordsToProxyIndices());
0741 }
0742
0743 for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0744 actReg_->postBeginJobSignal_();
0745 // For every stream: beginStream on the schedule, then (if any) on each SubProcess; last.wait() blocks until done.
0746 oneapi::tbb::task_group group;
0747 FinalWaitingTask last{group};
0748 using namespace edm::waiting_task::chain;
0749 first([this](auto nextTask) {
0750 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0751 first([i, this](auto nextTask) {
0752 ServiceRegistry::Operate operate(serviceToken_);
0753 schedule_->beginStream(i);
0754 }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
0755 ServiceRegistry::Operate operate(serviceToken_);
0756 for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
0757 }) | lastTask(nextTask);
0758 }
0759 }) | runLast(WaitingTaskHolder(group, &last));
0760 last.wait();
0761 }
0762
0763 void EventProcessor::endJob() {
0764
0765 ExceptionCollector c(
0766 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0767
0768
0769 ServiceRegistry::Operate operate(serviceToken_);
0770
0771 using namespace edm::waiting_task::chain;
0772
0773 oneapi::tbb::task_group group;
0774 edm::FinalWaitingTask waitTask{group};
0775
0776 {
0777
0778 edm::WaitingTaskHolder taskHolder(group, &waitTask);
0779 std::mutex collectorMutex;
0780 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0781 first([this, i, &c, &collectorMutex](auto nextTask) {
0782 std::exception_ptr ep;
0783 try {
0784 ServiceRegistry::Operate operate(serviceToken_);
0785 this->schedule_->endStream(i);
0786 } catch (...) {
0787 ep = std::current_exception();
0788 }
0789 if (ep) {
0790 std::lock_guard<std::mutex> l(collectorMutex);
0791 c.call([&ep]() { std::rethrow_exception(ep); });
0792 }
0793 }) | then([this, i, &c, &collectorMutex](auto nextTask) {
0794 for (auto& subProcess : subProcesses_) {
0795 first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
0796 std::exception_ptr ep;
0797 try {
0798 ServiceRegistry::Operate operate(serviceToken_);
0799 subProcess.doEndStream(i);
0800 } catch (...) {
0801 ep = std::current_exception();
0802 }
0803 if (ep) {
0804 std::lock_guard<std::mutex> l(collectorMutex);
0805 c.call([&ep]() { std::rethrow_exception(ep); });
0806 }
0807 }) | lastTask(nextTask);
0808 }
0809 }) | lastTask(taskHolder);
0810 }
0811 }
0812 waitTask.waitNoThrow();
0813
0814 auto actReg = actReg_.get();
0815 c.call([actReg]() { actReg->preEndJobSignal_(); });
0816 schedule_->endJob(c);
0817 for (auto& subProcess : subProcesses_) {
0818 c.call(std::bind(&SubProcess::doEndJob, &subProcess));
0819 }
0820 c.call(std::bind(&InputSource::doEndJob, input_.get()));
0821 if (looper_) {
0822 c.call(std::bind(&EDLooperBase::endOfJob, looper()));
0823 }
0824 c.call([actReg]() { actReg->postEndJobSignal_(); });
0825 if (c.hasThrown()) {
0826 c.rethrow();
0827 }
0828 }
0829
0830 ServiceToken EventProcessor::getToken() { return serviceToken_; }
0831
0832 std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
0833 return schedule_->getAllModuleDescriptions();
0834 }
0835
0836 int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
0837
0838 int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
0839
0840 int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
0841
0842 void EventProcessor::clearCounters() { schedule_->clearCounters(); }
0843
// The .icc file is included textually inside an unnamed namespace, so whatever
// it defines has internal linkage in this translation unit.
// NOTE(review): FilesProcessor used in runToCompletion() presumably comes from
// this include - confirm.
0844 namespace {
0845 #include "TransitionProcessors.icc"
0846 }
0847
0848 bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
0849 bool returnValue = false;
0850
0851
0852 if (shutdown_flag.load(std::memory_order_acquire)) {
0853 returnValue = true;
0854 returnCode = epSignal;
0855 }
0856 return returnValue;
0857 }
0858
0859 InputSource::ItemType EventProcessor::nextTransitionType() {
0860 SendSourceTerminationSignalIfException sentry(actReg_.get());
0861 InputSource::ItemType itemType;
0862
0863 do {
0864 itemType = input_->nextItemType();
0865 } while (itemType == InputSource::IsSynchronize);
0866
0867 lastSourceTransition_ = itemType;
0868 sentry.completedSuccessfully();
0869
0870 StatusCode returnCode = epSuccess;
0871
0872 if (checkForAsyncStopRequest(returnCode)) {
0873 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
0874 lastSourceTransition_ = InputSource::IsStop;
0875 }
0876
0877 return lastSourceTransition_;
0878 }
0879
0880 EventProcessor::StatusCode EventProcessor::runToCompletion() {
0881 beginJob();
0882
0883
0884 ServiceRegistry::Operate operate(serviceToken_);
0885
0886 try {
0887 FilesProcessor fp(fileModeNoMerge_);
0888
0889 convertException::wrap([&]() {
0890 bool firstTime = true;
0891 do {
0892 if (not firstTime) {
0893 prepareForNextLoop();
0894 rewindInput();
0895 } else {
0896 firstTime = false;
0897 }
0898 startingNewLoop();
0899
0900 auto trans = fp.processFiles(*this);
0901
0902 fp.normalEnd();
0903
0904 if (deferredExceptionPtrIsSet_.load()) {
0905 std::rethrow_exception(deferredExceptionPtr_);
0906 }
0907 if (trans != InputSource::IsStop) {
0908
0909 doErrorStuff();
0910
0911 throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
0912 }
0913 } while (not endOfLoop());
0914 });
0915
0916 }
0917 catch (cms::Exception& e) {
0918 if (exceptionMessageLumis_) {
0919 std::string message(
0920 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
0921 e.addAdditionalInfo(message);
0922 if (e.alreadyPrinted()) {
0923 LogAbsolute("Additional Exceptions") << message;
0924 }
0925 }
0926 if (exceptionMessageRuns_) {
0927 std::string message(
0928 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
0929 e.addAdditionalInfo(message);
0930 if (e.alreadyPrinted()) {
0931 LogAbsolute("Additional Exceptions") << message;
0932 }
0933 }
0934 if (!exceptionMessageFiles_.empty()) {
0935 e.addAdditionalInfo(exceptionMessageFiles_);
0936 if (e.alreadyPrinted()) {
0937 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
0938 }
0939 }
0940 throw;
0941 }
0942 return epSuccess;
0943 }
0944
0945 void EventProcessor::readFile() {
0946 FDEBUG(1) << " \treadFile\n";
0947 size_t size = preg_->size();
0948 SendSourceTerminationSignalIfException sentry(actReg_.get());
0949
0950 if (streamRunActive_ > 0) {
0951 streamRunStatus_[0]->runPrincipal()->preReadFile();
0952 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
0953 }
0954
0955 if (streamLumiActive_ > 0) {
0956 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
0957 }
0958
0959 fb_ = input_->readFile();
0960 if (size < preg_->size()) {
0961 principalCache_.adjustIndexesAfterProductRegistryAddition();
0962 }
0963 principalCache_.adjustEventsToNewProductRegistry(preg());
0964 if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
0965 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
0966 }
0967 sentry.completedSuccessfully();
0968 }
0969
0970 void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
0971 if (fileBlockValid()) {
0972 SendSourceTerminationSignalIfException sentry(actReg_.get());
0973 input_->closeFile(fb_.get(), cleaningUpAfterException);
0974 sentry.completedSuccessfully();
0975 }
0976 FDEBUG(1) << "\tcloseInputFile\n";
0977 }
0978
0979 void EventProcessor::openOutputFiles() {
0980 if (fileBlockValid()) {
0981 schedule_->openOutputFiles(*fb_);
0982 for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
0983 }
0984 FDEBUG(1) << "\topenOutputFiles\n";
0985 }
0986
0987 void EventProcessor::closeOutputFiles() {
0988 schedule_->closeOutputFiles();
0989 for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
0990 processBlockHelper_->clearAfterOutputFilesClose();
0991 FDEBUG(1) << "\tcloseOutputFiles\n";
0992 }
0993
0994 void EventProcessor::respondToOpenInputFile() {
0995 if (fileBlockValid()) {
0996 for_all(subProcesses_,
0997 [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
0998 schedule_->respondToOpenInputFile(*fb_);
0999 for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1000 }
1001 FDEBUG(1) << "\trespondToOpenInputFile\n";
1002 }
1003
1004 void EventProcessor::respondToCloseInputFile() {
1005 if (fileBlockValid()) {
1006 schedule_->respondToCloseInputFile(*fb_);
1007 for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1008 }
1009 FDEBUG(1) << "\trespondToCloseInputFile\n";
1010 }
1011
1012 void EventProcessor::startingNewLoop() {
1013 shouldWeStop_ = false;
1014
1015
1016 if (looper_ && looperBeginJobRun_) {
1017 looper_->doStartingNewLoop();
1018 }
1019 FDEBUG(1) << "\tstartingNewLoop\n";
1020 }
1021
1022 bool EventProcessor::endOfLoop() {
1023 if (looper_) {
1024 ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1025 looper_->setModuleChanger(&changer);
1026 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1027 looper_->setModuleChanger(nullptr);
1028 if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1029 return true;
1030 else
1031 return false;
1032 }
1033 FDEBUG(1) << "\tendOfLoop\n";
1034 return true;
1035 }
1036
1037 void EventProcessor::rewindInput() {
1038 input_->repeat();
1039 input_->rewind();
1040 FDEBUG(1) << "\trewind\n";
1041 }
1042
1043 void EventProcessor::prepareForNextLoop() {
1044 looper_->prepareForNextLoop(esp_.get());
1045 FDEBUG(1) << "\tprepareForNextLoop\n";
1046 }
1047
1048 bool EventProcessor::shouldWeCloseOutput() const {
1049 FDEBUG(1) << "\tshouldWeCloseOutput\n";
1050 if (!subProcesses_.empty()) {
1051 for (auto const& subProcess : subProcesses_) {
1052 if (subProcess.shouldWeCloseOutput()) {
1053 return true;
1054 }
1055 }
1056 return false;
1057 }
1058 return schedule_->shouldWeCloseOutput();
1059 }
1060
1061 void EventProcessor::doErrorStuff() {
1062 FDEBUG(1) << "\tdoErrorStuff\n";
1063 LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1064 << "and went to the error state\n"
1065 << "Will attempt to terminate processing normally\n"
1066 << "(IF using the looper the next loop will be attempted)\n"
1067 << "This likely indicates a bug in an input module or corrupted input or both\n";
1068 }
1069
1070 void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1071 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1072 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1073
1074 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1075 FinalWaitingTask globalWaitTask{taskGroup_};
1076
1077 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1078 beginGlobalTransitionAsync<Traits>(
1079 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1080
1081 globalWaitTask.wait();
1082 beginProcessBlockSucceeded = true;
1083 }
1084
1085 void EventProcessor::inputProcessBlocks() {
1086 input_->fillProcessBlockHelper();
1087 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1088 while (input_->nextProcessBlock(processBlockPrincipal)) {
1089 readProcessBlock(processBlockPrincipal);
1090
1091 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1092 FinalWaitingTask globalWaitTask{taskGroup_};
1093
1094 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1095 beginGlobalTransitionAsync<Traits>(
1096 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1097
1098 globalWaitTask.wait();
1099
1100 FinalWaitingTask writeWaitTask{taskGroup_};
1101 writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input);
1102 writeWaitTask.wait();
1103
1104 processBlockPrincipal.clearPrincipal();
1105 for (auto& s : subProcesses_) {
1106 s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1107 }
1108 }
1109 }
1110
1111 void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1112 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1113
1114 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1115 FinalWaitingTask globalWaitTask{taskGroup_};
1116
1117 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1118 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1119 *schedule_,
1120 transitionInfo,
1121 serviceToken_,
1122 subProcesses_,
1123 cleaningUpAfterException);
1124 globalWaitTask.wait();
1125
1126 if (beginProcessBlockSucceeded) {
1127 FinalWaitingTask writeWaitTask{taskGroup_};
1128 writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New);
1129 writeWaitTask.wait();
1130 }
1131
1132 processBlockPrincipal.clearPrincipal();
1133 for (auto& s : subProcesses_) {
1134 s.clearProcessBlockPrincipal(ProcessBlockType::New);
1135 }
1136 }
1137
1138 InputSource::ItemType EventProcessor::processRuns() {
1139 FinalWaitingTask waitTask{taskGroup_};
1140 assert(lastTransitionType() == InputSource::IsRun);
1141 if (streamRunActive_ == 0) {
1142 assert(streamLumiActive_ == 0);
1143
1144 beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1145 WaitingTaskHolder{taskGroup_, &waitTask});
1146 } else {
1147 assert(streamRunActive_ == preallocations_.numberOfStreams());
1148
1149 auto runStatus = streamRunStatus_[0];
1150
1151 while (lastTransitionType() == InputSource::IsRun and runStatus->runPrincipal()->run() == input_->run() and
1152 runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1153 readAndMergeRun(*runStatus);
1154 nextTransitionType();
1155 }
1156
1157 WaitingTaskHolder holder{taskGroup_, &waitTask};
1158 runStatus->setHolderOfTaskInProcessRuns(holder);
1159 if (streamLumiActive_ > 0) {
1160 assert(streamLumiActive_ == preallocations_.numberOfStreams());
1161 continueLumiAsync(std::move(holder));
1162 } else {
1163 handleNextItemAfterMergingRunEntries(std::move(runStatus), std::move(holder));
1164 }
1165 }
1166 waitTask.wait();
1167 return lastTransitionType();
1168 }
1169
1170 void EventProcessor::beginRunAsync(IOVSyncValue const& iSync, WaitingTaskHolder iHolder) {
1171 if (iHolder.taskHasFailed()) {
1172 return;
1173 }
1174
1175 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1176
1177 auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1178
1179 chain::first([this, &status, &iSync](auto nextTask) {
1180 espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1181 nextTask,
1182 status->endIOVWaitingTasks(),
1183 status->eventSetupImpls(),
1184 queueWhichWaitsForIOVsToFinish_,
1185 actReg_.get(),
1186 serviceToken_,
1187 forceESCacheClearOnNewRun_);
1188 }) | chain::then([this, status, iSync](std::exception_ptr const* iException, auto nextTask) {
1189 CMS_SA_ALLOW try {
1190 if (iException) {
1191 WaitingTaskHolder copyHolder(nextTask);
1192 copyHolder.doneWaiting(*iException);
1193
1194 }
1195 ServiceRegistry::Operate operate(serviceToken_);
1196 actReg_->postESSyncIOVSignal_.emit(iSync);
1197
1198 runQueue_->pushAndPause(
1199 *nextTask.group(),
1200 [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1201 CMS_SA_ALLOW try {
1202 if (postRunQueueTask.taskHasFailed()) {
1203 status->resetBeginResources();
1204 queueWhichWaitsForIOVsToFinish_.resume();
1205 return;
1206 }
1207
1208 status->setResumer(std::move(iResumer));
1209
1210 sourceResourcesAcquirer_.serialQueueChain().push(
1211 *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1212 CMS_SA_ALLOW try {
1213 ServiceRegistry::Operate operate(serviceToken_);
1214
1215 if (postSourceTask.taskHasFailed()) {
1216 status->resetBeginResources();
1217 queueWhichWaitsForIOVsToFinish_.resume();
1218 status->resumeGlobalRunQueue();
1219 return;
1220 }
1221
1222 status->setRunPrincipal(readRun());
1223
1224 RunPrincipal& runPrincipal = *status->runPrincipal();
1225 {
1226 SendSourceTerminationSignalIfException sentry(actReg_.get());
1227 input_->doBeginRun(runPrincipal, &processContext_);
1228 sentry.completedSuccessfully();
1229 }
1230
1231 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1232 if (looper_ && looperBeginJobRun_ == false) {
1233 looper_->copyInfo(ScheduleInfo(schedule_.get()));
1234
1235 oneapi::tbb::task_group group;
1236 FinalWaitingTask waitTask{group};
1237 using namespace edm::waiting_task::chain;
1238 chain::first([this, &es](auto nextTask) {
1239 looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1240 }) | then([this, &es](auto nextTask) mutable {
1241 looper_->beginOfJob(es);
1242 looperBeginJobRun_ = true;
1243 looper_->doStartingNewLoop();
1244 }) | runLast(WaitingTaskHolder(group, &waitTask));
1245 waitTask.wait();
1246 }
1247
1248 using namespace edm::waiting_task::chain;
1249 chain::first([this, status](auto nextTask) mutable {
1250 CMS_SA_ALLOW try { readAndMergeRunEntriesAsync(std::move(status), nextTask); } catch (...) {
1251 status->setStopBeforeProcessingRun(true);
1252 nextTask.doneWaiting(std::current_exception());
1253 }
1254 }) | then([this, status, &es](auto nextTask) {
1255 if (status->stopBeforeProcessingRun()) {
1256 return;
1257 }
1258 RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1259 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1260 beginGlobalTransitionAsync<Traits>(
1261 nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1262 }) | then([status](auto nextTask) mutable {
1263 if (status->stopBeforeProcessingRun()) {
1264 return;
1265 }
1266 status->globalBeginDidSucceed();
1267 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1268 if (status->stopBeforeProcessingRun()) {
1269 return;
1270 }
1271 looper_->prefetchAsync(
1272 nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1273 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1274 if (status->stopBeforeProcessingRun()) {
1275 return;
1276 }
1277 ServiceRegistry::Operate operateLooper(serviceToken_);
1278 looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1279 }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1280 bool precedingTasksSucceeded = true;
1281 if (iException) {
1282 precedingTasksSucceeded = false;
1283 WaitingTaskHolder copyHolder(holder);
1284 copyHolder.doneWaiting(*iException);
1285 }
1286
1287 if (status->stopBeforeProcessingRun()) {
1288
1289 status->resetBeginResources();
1290 queueWhichWaitsForIOVsToFinish_.resume();
1291 status->resumeGlobalRunQueue();
1292 return;
1293 }
1294 CMS_SA_ALLOW try {
1295
1296
1297 auto globalEndRunTask =
1298 edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1299 WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1300 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1301 globalEndRunAsync(std::move(taskHolder), std::move(status));
1302 });
1303 status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1304 } catch (...) {
1305 status->resetBeginResources();
1306 queueWhichWaitsForIOVsToFinish_.resume();
1307 status->resumeGlobalRunQueue();
1308 holder.doneWaiting(std::current_exception());
1309 return;
1310 }
1311
1312
1313
1314 ServiceRegistry::Operate operate(serviceToken_);
1315
1316
1317
1318
1319 PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1320
1321 CMS_SA_ALLOW try {
1322 streamQueuesInserter_.push(
1323 *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1324 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1325 CMS_SA_ALLOW try {
1326 streamQueues_[i].push(
1327 *holder.group(),
1328 [this, i, status, precedingTasksSucceeded, holder]() mutable {
1329 streamBeginRunAsync(
1330 i, std::move(status), precedingTasksSucceeded, std::move(holder));
1331 });
1332 } catch (...) {
1333 if (status->streamFinishedBeginRun()) {
1334 WaitingTaskHolder copyHolder(holder);
1335 copyHolder.doneWaiting(std::current_exception());
1336 status->resetBeginResources();
1337 queueWhichWaitsForIOVsToFinish_.resume();
1338 exceptionRunStatus_ = status;
1339 }
1340 }
1341 }
1342 });
1343 } catch (...) {
1344 WaitingTaskHolder copyHolder(holder);
1345 copyHolder.doneWaiting(std::current_exception());
1346 status->resetBeginResources();
1347 queueWhichWaitsForIOVsToFinish_.resume();
1348 exceptionRunStatus_ = status;
1349 }
1350 handleNextItemAfterMergingRunEntries(status, holder);
1351 }) | runLast(postSourceTask);
1352 } catch (...) {
1353 status->resetBeginResources();
1354 queueWhichWaitsForIOVsToFinish_.resume();
1355 status->resumeGlobalRunQueue();
1356 postSourceTask.doneWaiting(std::current_exception());
1357 }
1358 });
1359 } catch (...) {
1360 status->resetBeginResources();
1361 queueWhichWaitsForIOVsToFinish_.resume();
1362 status->resumeGlobalRunQueue();
1363 postRunQueueTask.doneWaiting(std::current_exception());
1364 }
1365 });
1366 } catch (...) {
1367 status->resetBeginResources();
1368 queueWhichWaitsForIOVsToFinish_.resume();
1369 nextTask.doneWaiting(std::current_exception());
1370 }
1371 }) | chain::runLast(std::move(iHolder));
1372 }
1373
// Begins a run on a single stream.
//
// The stream's queue is paused, streamRunActive_ is incremented and `status` is stored in
// streamRunStatus_[iStream]. The stream begin-run transition is only launched when
// precedingTasksSucceeded is true. In every case releaseBeginRunResources(iStream) is called
// afterwards, which resumes the stream's queue. Failures are reported through the task holders.
//
// iStream                  index of the stream to start the run on
// status                   run status shared by all streams; ownership is moved into streamRunStatus_
// precedingTasksSucceeded  false skips the stream transition itself
// iHolder                  task notified at the end of the chain or when setting it up throws
1374 void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1375 std::shared_ptr<RunProcessingStatus> status,
1376 bool precedingTasksSucceeded,
1377 WaitingTaskHolder iHolder) {
1378
     // These three statements sit outside the try block below, so an exception from them
     // would not be reported through iHolder.
1379 streamQueues_[iStream].pause();
1380 ++streamRunActive_;
1381 streamRunStatus_[iStream] = std::move(status);
1382
1383 CMS_SA_ALLOW try {
1384 using namespace edm::waiting_task::chain;
1385 chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1386 if (precedingTasksSucceeded) {
1387 RunProcessingStatus& rs = *streamRunStatus_[iStream];
1388 RunTransitionInfo transitionInfo(
1389 *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1390 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1391 beginStreamTransitionAsync<Traits>(
1392 std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1393 }
1394 }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
     // Forward a failure of the stream transition, then release the begin-run resources
     // regardless of the outcome.
1395 if (exceptionFromBeginStreamRun) {
1396 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1397 }
1398 releaseBeginRunResources(iStream);
1399 }) | runLast(iHolder);
1400 } catch (...) {
1401 releaseBeginRunResources(iStream);
1402 iHolder.doneWaiting(std::current_exception());
1403 }
1404 }
1405
1406 void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1407 auto& status = streamRunStatus_[iStream];
1408 if (status->streamFinishedBeginRun()) {
1409 status->resetBeginResources();
1410 queueWhichWaitsForIOVsToFinish_.resume();
1411 }
1412 streamQueues_[iStream].resume();
1413 }
1414
// Starts the asynchronous end of a run.
//
// Sets the end time on the run status and synchronizes the EventSetup to an IOVSyncValue built
// from the run number, the maximum luminosity block number, the maximum event number and the
// run principal's end time. It then queues, for every stream, a task that pauses the stream's
// queue and calls streamEndRunAsync. If the last source transition is IsRun, the next run is
// begun right away via beginRunAsync.
//
// iRunStatus  status of the run being ended
// iHolder     task notified at the end of the chain; exceptions are reported through it
1415 void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1416 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1417 iRunStatus->setEndTime();
1418 IOVSyncValue ts(
1419 EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1420 runPrincipal.endTime());
     // A throwing signal handler is recorded on the holder; execution continues afterwards.
1421 CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1422 WaitingTaskHolder copyHolder(iHolder);
1423 copyHolder.doneWaiting(std::current_exception());
1424 }
1425
     // NOTE(review): the first step captures the parameter iRunStatus and the local ts by
     // reference; that is only valid if the chain runs it before this function returns -- confirm.
1426 chain::first([this, &iRunStatus, &ts](auto nextTask) {
1427 espController_->runOrQueueEventSetupForInstanceAsync(ts,
1428 nextTask,
1429 iRunStatus->endIOVWaitingTasksEndRun(),
1430 iRunStatus->eventSetupImplsEndRun(),
1431 queueWhichWaitsForIOVsToFinish_,
1432 actReg_.get(),
1433 serviceToken_);
1434 }) | chain::then([this, iRunStatus, ts](std::exception_ptr const* iException, auto nextTask) {
     // A failed EventSetup synchronization is remembered in the status; later steps
     // (streamEndRunAsync, globalEndRunAsync) check endingEventSetupSucceeded().
1435 if (iException) {
1436 iRunStatus->setEndingEventSetupSucceeded(false);
1437 handleEndRunExceptions(*iException, nextTask);
1438 }
1439 ServiceRegistry::Operate operate(serviceToken_);
1440 CMS_SA_ALLOW try { actReg_->postESSyncIOVSignal_.emit(ts); } catch (...) {
1441 WaitingTaskHolder copyHolder(nextTask);
1442 copyHolder.doneWaiting(std::current_exception());
1443 }
1444
     // Queue the stream end-run work: each stream task pauses its own queue and then calls
     // streamEndRunAsync. A failure to push for one stream is recorded, and the loop goes on.
1445 streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1446 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1447 CMS_SA_ALLOW try {
1448 streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1449 streamQueues_[i].pause();
1450 streamEndRunAsync(std::move(nextTask), i);
1451 });
1452 } catch (...) {
1453 WaitingTaskHolder copyHolder(nextTask);
1454 copyHolder.doneWaiting(std::current_exception());
1455 }
1456 }
1457 });
1458
     // The source already reported another run: start it now, using the source's current
     // run number and begin time.
1459 if (lastTransitionType() == InputSource::IsRun) {
1460 CMS_SA_ALLOW try {
1461 beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1462 } catch (...) {
1463 WaitingTaskHolder copyHolder(nextTask);
1464 copyHolder.doneWaiting(std::current_exception());
1465 }
1466 }
1467 }) | chain::runLast(std::move(iHolder));
1468 }
1469
1470 void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1471 if (holder.taskHasFailed()) {
1472 setExceptionMessageRuns();
1473 } else {
1474 WaitingTaskHolder tmp(holder);
1475 tmp.doneWaiting(iException);
1476 }
1477 }
1478
// Runs the global end-of-run work as a task chain: the global end-run transition, the looper's
// end-run steps, writing the run, and finally releasing the run's resources.
//
// The transition and the looper steps require endingEventSetupSucceeded; writing additionally
// requires didGlobalBeginSucceed. The last step always runs its clean-up and reports failures
// through handleEndRunExceptions.
// NOTE(review): chain `ifThen` steps presumably run only when their first argument is true -- confirm.
//
// iTask       task notified at the end of the chain
// iRunStatus  status of the run being ended; moved into the last step of the chain
1479 void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
     // Everything needed from iRunStatus is read here, before it is moved into the last step.
1480 auto& runPrincipal = *(iRunStatus->runPrincipal());
1481 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1482 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1483 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1484 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1485 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1486
1487 MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1488 using namespace edm::waiting_task::chain;
     // NOTE(review): eventSetupImpls is a local pointer captured by reference, so this first
     // step must run before the function returns -- confirm.
1489 chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1490 auto nextTask) {
1491 if (endingEventSetupSucceeded) {
1492 RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1493 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1494 endGlobalTransitionAsync<Traits>(
1495 std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1496 }
1497 }) |
1498 ifThen(looper_ && endingEventSetupSucceeded,
1499 [this, &runPrincipal, &es](auto nextTask) {
1500 looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1501 }) |
1502 ifThen(looper_ && endingEventSetupSucceeded,
1503 [this, &runPrincipal, &es](auto nextTask) {
1504 ServiceRegistry::Operate operate(serviceToken_);
1505 looper_->doEndRun(runPrincipal, es, &processContext_);
1506 }) |
1507 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1508 [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1509 mergeableRunProductMetadata->preWriteRun();
1510 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1511 }) |
1512 then([status = std::move(iRunStatus),
1513 this,
1514 didGlobalBeginSucceed,
1515 mergeableRunProductMetadata,
1516 endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
     // postWriteRun() is called under the same condition that guards preWriteRun() above.
1517 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1518 mergeableRunProductMetadata->postWriteRun();
1519 }
1520 if (iException) {
1521 handleEndRunExceptions(*iException, nextTask);
1522 }
1523 ServiceRegistry::Operate operate(serviceToken_);
1524
1525 std::exception_ptr ptr;
1526
1527
1528
1529
     // Each clean-up step below has its own try block, so a failure in one does not skip the
     // others; only the first exception caught is kept in `ptr`.
1530 CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1531 if (not ptr) {
1532 ptr = std::current_exception();
1533 }
1534 }
1535 CMS_SA_ALLOW try {
1536 status->resumeGlobalRunQueue();
1537 queueWhichWaitsForIOVsToFinish_.resume();
1538 } catch (...) {
1539 if (not ptr) {
1540 ptr = std::current_exception();
1541 }
1542 }
1543 CMS_SA_ALLOW try {
1544 status->resetEndResources();
1545 status.reset();
1546 } catch (...) {
1547 if (not ptr) {
1548 ptr = std::current_exception();
1549 }
1550 }
1551
     // A clean-up exception is only reported when no earlier exception was already handled above.
1552 if (ptr && !iException) {
1553 handleEndRunExceptions(ptr, nextTask);
1554 }
1555 }) |
1556 runLast(std::move(iTask));
1557 }
1558
// Ends the run on a single stream.
//
// If the stream has no run status, only exceptionRunStatus_ is consulted and the function
// returns. Otherwise a completion task is created which records exceptions, releases the
// global end-run holder when streamFinishedRun() returns true, clears the stream's run status,
// decrements streamRunActive_ and resumes the stream's queue. The stream end-run transition is
// only launched when both the global begin-run and the ending EventSetup synchronization
// succeeded.
//
// iTask         task holder used for error reporting and for its task group
// iStreamIndex  index of the stream whose run is ending
1559 void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1560 CMS_SA_ALLOW try {
     // No run status on this stream. beginRunAsync sets exceptionRunStatus_ when queuing the
     // stream begin-run work fails.
     // NOTE(review): exceptionRunStatus_ is dereferenced without a null check -- confirm it is
     // always set when this branch is reached.
1561 if (!streamRunStatus_[iStreamIndex]) {
1562 if (exceptionRunStatus_->streamFinishedRun()) {
1563 exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1564 exceptionRunStatus_.reset();
1565 }
1566 return;
1567 }
1568
1569 auto runDoneTask =
1570 edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1571 if (iException) {
1572 handleEndRunExceptions(*iException, iTask);
1573 }
1574
     // Local copy of the shared pointer, taken before the slot in streamRunStatus_ is reset below.
1575 auto runStatus = streamRunStatus_[iStreamIndex];
1576
1577
     // The stream's status slot is cleared before its queue is resumed.
1578 if (runStatus->streamFinishedRun()) {
1579 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1580 }
1581 streamRunStatus_[iStreamIndex].reset();
1582 --streamRunActive_;
1583 streamQueues_[iStreamIndex].resume();
1584 });
1585
     // Holding the completion task here means it is released even when the transition below is
     // skipped.
     // NOTE(review): presumably the task then runs once this holder is destroyed -- confirm
     // against WaitingTaskHolder.
1586 WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1587
1588 auto runStatus = streamRunStatus_[iStreamIndex].get();
1589
1590 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1591 EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1592 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1593 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1594
1595 auto& runPrincipal = *runStatus->runPrincipal();
1596 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1597 RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1598 endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1599 *schedule_,
1600 iStreamIndex,
1601 transitionInfo,
1602 serviceToken_,
1603 subProcesses_,
1604 cleaningUpAfterException);
1605 }
1606 } catch (...) {
1607 handleEndRunExceptions(std::current_exception(), iTask);
1608 }
1609 }
1610
1611 void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1612 if (streamRunActive_ > 0) {
1613 FinalWaitingTask waitTask{taskGroup_};
1614
1615 auto runStatus = streamRunStatus_[0].get();
1616 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1617 WaitingTaskHolder holder{taskGroup_, &waitTask};
1618 runStatus->setHolderOfTaskInProcessRuns(holder);
1619 lastSourceTransition_ = InputSource::IsStop;
1620 endRunAsync(streamRunStatus_[0], std::move(holder));
1621 waitTask.wait();
1622 }
1623 }
1624
// Starts asynchronous processing of a new luminosity block within the run described by iRunStatus.
//
// Sequence, as implemented below:
//   1. Synchronize the EventSetup to iSync via espController_.
//   2. Wait for a slot in lumiQueue_ (pushAndPause), then for the source's serial queue.
//   3. Read the lumi from the source, call the source's doBeginLumi and, if a
//      RandomNumberGenerator service is available, its preBeginLumi.
//   4. Merge further lumi entries, run the global begin-lumi transition and the looper's
//      begin-lumi steps.
//   5. On every stream: pause the stream queue, attach the lumi to the stream's event principal,
//      run the stream begin-lumi transition and continue with handleNextEventForStreamAsync.
//
// Every failure path resets the lumi status resources, resumes queueWhichWaitsForIOVsToFinish_,
// and ends the run via endRunAsync. Except where the task has already failed, the exception is
// first recorded on the corresponding task holder.
//
// iSync       IOV sync value identifying the lumi start
// iRunStatus  status of the run this lumi belongs to
// iHolder     task notified at the end of the chain
1625 void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1626 std::shared_ptr<RunProcessingStatus> iRunStatus,
1627 edm::WaitingTaskHolder iHolder) {
1628 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1629
1630 auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
     // NOTE(review): the first step captures iSync and the local status by reference; that is
     // only valid if the chain runs it before this function returns -- confirm.
1631 chain::first([this, &iSync, &status](auto nextTask) {
1632 espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1633 nextTask,
1634 status->endIOVWaitingTasks(),
1635 status->eventSetupImpls(),
1636 queueWhichWaitsForIOVsToFinish_,
1637 actReg_.get(),
1638 serviceToken_);
1639 }) | chain::then([this, status, iRunStatus, iSync](std::exception_ptr const* iException, auto nextTask) {
1640 CMS_SA_ALLOW try {
1641
     // Record the failure but keep going: the task pushed to lumiQueue_ below checks
     // taskHasFailed() and performs the clean-up.
1642 if (iException) {
1643 WaitingTaskHolder copyHolder(nextTask);
1644 copyHolder.doneWaiting(*iException);
1645 }
1646
1647 ServiceRegistry::Operate operate(serviceToken_);
1648 actReg_->postESSyncIOVSignal_.emit(iSync);
1649
1650 lumiQueue_->pushAndPause(
1651 *nextTask.group(),
1652 [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1653 CMS_SA_ALLOW try {
1654 if (postLumiQueueTask.taskHasFailed()) {
1655 status->resetResources();
1656 queueWhichWaitsForIOVsToFinish_.resume();
1657 endRunAsync(iRunStatus, postLumiQueueTask);
1658 return;
1659 }
1660
1661 status->setResumer(std::move(iResumer));
1662
1663 sourceResourcesAcquirer_.serialQueueChain().push(
1664 *postLumiQueueTask.group(),
1665 [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1666 CMS_SA_ALLOW try {
1667 ServiceRegistry::Operate operate(serviceToken_);
1668
1669 if (postSourceTask.taskHasFailed()) {
1670 status->resetResources();
1671 queueWhichWaitsForIOVsToFinish_.resume();
1672 endRunAsync(iRunStatus, postSourceTask);
1673 return;
1674 }
1675
1676 status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1677
1678 LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1679 {
1680 SendSourceTerminationSignalIfException sentry(actReg_.get());
1681 input_->doBeginLumi(lumiPrincipal, &processContext_);
1682 sentry.completedSuccessfully();
1683 }
1684
     // The random number service is optional; it is only notified when available.
1685 Service<RandomNumberGenerator> rng;
1686 if (rng.isAvailable()) {
1687 LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1688 rng->preBeginLumi(lb);
1689 }
1690
1691 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1692
1693 using namespace edm::waiting_task::chain;
1694 chain::first([this, status](auto nextTask) mutable {
1695 readAndMergeLumiEntriesAsync(std::move(status), std::move(nextTask));
1696 firstItemAfterLumiMerge_ = true;
1697 }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1698 LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1699 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1700 beginGlobalTransitionAsync<Traits>(
1701 nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1702 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1703 looper_->prefetchAsync(
1704 nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1705 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
     // With a looper, global begin is marked as succeeded here, before the looper's own
     // begin-lumi call; without a looper it is marked in the final step below.
1706 status->globalBeginDidSucceed();
1707 ServiceRegistry::Operate operateLooper(serviceToken_);
1708 looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1709 }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1710 if (iException) {
     // Global begin-lumi failed: release the lumi resources, record the exception and end the run.
1711 status->resetResources();
1712 queueWhichWaitsForIOVsToFinish_.resume();
1713 WaitingTaskHolder copyHolder(holder);
1714 copyHolder.doneWaiting(*iException);
1715 endRunAsync(iRunStatus, holder);
1716 } else {
1717 if (not looper_) {
1718 status->globalBeginDidSucceed();
1719 }
1720
1721 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1722
     // `es` refers to an object obtained from `status`; the stream tasks below capture it by
     // reference, so `status` has to stay alive while they run.
1723 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1724 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1725
1726 streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1727 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1728 streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1729 streamQueues_[i].pause();
1730
1731 auto& event = principalCache_.eventPrincipal(i);
1732
1733
1734
     // Raw pointers are taken from `status` first; ownership of `status` then moves into
     // streamLumiStatus_[i], which keeps the pointed-to objects alive.
1735 auto eventSetupImpls = &status->eventSetupImpls();
1736 auto lp = status->lumiPrincipal().get();
1737 streamLumiStatus_[i] = std::move(status);
1738 ++streamLumiActive_;
1739 event.setLuminosityBlockPrincipal(lp);
1740 LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1741 using namespace edm::waiting_task::chain;
1742 chain::first([this, i, &transitionInfo](auto nextTask) {
1743 beginStreamTransitionAsync<Traits>(std::move(nextTask),
1744 *schedule_,
1745 i,
1746 transitionInfo,
1747 serviceToken_,
1748 subProcesses_);
1749 }) |
1750 then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1751 auto nextTask) {
     // A failed stream begin-lumi is recorded, and event handling for the stream is
     // entered regardless.
1752 if (exceptionFromBeginStreamLumi) {
1753 WaitingTaskHolder copyHolder(nextTask);
1754 copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1755 }
1756 handleNextEventForStreamAsync(std::move(nextTask), i);
1757 }) |
1758 runLast(std::move(holder));
1759 });
1760 }
1761 });
1762 }
1763 }) | runLast(postSourceTask);
1764 } catch (...) {
1765 status->resetResources();
1766 queueWhichWaitsForIOVsToFinish_.resume();
1767 WaitingTaskHolder copyHolder(postSourceTask);
1768 copyHolder.doneWaiting(std::current_exception());
1769 endRunAsync(iRunStatus, postSourceTask);
1770 }
1771 });
1772 } catch (...) {
1773 status->resetResources();
1774 queueWhichWaitsForIOVsToFinish_.resume();
1775 WaitingTaskHolder copyHolder(postLumiQueueTask);
1776 copyHolder.doneWaiting(std::current_exception());
1777 endRunAsync(iRunStatus, postLumiQueueTask);
1778 }
1779 });
1780 } catch (...) {
1781 status->resetResources();
1782 queueWhichWaitsForIOVsToFinish_.resume();
1783 WaitingTaskHolder copyHolder(nextTask);
1784 copyHolder.doneWaiting(std::current_exception());
1785 endRunAsync(iRunStatus, nextTask);
1786 }
1787 }) | chain::runLast(std::move(iHolder));
1788 }
1789
1790 void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1791 chain::first([this](auto nextTask) {
1792
1793 auto status = streamLumiStatus_[0];
1794 status->setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kProcessing);
1795
1796 while (lastTransitionType() == InputSource::IsLumi and
1797 status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1798 readAndMergeLumi(*status);
1799 nextTransitionType();
1800 }
1801 firstItemAfterLumiMerge_ = true;
1802 }) | chain::then([this](auto nextTask) mutable {
1803 unsigned int streamIndex = 0;
1804 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1805 for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1806 arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1807 }
1808 nextTask.group()->run(
1809 [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1810 }) | chain::runLast(std::move(iHolder));
1811 }
1812
1813 void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1814 if (holder.taskHasFailed()) {
1815 setExceptionMessageLumis();
1816 } else {
1817 WaitingTaskHolder tmp(holder);
1818 tmp.doneWaiting(iException);
1819 }
1820 }
1821
1822 void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1823 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1824
1825
1826 auto& lp = *(iLumiStatus->lumiPrincipal());
1827 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1828 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1829 EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1830 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1831
1832 using namespace edm::waiting_task::chain;
1833 chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1834 IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1835
1836 LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1837 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1838 endGlobalTransitionAsync<Traits>(
1839 std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1840 }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1841
1842 if (didGlobalBeginSucceed) {
1843 writeLumiAsync(std::move(nextTask), lumiPrincipal);
1844 }
1845 }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1846 looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1847 }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1848
1849 ServiceRegistry::Operate operate(serviceToken_);
1850 looper_->doEndLuminosityBlock(lp, es, &processContext_);
1851 }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1852 if (iException) {
1853 handleEndLumiExceptions(*iException, nextTask);
1854 }
1855 ServiceRegistry::Operate operate(serviceToken_);
1856
1857 std::exception_ptr ptr;
1858
1859
1860
1861
1862
1863 CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1864 if (not ptr) {
1865 ptr = std::current_exception();
1866 }
1867 }
1868
1869 CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1870 if (not ptr) {
1871 ptr = std::current_exception();
1872 }
1873 }
1874
1875 CMS_SA_ALLOW try {
1876 status->resetResources();
1877 status->globalEndRunHolderDoneWaiting();
1878 status.reset();
1879 } catch (...) {
1880 if (not ptr) {
1881 ptr = std::current_exception();
1882 }
1883 }
1884
1885 if (ptr && !iException) {
1886 handleEndLumiExceptions(ptr, nextTask);
1887 }
1888 }) | runLast(std::move(iTask));
1889 }
1890
1891 void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1892 auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1893 auto status = streamLumiStatus_[iStreamIndex];
1894 if (iException) {
1895 handleEndLumiExceptions(*iException, iTask);
1896 }
1897
1898
1899 streamLumiStatus_[iStreamIndex].reset();
1900 --streamLumiActive_;
1901 streamQueues_[iStreamIndex].resume();
1902
1903
1904 if (status->streamFinishedLumi()) {
1905 globalEndLumiAsync(iTask, std::move(status));
1906 }
1907 });
1908
1909 edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1910
1911
1912
1913 auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1914 lumiStatus->setEndTime();
1915
1916 EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1917 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1918 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1919
1920 if (lumiStatus->didGlobalBeginSucceed()) {
1921 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1922 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1923 LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1924 endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1925 *schedule_,
1926 iStreamIndex,
1927 transitionInfo,
1928 serviceToken_,
1929 subProcesses_,
1930 cleaningUpAfterException);
1931 }
1932 }
1933
1934 void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1935 if (streamRunActive_ == 0) {
1936 assert(streamLumiActive_ == 0);
1937 } else {
1938 assert(streamRunActive_ == preallocations_.numberOfStreams());
1939 if (streamLumiActive_ > 0) {
1940 FinalWaitingTask globalWaitTask{taskGroup_};
1941 assert(streamLumiActive_ == preallocations_.numberOfStreams());
1942 streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1943 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1944 streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1945 }
1946 globalWaitTask.wait();
1947 }
1948 }
1949 }
1950
1951 void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1952 SendSourceTerminationSignalIfException sentry(actReg_.get());
1953 input_->readProcessBlock(processBlockPrincipal);
1954 sentry.completedSuccessfully();
1955 }
1956
1957 std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1958 auto rp = principalCache_.getAvailableRunPrincipalPtr();
1959 assert(rp);
1960 rp->setAux(*input_->runAuxiliary());
1961 {
1962 SendSourceTerminationSignalIfException sentry(actReg_.get());
1963 input_->readRun(*rp, *historyAppender_);
1964 sentry.completedSuccessfully();
1965 }
1966 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1967 return rp;
1968 }
1969
1970 void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
1971 RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1972
1973 bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1974 assert(runOK);
1975 runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1976 {
1977 SendSourceTerminationSignalIfException sentry(actReg_.get());
1978 input_->readAndMergeRun(runPrincipal);
1979 sentry.completedSuccessfully();
1980 }
1981 }
1982
1983 std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
1984 auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
1985 assert(lbp);
1986 lbp->setAux(*input_->luminosityBlockAuxiliary());
1987 {
1988 SendSourceTerminationSignalIfException sentry(actReg_.get());
1989 input_->readLuminosityBlock(*lbp, *historyAppender_);
1990 sentry.completedSuccessfully();
1991 }
1992 lbp->setRunPrincipal(std::move(rp));
1993 return lbp;
1994 }
1995
1996 void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
1997 auto& lumiPrincipal = *iStatus.lumiPrincipal();
1998 assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1999 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2000 input_->processHistoryRegistry().reducedProcessHistoryID(
2001 input_->luminosityBlockAuxiliary()->processHistoryID()));
2002 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2003 assert(lumiOK);
2004 lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2005 {
2006 SendSourceTerminationSignalIfException sentry(actReg_.get());
2007 input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2008 sentry.completedSuccessfully();
2009 }
2010 }
2011
2012 void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
2013 using namespace edm::waiting_task;
2014 chain::first([&](auto nextTask) {
2015 ServiceRegistry::Operate op(serviceToken_);
2016 schedule_->writeProcessBlockAsync(
2017 nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2018 }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2019 ServiceRegistry::Operate op(serviceToken_);
2020 for (auto& s : subProcesses_) {
2021 s.writeProcessBlockAsync(nextTask, processBlockType);
2022 }
2023 }) | chain::runLast(std::move(task));
2024 }
2025
2026 void EventProcessor::writeRunAsync(WaitingTaskHolder task,
2027 RunPrincipal const& runPrincipal,
2028 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2029 using namespace edm::waiting_task;
2030 if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2031 chain::first([&](auto nextTask) {
2032 ServiceRegistry::Operate op(serviceToken_);
2033 schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2034 }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2035 ServiceRegistry::Operate op(serviceToken_);
2036 for (auto& s : subProcesses_) {
2037 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2038 }
2039 }) | chain::runLast(std::move(task));
2040 }
2041 }
2042
2043 void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
2044 for (auto& s : subProcesses_) {
2045 s.clearRunPrincipal(*iStatus.runPrincipal());
2046 }
2047 iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2048 iStatus.runPrincipal()->clearPrincipal();
2049 }
2050
2051 void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
2052 using namespace edm::waiting_task;
2053 if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2054 chain::first([&](auto nextTask) {
2055 ServiceRegistry::Operate op(serviceToken_);
2056
2057 lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2058 schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2059 }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2060 ServiceRegistry::Operate op(serviceToken_);
2061 for (auto& s : subProcesses_) {
2062 s.writeLumiAsync(nextTask, lumiPrincipal);
2063 }
2064 }) | chain::lastTask(std::move(task));
2065 }
2066 }
2067
2068 void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
2069 for (auto& s : subProcesses_) {
2070 s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2071 }
2072 iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2073 iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2074 iStatus.lumiPrincipal()->clearPrincipal();
2075 }
2076
2077 void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2078 WaitingTaskHolder iHolder) {
2079 auto group = iHolder.group();
2080 sourceResourcesAcquirer_.serialQueueChain().push(
2081 *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2082 CMS_SA_ALLOW try {
2083 ServiceRegistry::Operate operate(serviceToken_);
2084
2085 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2086
2087 nextTransitionType();
2088 while (lastTransitionType() == InputSource::IsRun and status->runPrincipal()->run() == input_->run() and
2089 status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2090 if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2091 status->setStopBeforeProcessingRun(true);
2092 return;
2093 }
2094 readAndMergeRun(*status);
2095 nextTransitionType();
2096 }
2097 } catch (...) {
2098 status->setStopBeforeProcessingRun(true);
2099 holder.doneWaiting(std::current_exception());
2100 }
2101 });
2102 }
2103
2104 void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2105 WaitingTaskHolder iHolder) {
2106 auto group = iHolder.group();
2107 sourceResourcesAcquirer_.serialQueueChain().push(
2108 *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2109 CMS_SA_ALLOW try {
2110 ServiceRegistry::Operate operate(serviceToken_);
2111
2112 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2113
2114 nextTransitionType();
2115 while (lastTransitionType() == InputSource::IsLumi and
2116 iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2117 readAndMergeLumi(*iLumiStatus);
2118 nextTransitionType();
2119 }
2120 } catch (...) {
2121 holder.doneWaiting(std::current_exception());
2122 }
2123 });
2124 }
2125
2126 void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2127 WaitingTaskHolder iHolder) {
2128 if (lastTransitionType() == InputSource::IsFile) {
2129 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2130 iHolder.doneWaiting(std::exception_ptr{});
2131 } else if (lastTransitionType() == InputSource::IsLumi && !iHolder.taskHasFailed()) {
2132 CMS_SA_ALLOW try {
2133 beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2134 input_->luminosityBlockAuxiliary()->beginTime()),
2135 iRunStatus,
2136 iHolder);
2137 } catch (...) {
2138 WaitingTaskHolder copyHolder(iHolder);
2139 iHolder.doneWaiting(std::current_exception());
2140 endRunAsync(std::move(iRunStatus), std::move(iHolder));
2141 }
2142 } else {
2143
2144
2145 endRunAsync(std::move(iRunStatus), std::move(iHolder));
2146 }
2147 }
2148
2149 bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2150 unsigned int iStreamIndex,
2151 LuminosityBlockProcessingStatus& iStatus) {
2152
2153
2154
2155 if (iTask.taskHasFailed()) {
2156
2157
2158
2159 if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2160 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2161 }
2162 return false;
2163 }
2164
2165
2166 if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2167 return false;
2168 }
2169
2170
2171 if (shouldWeStop()) {
2172 lastSourceTransition_ = InputSource::IsStop;
2173 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2174 return false;
2175 }
2176
2177 ServiceRegistry::Operate operate(serviceToken_);
2178
2179
2180
2181
2182 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2183
2184
2185
2186
2187 InputSource::ItemType itemType = firstItemAfterLumiMerge_ ? lastTransitionType() : nextTransitionType();
2188 firstItemAfterLumiMerge_ = false;
2189
2190 if (InputSource::IsEvent != itemType) {
2191
2192
2193
2194 if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
2195 (InputSource::IsRun == itemType and
2196 (iStatus.lumiPrincipal()->run() != input_->run() or
2197 iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2198 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2199 } else {
2200 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2201 }
2202 return false;
2203 }
2204 readEvent(iStreamIndex);
2205 return true;
2206 }
2207
2208 void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2209 auto group = iTask.group();
2210 sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2211 CMS_SA_ALLOW try {
2212 auto status = streamLumiStatus_[iStreamIndex].get();
2213 ServiceRegistry::Operate operate(serviceToken_);
2214
2215 if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2216 auto recursionTask =
2217 make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2218 if (iEventException) {
2219 WaitingTaskHolder copyHolder(iTask);
2220 copyHolder.doneWaiting(*iEventException);
2221
2222
2223
2224 }
2225 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2226 });
2227
2228 processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2229 } else {
2230
2231 if (status->eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi) {
2232 if (not status->haveStartedNextLumiOrEndedRun()) {
2233 status->startNextLumiOrEndRun();
2234 if (lastTransitionType() == InputSource::IsLumi && !iTask.taskHasFailed()) {
2235 CMS_SA_ALLOW try {
2236 beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2237 input_->luminosityBlockAuxiliary()->beginTime()),
2238 streamRunStatus_[iStreamIndex],
2239 iTask);
2240 } catch (...) {
2241 WaitingTaskHolder copyHolder(iTask);
2242 copyHolder.doneWaiting(std::current_exception());
2243 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2244 }
2245 } else {
2246
2247 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2248 }
2249 }
2250 streamEndLumiAsync(iTask, iStreamIndex);
2251 } else {
2252 assert(status->eventProcessingState() ==
2253 LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2254 auto runStatus = streamRunStatus_[iStreamIndex].get();
2255
2256 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2257 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2258 }
2259 }
2260 }
2261 } catch (...) {
2262 WaitingTaskHolder copyHolder(iTask);
2263 copyHolder.doneWaiting(std::current_exception());
2264 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2265 }
2266 });
2267 }
2268
2269 void EventProcessor::readEvent(unsigned int iStreamIndex) {
2270
2271 auto& event = principalCache_.eventPrincipal(iStreamIndex);
2272 StreamContext streamContext(event.streamID(), &processContext_);
2273
2274 SendSourceTerminationSignalIfException sentry(actReg_.get());
2275 input_->readEvent(event, streamContext);
2276
2277 streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2278 streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2279 sentry.completedSuccessfully();
2280
2281 FDEBUG(1) << "\treadEvent\n";
2282 }
2283
2284 void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2285 iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2286 }
2287
2288 void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2289 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2290
2291 ServiceRegistry::Operate operate(serviceToken_);
2292 Service<RandomNumberGenerator> rng;
2293 if (rng.isAvailable()) {
2294 Event ev(*pep, ModuleDescription(), nullptr);
2295 rng->postEventRead(ev);
2296 }
2297
2298 EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2299 using namespace edm::waiting_task::chain;
2300 chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2301 EventTransitionInfo info(*pep, es);
2302 schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2303 }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2304 for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2305 subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2306 }
2307 }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2308
2309 ServiceRegistry::Operate operateLooper(serviceToken_);
2310 processEventWithLooper(*pep, iStreamIndex);
2311 }) | then([pep](auto nextTask) {
2312 FDEBUG(1) << "\tprocessEvent\n";
2313 pep->clearEventPrincipal();
2314 }) | runLast(iHolder);
2315 }
2316
2317 void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2318 bool randomAccess = input_->randomAccess();
2319 ProcessingController::ForwardState forwardState = input_->forwardState();
2320 ProcessingController::ReverseState reverseState = input_->reverseState();
2321 ProcessingController pc(forwardState, reverseState, randomAccess);
2322
2323 EDLooperBase::Status status = EDLooperBase::kContinue;
2324 do {
2325 StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2326 EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2327 status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2328
2329 bool succeeded = true;
2330 if (randomAccess) {
2331 if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2332 input_->skipEvents(-2);
2333 } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2334 succeeded = input_->goToEvent(pc.specifiedEventTransition());
2335 }
2336 }
2337 pc.setLastOperationSucceeded(succeeded);
2338 } while (!pc.lastOperationSucceeded());
2339 if (status != EDLooperBase::kContinue) {
2340 shouldWeStop_ = true;
2341 }
2342 }
2343
2344 bool EventProcessor::shouldWeStop() const {
2345 FDEBUG(1) << "\tshouldWeStop\n";
2346 if (shouldWeStop_)
2347 return true;
2348 if (!subProcesses_.empty()) {
2349 for (auto const& subProcess : subProcesses_) {
2350 if (subProcess.terminate()) {
2351 return true;
2352 }
2353 }
2354 return false;
2355 }
2356 return schedule_->terminate();
2357 }
2358
2359 void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2360
2361 void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2362
2363 void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2364
2365 bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2366 bool expected = false;
2367 if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2368 deferredExceptionPtr_ = iException;
2369 return true;
2370 }
2371 return false;
2372 }
2373
2374 void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2375 cms::Exception ex("ModulesSynchingOnLumis");
2376 ex << "The framework is configured to use at least two streams, but the following modules\n"
2377 << "require synchronizing on LuminosityBlock boundaries:";
2378 bool found = false;
2379 for (auto worker : schedule_->allWorkers()) {
2380 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2381 found = true;
2382 ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2383 }
2384 }
2385 if (found) {
2386 ex << "\n\nThe situation can be fixed by either\n"
2387 << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2388 << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2389 throw ex;
2390 }
2391 }
2392
2393 void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
2394 std::unique_ptr<LogSystem> s;
2395 for (auto worker : schedule_->allWorkers()) {
2396 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2397 if (not s) {
2398 s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2399 (*s) << "The following modules require synchronizing on Run boundaries:";
2400 }
2401 (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2402 }
2403 }
2404 }
2405
2406 void EventProcessor::warnAboutLegacyModules() const {
2407 std::unique_ptr<LogSystem> s;
2408 for (auto worker : schedule_->allWorkers()) {
2409 if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2410 if (not s) {
2411 s = std::make_unique<LogSystem>("LegacyModules");
2412 (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2413 "is going to end soon. These modules need to be converted to have type\n"
2414 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2415 }
2416 (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2417 }
2418 }
2419 }
2420 }