File indexing completed on 2024-06-04 04:34:54
0001 #include "FWCore/Framework/interface/EventProcessor.h"
0002 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0003 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0004 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0005 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0008
0009 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0010 #include "FWCore/Framework/src/CommonParams.h"
0011 #include "FWCore/Framework/interface/EDLooperBase.h"
0012 #include "FWCore/Framework/interface/EventPrincipal.h"
0013 #include "FWCore/Framework/interface/EventSetupProvider.h"
0014 #include "FWCore/Framework/interface/EventSetupRecord.h"
0015 #include "FWCore/Framework/interface/FileBlock.h"
0016 #include "FWCore/Framework/interface/HistoryAppender.h"
0017 #include "FWCore/Framework/interface/InputSourceDescription.h"
0018 #include "FWCore/Framework/interface/IOVSyncValue.h"
0019 #include "FWCore/Framework/interface/LooperFactory.h"
0020 #include "FWCore/Framework/interface/LuminosityBlock.h"
0021 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0022 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0023 #include "FWCore/Framework/interface/ModuleChanger.h"
0024 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0025 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ProcessingController.h"
0028 #include "FWCore/Framework/interface/RunPrincipal.h"
0029 #include "FWCore/Framework/interface/Schedule.h"
0030 #include "FWCore/Framework/interface/ScheduleInfo.h"
0031 #include "FWCore/Framework/interface/ScheduleItems.h"
0032 #include "FWCore/Framework/interface/SubProcess.h"
0033 #include "FWCore/Framework/interface/Event.h"
0034 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0035 #include "FWCore/Framework/src/Breakpoints.h"
0036 #include "FWCore/Framework/interface/EventSetupsController.h"
0037 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0038 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0039 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0040 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0041 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0042 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0043 #include "FWCore/Framework/interface/TriggerNamesService.h"
0044 #include "FWCore/Framework/src/SendSourceTerminationSignalIfException.h"
0045
0046 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0047
0048 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0049 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0050 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0051 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0052 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0053 #include "FWCore/ParameterSet/interface/Registry.h"
0054 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0055
0056 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0057 #include "FWCore/ServiceRegistry/interface/Service.h"
0058 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0059 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0060
0061 #include "FWCore/Concurrency/interface/WaitingTask.h"
0062 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0063 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0064 #include "FWCore/Concurrency/interface/chain_first.h"
0065
0066 #include "FWCore/Utilities/interface/Algorithms.h"
0067 #include "FWCore/Utilities/interface/DebugMacros.h"
0068 #include "FWCore/Utilities/interface/EDMException.h"
0069 #include "FWCore/Utilities/interface/Exception.h"
0070 #include "FWCore/Utilities/interface/ConvertException.h"
0071 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0072 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0073 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0074 #include "FWCore/Utilities/interface/StreamID.h"
0075 #include "FWCore/Utilities/interface/RootHandlers.h"
0076 #include "FWCore/Utilities/interface/propagate_const.h"
0077 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0078
0079 #include "MessageForSource.h"
0080 #include "MessageForParent.h"
0081 #include "LuminosityBlockProcessingStatus.h"
0082 #include "RunProcessingStatus.h"
0083
0084 #include "boost/range/adaptor/reversed.hpp"
0085
0086 #include <cassert>
0087 #include <exception>
0088 #include <iomanip>
0089 #include <iostream>
0090 #include <utility>
0091 #include <sstream>
0092
0093 #include <sys/ipc.h>
0094 #include <sys/msg.h>
0095
0096 #include "oneapi/tbb/task.h"
0097
0098
0099 #ifndef __APPLE__
0100 #include <sched.h>
0101 #endif
0102
0103 namespace {
0104 class PauseQueueSentry {
0105 public:
0106 PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
0107 ~PauseQueueSentry() { queue_.resume(); }
0108
0109 private:
0110 edm::SerialTaskQueue& queue_;
0111 };
0112 }
0113
0114 namespace edm {
0115
0116 namespace chain = waiting_task::chain;
0117
0118
0119 std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0120 ParameterSet& params,
0121 CommonParams const& common,
0122 std::shared_ptr<ProductRegistry> preg,
0123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0124 std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0126 std::shared_ptr<ActivityRegistry> areg,
0127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0128 PreallocationConfiguration const& allocations) {
0129 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0130 if (main_input == nullptr) {
0131 throw Exception(errors::Configuration)
0132 << "There must be exactly one source in the configuration.\n"
0133 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0134 }
0135
0136 std::string modtype(main_input->getParameter<std::string>("@module_type"));
0137
0138 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0139 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0140 ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0141 filler->fill(descriptions);
0142
0143 try {
0144 convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
0145 } catch (cms::Exception& iException) {
0146 std::ostringstream ost;
0147 ost << "Validating configuration of input source of type " << modtype;
0148 iException.addContext(ost.str());
0149 throw;
0150 }
0151
0152 main_input->registerIt();
0153
0154
0155
0156
0157
0158
0159
0160 ModuleDescription md(main_input->id(),
0161 main_input->getParameter<std::string>("@module_type"),
0162 "source",
0163 processConfiguration.get(),
0164 moduleIndex);
0165
0166 InputSourceDescription isdesc(md,
0167 preg,
0168 branchIDListHelper,
0169 processBlockHelper,
0170 thinnedAssociationsHelper,
0171 areg,
0172 common.maxEventsInput_,
0173 common.maxLumisInput_,
0174 common.maxSecondsUntilRampdown_,
0175 allocations);
0176
0177 areg->preSourceConstructionSignal_(md);
0178 std::unique_ptr<InputSource> input;
0179 try {
0180
0181 std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
0182 convertException::wrap([&]() {
0183 input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
0185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
0186 });
0187 } catch (cms::Exception& iException) {
0188 std::ostringstream ost;
0189 ost << "Constructing input source of type " << modtype;
0190 iException.addContext(ost.str());
0191 throw;
0192 }
0193 return input;
0194 }
0195
0196
0197 std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
0198 eventsetup::EventSetupProvider& cp,
0199 ParameterSet& params,
0200 std::vector<std::string> const& loopers) {
0201 std::shared_ptr<EDLooperBase> vLooper;
0202
0203 assert(1 == loopers.size());
0204
0205 for (auto const& looperName : loopers) {
0206 ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
0207
0208 vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
0209 }
0210 return vLooper;
0211 }
0212
0213
// Constructs an EventProcessor from a top-level ParameterSet plus an existing
// service token.
//
// parameterSet     process configuration; ownership is moved into a ProcessDesc
// iToken, iLegacy  forwarded unchanged to init()
// defaultServices, forcedServices
//                  passed to ProcessDesc::addServices() before init() runs
0214 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0215 ServiceToken const& iToken,
0216 serviceregistry::ServiceLegacy iLegacy,
0217 std::vector<std::string> const& defaultServices,
0218 std::vector<std::string> const& forcedServices)
0219 : actReg_(),
0220 preg_(),
0221 branchIDListHelper_(),
0222 serviceToken_(),
0223 input_(),
     // Reads *parameterSet here, before the constructor body moves it away.
0224 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0225 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0226 esp_(),
0227 act_table_(),
0228 processConfiguration_(),
0229 schedule_(),
0230 subProcesses_(),
0231 historyAppender_(new HistoryAppender),
0232 fb_(),
0233 looper_(),
0234 deferredExceptionPtrIsSet_(false),
     // NOTE(review): createAcquirerForSourceDelayedReader() is called twice, once
     // for .first and once for .second; presumably both calls yield matching
     // halves of the same pair - confirm.
0235 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0236 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0237 principalCache_(),
0238 beginJobCalled_(false),
0239 shouldWeStop_(false),
0240 fileModeNoMerge_(false),
0241 exceptionMessageFiles_(),
0242 exceptionMessageRuns_(false),
0243 exceptionMessageLumis_(false),
0244 forceLooperToEnd_(false),
0245 looperBeginJobRun_(false),
0246 forceESCacheClearOnNewRun_(false),
0247 eventSetupDataToExcludeFromPrefetching_() {
     // Wrap the configuration in a ProcessDesc, add the requested services, then
     // run the shared initialisation.
0248 auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0249 processDesc->addServices(defaultServices, forcedServices);
0250 init(processDesc, iToken, iLegacy);
0251 }
0252
// Constructs an EventProcessor from a top-level ParameterSet without an
// externally supplied service token: init() is called with a default-constructed
// ServiceToken and serviceregistry::kOverlapIsError.
//
// parameterSet     process configuration; ownership is moved into a ProcessDesc
// defaultServices, forcedServices
//                  passed to ProcessDesc::addServices() before init() runs
0253 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0254 std::vector<std::string> const& defaultServices,
0255 std::vector<std::string> const& forcedServices)
0256 : actReg_(),
0257 preg_(),
0258 branchIDListHelper_(),
0259 serviceToken_(),
0260 input_(),
     // Reads *parameterSet here, before the constructor body moves it away.
0261 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0262 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0263 esp_(),
0264 act_table_(),
0265 processConfiguration_(),
0266 schedule_(),
0267 subProcesses_(),
0268 historyAppender_(new HistoryAppender),
0269 fb_(),
0270 looper_(),
0271 deferredExceptionPtrIsSet_(false),
0272 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0273 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0274 principalCache_(),
0275 beginJobCalled_(false),
0276 shouldWeStop_(false),
0277 fileModeNoMerge_(false),
0278 exceptionMessageFiles_(),
0279 exceptionMessageRuns_(false),
0280 exceptionMessageLumis_(false),
0281 forceLooperToEnd_(false),
0282 looperBeginJobRun_(false),
0283 forceESCacheClearOnNewRun_(false),
0284 eventSetupDataToExcludeFromPrefetching_() {
     // Same sequence as the token-taking overload, but with an empty token.
0285 auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0286 processDesc->addServices(defaultServices, forcedServices);
0287 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
0288 }
0289
// Constructs an EventProcessor from an already built ProcessDesc.
// No services are added here; processDesc, token and legacy are passed straight
// to init().
0290 EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0291 ServiceToken const& token,
0292 serviceregistry::ServiceLegacy legacy)
0293 : actReg_(),
0294 preg_(),
0295 branchIDListHelper_(),
0296 serviceToken_(),
0297 input_(),
     // The resolver maker is built from the process PSet held by processDesc.
0298 moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
0299 espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0300 esp_(),
0301 act_table_(),
0302 processConfiguration_(),
0303 schedule_(),
0304 subProcesses_(),
0305 historyAppender_(new HistoryAppender),
0306 fb_(),
0307 looper_(),
0308 deferredExceptionPtrIsSet_(false),
0309 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0310 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0311 principalCache_(),
0312 beginJobCalled_(false),
0313 shouldWeStop_(false),
0314 fileModeNoMerge_(false),
0315 exceptionMessageFiles_(),
0316 exceptionMessageRuns_(false),
0317 exceptionMessageLumis_(false),
0318 forceLooperToEnd_(false),
0319 looperBeginJobRun_(false),
0320 forceESCacheClearOnNewRun_(false),
0321 eventSetupDataToExcludeFromPrefetching_() {
0322 init(processDesc, token, legacy);
0323 }
0324
// Shared initialisation used by every EventProcessor constructor.
//
// In order, this function:
//   - registers an empty Parentage and an empty ParameterSet,
//   - removes the SubProcess parameter sets from the process PSet and validates
//     the top-level parameter sets,
//   - reads the untracked "options" PSet (file mode, thread/stream/lumi/run
//     concurrency, early-delete settings, ...),
//   - sets up services and makes them active for the rest of the function,
//   - creates the EventSetup provider, the optional looper, the task queues,
//   - builds the modules and the input source in two concurrent tasks, then
//     finishes the schedule,
//   - fills the principal cache and constructs the SubProcesses.
//
// If anything inside the try block throws, the members listed in the catch
// block are reset to nullptr and the exception is rethrown.
0325 void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
0326 ServiceToken const& iToken,
0327 serviceregistry::ServiceLegacy iLegacy) {
0328
0329
0330
     // Register a default-constructed Parentage with the ParentageRegistry.
0331 ParentageRegistry::instance()->insertMapped(Parentage());
0332
0333
     // Register a default-constructed (empty) ParameterSet.
0334 ParameterSet().registerIt();
0335
0336 std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
0337
0338
     // Collect the SubProcess parameter sets; they are used at the end of this
     // function to construct subProcesses_. (The "pop" in the helper's name
     // suggests they are also removed from *parameterSet - confirm.)
0339 auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
0340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
0341
0342
0343
0344
0345 validateTopLevelParameterSets(parameterSet.get());
0346
0347
     // Only "NOMERGE" and "FULLMERGE" are accepted for options.fileMode.
0348 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
0349 auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
0350 if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
0351 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
0352 << fileMode << ".\n"
0353 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
0354 } else {
0355 fileModeNoMerge_ = (fileMode == "NOMERGE");
0356 }
0357 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
0358 ensureAvailableAccelerators(*parameterSet);
0359
0360
     // Concurrency settings. A thread count of zero is not accepted here.
0361 unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
0362
0363
0364
0365 assert(nThreads != 0);
0366
     // numberOfStreams == 0 means "use the number of threads".
0367 unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
0368 if (nStreams == 0) {
0369 nStreams = nThreads;
0370 }
     // Concurrent lumis: 0 selects 2; the value is then capped at nStreams.
0371 unsigned int nConcurrentLumis =
0372 optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
0373 if (nConcurrentLumis == 0) {
0374 nConcurrentLumis = 2;
0375 }
0376 if (nConcurrentLumis > nStreams) {
0377 nConcurrentLumis = nStreams;
0378 }
     // Concurrent runs: 0, or anything above nConcurrentLumis, becomes nConcurrentLumis.
0379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
0380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
0381 nConcurrentRuns = nConcurrentLumis;
0382 }
     // With a looper configured, streams, lumis and runs are all forced to 1
     // (with a warning if any of them was different).
0383 std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
0384 if (!loopers.empty()) {
0385
0386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
0387 edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
0388 "of concurrent runs, and the number of concurrent lumis "
0389 "are all being reset to 1. Loopers cannot currently support "
0390 "values greater than 1.";
0391 nStreams = 1;
0392 nConcurrentLumis = 1;
0393 nConcurrentRuns = 1;
0394 }
0395 }
     // Either dump the final options to the log, or log a short summary when
     // more than one thread or stream is used.
0396 bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
0397 if (dumpOptions) {
0398 dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
0399 } else {
0400 if (nThreads > 1 or nStreams > 1) {
0401 edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
0402 }
0403 }
0404
0405
0406
0407
0408
0409
     // Upper bound on concurrent IOVs handed to the EventSetup provider below:
     // 3 * runs - 2, plus (lumis - runs) when there are more lumis than runs.
     // NOTE(review): the derivation of this formula is not visible here.
0410 unsigned int maxConcurrentIOVs =
0411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
0412
0413 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
0414
0415 printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
0416 deleteNonConsumedUnscheduledModules_ =
0417 optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
0418
0419
     // "canDeleteEarly" is only read when there are no SubProcesses; the related
     // reference and ignore lists are only read when it is non-empty.
0420 if (not hasSubProcesses) {
0421 branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
0422 }
0423 if (not branchesToDeleteEarly_.empty()) {
0424 auto referencePSets =
0425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
0426 for (auto const& pset : referencePSets) {
0427 auto product = pset.getParameter<std::string>("product");
0428 auto references = pset.getParameter<std::vector<std::string>>("references");
0429 for (auto const& ref : references) {
0430 referencesToBranches_.emplace(product, ref);
0431 }
0432 }
0433 modulesToIgnoreForDeleteEarly_ =
0434 optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
0435 }
0436
0437
0438 ScheduleItems items;
0439
0440
     // Build the services and keep the resulting token in serviceToken_.
     // `token` (before addCPRandTNS) is what the SubProcesses receive below.
0441 auto& serviceSets = processDesc->getServicesPSets();
0442 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
0443 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
0444
0445
     // Make the services active on this thread for the rest of init().
0446 ServiceRegistry::Operate operate(serviceToken_);
0447
0448 CMS_SA_ALLOW try {
0449 if (nThreads > 1) {
0450 edm::Service<RootHandlers> handler;
0451 handler->willBeUsingThreads();
0452 }
0453
0454
0455 std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
0456
0457
     // Create the EventSetup provider for this process.
0458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
0459 esp_ = espController_->makeProvider(
0460 *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
0461
0462
     // Create the looper, if any. Having a looper also switches off deletion of
     // non-consumed unscheduled modules.
0463 if (!loopers.empty()) {
0464 looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
0465 looper_->setActionTable(items.act_table_.get());
0466 looper_->attachTo(*items.actReg_);
0467
0468
0469 deleteNonConsumedUnscheduledModules_ = false;
0470 }
0471
0472 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0473
     // Size the run/lumi queues and the per-stream containers.
0474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
0475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
0476 streamQueues_.resize(nStreams);
0477 streamRunStatus_.resize(nStreams);
0478 streamLumiStatus_.resize(nStreams);
0479
0480 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0481
0482 {
0483 std::optional<ScheduleItems::MadeModules> madeModules;
0484
0485
     // Module construction and source construction run as two tasks in the
     // same task_group; both are awaited by group.wait() below.
0486 tbb::task_group group;
0487
0488
     // The source fills a temporary ProductRegistry; it is merged into
     // items.preg() after both tasks have finished.
0489 auto tempReg = std::make_shared<ProductRegistry>();
0490 auto sourceID = ModuleDescription::getUniqueID();
0491
0492 group.run([&, this]() {
0493
     // Each task activates the services itself.
0494 ServiceRegistry::Operate operate(serviceToken_);
0495 auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0496 madeModules =
0497 items.initModules(*parameterSet, tns, preallocations_, &processContext_, moduleTypeResolverMaker_.get());
0498 });
0499
0500 group.run([&, this, tempReg]() {
0501 ServiceRegistry::Operate operate(serviceToken_);
0502 input_ = makeInput(sourceID,
0503 *parameterSet,
0504 *common,
0505 tempReg,
0506 items.branchIDListHelper(),
0507 get_underlying_safe(processBlockHelper_),
0508 items.thinnedAssociationsHelper(),
0509 items.actReg_,
0510 items.processConfiguration(),
0511 preallocations_);
0512 });
0513
0514 group.wait();
     // Merge the source's products into the main registry and point the source
     // at that registry.
0515 items.preg()->addFromInput(*tempReg);
0516 input_->switchTo(items.preg());
0517
0518 {
0519 auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0520 schedule_ = items.finishSchedule(std::move(*madeModules),
0521 *parameterSet,
0522 tns,
0523 hasSubProcesses,
0524 preallocations_,
0525 &processContext_,
0526 *processBlockHelper_);
0527 }
0528 }
0529
0530
     // Take over the objects built by ScheduleItems.
0531 act_table_ = std::move(items.act_table_);
0532 actReg_ = items.actReg_;
0533 preg_ = items.preg();
0534 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0535 branchIDListHelper_ = items.branchIDListHelper();
0536 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0537 processConfiguration_ = items.processConfiguration();
0538 processContext_.setProcessConfiguration(processConfiguration_.get());
0539
0540 FDEBUG(2) << parameterSet << std::endl;
0541
     // Fill the principal cache: one EventPrincipal per stream, one RunPrincipal
     // per concurrent run, one LuminosityBlockPrincipal per concurrent lumi, and
     // two ProcessBlockPrincipals (regular and "for input").
0542 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0543 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0544
0545 auto ep = std::make_shared<EventPrincipal>(preg(),
0546 branchIDListHelper(),
0547 thinnedAssociationsHelper(),
0548 *processConfiguration_,
0549 historyAppender_.get(),
0550 index,
0551 true ,
0552 &*processBlockHelper_);
0553 principalCache_.insert(std::move(ep));
0554 }
0555
0556 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0557 auto rp = std::make_unique<RunPrincipal>(
0558 preg(), *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0559 principalCache_.insert(std::move(rp));
0560 }
0561
0562 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0563 auto lp =
0564 std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
0565 principalCache_.insert(std::move(lp));
0566 }
0567
0568 {
0569 auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0570 principalCache_.insert(std::move(pb));
0571
0572 auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0573 principalCache_.insertForInput(std::move(pbForInput));
0574 }
0575
0576
     // Construct one SubProcess per collected parameter set. reserve() is called
     // first so emplace_back does not reallocate.
0577 subProcesses_.reserve(subProcessVParameterSet.size());
0578 for (auto& subProcessPSet : subProcessVParameterSet) {
0579 subProcesses_.emplace_back(subProcessPSet,
0580 *parameterSet,
0581 preg(),
0582 branchIDListHelper(),
0583 *processBlockHelper_,
0584 *thinnedAssociationsHelper_,
0585 SubProcessParentageHelper(),
0586 *espController_,
0587 *actReg_,
0588 token,
0589 serviceregistry::kConfigurationOverrides,
0590 preallocations_,
0591 &processContext_,
0592 moduleTypeResolverMaker_);
0593 }
0594 } catch (...) {
0595
0596
     // Release these members while the services are still active (the `operate`
     // object above is still in scope), in the same order the destructor uses,
     // then propagate the exception.
0597 espController_ = nullptr;
0598 esp_ = nullptr;
0599 schedule_ = nullptr;
0600 input_ = nullptr;
0601 looper_ = nullptr;
0602 actReg_ = nullptr;
0603 throw;
0604 }
0605 }
0606
// Destructor. Activates the processor's own service token, releases the main
// framework objects in a fixed order, then clears the global ParameterSet and
// Parentage registries.
0607 EventProcessor::~EventProcessor() {
0608
     // Keep the services active while the members below are released.
0609 ServiceToken token = getToken();
0610 ServiceRegistry::Operate op(token);
0611
0612
0613
     // Explicit release order; init()'s catch block uses the same sequence.
0614 espController_ = nullptr;
0615 esp_ = nullptr;
0616 schedule_ = nullptr;
0617 input_ = nullptr;
0618 looper_ = nullptr;
0619 actReg_ = nullptr;
0620
     // Both registries are singletons (accessed through instance()).
0621 pset::Registry::instance()->clear();
0622 ParentageRegistry::instance()->clear();
0623 }
0624
0625 void EventProcessor::taskCleanup() {
0626 edm::FinalWaitingTask task{taskGroup_};
0627 espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
0628 task.waitNoThrow();
0629 assert(task.done());
0630 }
0631
// Runs the begin-of-job sequence once; later calls return immediately.
//
// Sequence: preallocate signal, module consumes/dependency bookkeeping
// (optionally deleting unused unscheduled modules), early-delete setup,
// EventSetup configuration, source beginJob, schedule beginJob, looper lookup
// updates, SubProcess beginJob, and finally beginStream for every stream.
0632 void EventProcessor::beginJob() {
     // Idempotence guard.
0633 if (beginJobCalled_)
0634 return;
0635 beginJobCalled_ = true;
0636 bk::beginJob();
0637
0638
0639
     // Make the services active for the whole function.
0640 ServiceRegistry::Operate operate(serviceToken_);
0641
0642 service::SystemBounds bounds(preallocations_.numberOfStreams(),
0643 preallocations_.numberOfLuminosityBlocks(),
0644 preallocations_.numberOfRuns(),
0645 preallocations_.numberOfThreads());
0646 actReg_->preallocateSignal_(bounds);
0647 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0648 pathsAndConsumesOfModules_.initialize(schedule_.get(), preg());
0649
     // Gather, over all SubProcesses, the modules of this process that they
     // consume. Results are combined with std::merge.
     // NOTE(review): std::merge requires both inputs to be sorted; presumably
     // keepOnlyConsumedUnscheduledModules() returns a sorted vector - confirm.
0650 std::vector<ModuleProcessName> consumedBySubProcesses;
0651 for_all(subProcesses_,
0652 [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
0653 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0654 if (consumedBySubProcesses.empty()) {
0655 consumedBySubProcesses = std::move(c);
0656 } else if (not c.empty()) {
0657 std::vector<ModuleProcessName> tmp;
0658 tmp.reserve(consumedBySubProcesses.size() + c.size());
0659 std::merge(consumedBySubProcesses.begin(),
0660 consumedBySubProcesses.end(),
0661 c.begin(),
0662 c.end(),
0663 std::back_inserter(tmp));
0664 std::swap(consumedBySubProcesses, tmp);
0665 }
0666 });
0667
0668
     // Dependency check runs before any module is deleted.
0669 checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, printDependencies_);
0670 if (deleteNonConsumedUnscheduledModules_) {
0671 if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
0672 not unusedModules.empty()) {
     // Remove the unused modules from the bookkeeping, log their labels,
     // then delete them from the schedule.
0673 pathsAndConsumesOfModules_.removeModules(unusedModules);
0674
0675 edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
0676 l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0677 "and "
0678 "therefore they are deleted before beginJob transition.";
0679 for (auto const& description : unusedModules) {
0680 l << "\n " << description->moduleLabel();
0681 }
0682 });
0683 for (auto const& description : unusedModules) {
0684 schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0685 }
0686 }
0687 }
0688
0689
0690
     // Early-delete setup. The three members are moved into locals, so their
     // contents are released when this block ends.
0691 if (not branchesToDeleteEarly_.empty()) {
0692 auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
0693 auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
0694 auto referencesToBranches = std::move(referencesToBranches_);
0695 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
0696 }
0697
     // Concurrency checks: the lumi check is named "throw...", the run check
     // "warn..."; each only runs when more than one is processed concurrently.
0698 if (preallocations_.numberOfLuminosityBlocks() > 1) {
0699 throwAboutModulesRequiringLuminosityBlockSynchronization();
0700 }
0701 if (preallocations_.numberOfRuns() > 1) {
0702 warnAboutModulesRequiringRunSynchronization();
0703 }
0704
0705
0706
0707
0708
0709
0710
0711
0712
0713
0714
0715
0716
0717
0718
     // Finish EventSetup configuration, then announce it and the start of beginJob.
0719 espController_->finishConfiguration();
0720 actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0721 actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
     // Source beginJob; a cms::Exception gets a context line before rethrow.
0722 try {
0723 convertException::wrap([&]() { input_->doBeginJob(); });
0724 } catch (cms::Exception& ex) {
0725 ex.addContext("Calling beginJob for the source");
0726 throw;
0727 }
0728
0729 schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
     // Give the looper the product lookups for every branch type and the
     // EventSetup resolver indices.
0730 if (looper_) {
0731 constexpr bool mustPrefetchMayGet = true;
0732 auto const processBlockLookup = preg_->productLookup(InProcess);
0733 auto const runLookup = preg_->productLookup(InRun);
0734 auto const lumiLookup = preg_->productLookup(InLumi);
0735 auto const eventLookup = preg_->productLookup(InEvent);
0736 looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
0737 looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
0738 looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
0739 looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
0740 looper_->updateLookup(esp_->recordsToResolverIndices());
0741 }
0742
0743 for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0744 actReg_->postBeginJobSignal_();
0745
     // beginStream: one task chain per stream, all feeding the final task `last`.
     // For each stream the schedule's beginStream runs first; the SubProcess step
     // is only added when there are SubProcesses (ifThen). last.wait() blocks
     // until every chain has completed.
0746 oneapi::tbb::task_group group;
0747 FinalWaitingTask last{group};
0748 using namespace edm::waiting_task::chain;
0749 first([this](auto nextTask) {
0750 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0751 first([i, this](auto nextTask) {
0752 ServiceRegistry::Operate operate(serviceToken_);
0753 schedule_->beginStream(i);
0754 }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
0755 ServiceRegistry::Operate operate(serviceToken_);
0756 for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
0757 }) | lastTask(nextTask);
0758 }
0759 }) | runLast(WaitingTaskHolder(group, &last));
0760 last.wait();
0761 }
0762
0763 void EventProcessor::endJob() {
0764
0765 ExceptionCollector c(
0766 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0767
0768
0769 ServiceRegistry::Operate operate(serviceToken_);
0770
0771 using namespace edm::waiting_task::chain;
0772
0773 oneapi::tbb::task_group group;
0774 edm::FinalWaitingTask waitTask{group};
0775
0776 {
0777
0778 edm::WaitingTaskHolder taskHolder(group, &waitTask);
0779 std::mutex collectorMutex;
0780 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0781 first([this, i, &c, &collectorMutex](auto nextTask) {
0782 std::exception_ptr ep;
0783 try {
0784 ServiceRegistry::Operate operate(serviceToken_);
0785 this->schedule_->endStream(i);
0786 } catch (...) {
0787 ep = std::current_exception();
0788 }
0789 if (ep) {
0790 std::lock_guard<std::mutex> l(collectorMutex);
0791 c.call([&ep]() { std::rethrow_exception(ep); });
0792 }
0793 }) | then([this, i, &c, &collectorMutex](auto nextTask) {
0794 for (auto& subProcess : subProcesses_) {
0795 first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
0796 std::exception_ptr ep;
0797 try {
0798 ServiceRegistry::Operate operate(serviceToken_);
0799 subProcess.doEndStream(i);
0800 } catch (...) {
0801 ep = std::current_exception();
0802 }
0803 if (ep) {
0804 std::lock_guard<std::mutex> l(collectorMutex);
0805 c.call([&ep]() { std::rethrow_exception(ep); });
0806 }
0807 }) | lastTask(nextTask);
0808 }
0809 }) | lastTask(taskHolder);
0810 }
0811 }
0812 waitTask.waitNoThrow();
0813
0814 auto actReg = actReg_.get();
0815 c.call([actReg]() { actReg->preEndJobSignal_(); });
0816 schedule_->endJob(c);
0817 for (auto& subProcess : subProcesses_) {
0818 c.call(std::bind(&SubProcess::doEndJob, &subProcess));
0819 }
0820 c.call(std::bind(&InputSource::doEndJob, input_.get()));
0821 if (looper_) {
0822 c.call(std::bind(&EDLooperBase::endOfJob, looper()));
0823 }
0824 c.call([actReg]() { actReg->postEndJobSignal_(); });
0825 if (c.hasThrown()) {
0826 c.rethrow();
0827 }
0828 }
0829
0830 ServiceToken EventProcessor::getToken() { return serviceToken_; }
0831
0832 std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
0833 return schedule_->getAllModuleDescriptions();
0834 }
0835
0836 int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
0837
0838 int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
0839
0840 int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
0841
0842 void EventProcessor::clearCounters() { schedule_->clearCounters(); }
0843
0844 namespace {
0845 #include "TransitionProcessors.icc"
0846 }
0847
0848 bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
0849 bool returnValue = false;
0850
0851
0852 if (shutdown_flag.load(std::memory_order_acquire)) {
0853 returnValue = true;
0854 edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
0855 edm::Service<edm::JobReport> jr;
0856 jr->reportShutdownSignal();
0857 returnCode = epSignal;
0858 }
0859 return returnValue;
0860 }
0861
0862 namespace {
0863 struct SourceNextGuard {
0864 SourceNextGuard(edm::ActivityRegistry& iReg) : act_(iReg) { iReg.preSourceNextTransitionSignal_.emit(); }
0865 ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
0866 edm::ActivityRegistry& act_;
0867 };
0868 }
0869
0870 InputSource::ItemTypeInfo EventProcessor::nextTransitionType() {
0871 SendSourceTerminationSignalIfException sentry(actReg_.get());
0872 InputSource::ItemTypeInfo itemTypeInfo;
0873 {
0874 SourceNextGuard guard(*actReg_.get());
0875
0876 do {
0877 itemTypeInfo = input_->nextItemType();
0878 } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
0879 }
0880 lastSourceTransition_ = itemTypeInfo;
0881 sentry.completedSuccessfully();
0882
0883 StatusCode returnCode = epSuccess;
0884
0885 if (checkForAsyncStopRequest(returnCode)) {
0886 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
0887 lastSourceTransition_ = InputSource::ItemType::IsStop;
0888 }
0889
0890 return lastSourceTransition_;
0891 }
0892
0893 void EventProcessor::nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
0894 WaitingTaskHolder nextTask) {
0895 auto group = nextTask.group();
0896 sourceResourcesAcquirer_.serialQueueChain().push(
0897 *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
0898 CMS_SA_ALLOW try {
0899 ServiceRegistry::Operate operate(serviceToken_);
0900 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
0901 nextTransitionType();
0902 if (lastTransitionType() == InputSource::ItemType::IsRun &&
0903 runStatus->runPrincipal()->run() == input_->run() &&
0904 runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
0905 throw Exception(errors::LogicError)
0906 << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
0907 << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
0908 << "This is probably a bug in the InputSource. Please report to the Core group.\n";
0909 }
0910 } catch (...) {
0911 nextHolder.doneWaiting(std::current_exception());
0912 }
0913 });
0914 }
0915
0916 EventProcessor::StatusCode EventProcessor::runToCompletion() {
0917 beginJob();
0918
0919
0920 ServiceRegistry::Operate operate(serviceToken_);
0921 actReg_->beginProcessingSignal_();
0922 auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
0923 std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
0924 try {
0925 FilesProcessor fp(fileModeNoMerge_);
0926
0927 convertException::wrap([&]() {
0928 bool firstTime = true;
0929 do {
0930 if (not firstTime) {
0931 prepareForNextLoop();
0932 rewindInput();
0933 } else {
0934 firstTime = false;
0935 }
0936 startingNewLoop();
0937
0938 auto trans = fp.processFiles(*this);
0939
0940 fp.normalEnd();
0941
0942 if (deferredExceptionPtrIsSet_.load()) {
0943 std::rethrow_exception(deferredExceptionPtr_);
0944 }
0945 if (trans != InputSource::ItemType::IsStop) {
0946
0947 doErrorStuff();
0948
0949 throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
0950 }
0951 } while (not endOfLoop());
0952 });
0953
0954 }
0955 catch (cms::Exception& e) {
0956 if (exceptionMessageLumis_) {
0957 std::string message(
0958 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
0959 e.addAdditionalInfo(message);
0960 if (e.alreadyPrinted()) {
0961 LogAbsolute("Additional Exceptions") << message;
0962 }
0963 }
0964 if (exceptionMessageRuns_) {
0965 std::string message(
0966 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
0967 e.addAdditionalInfo(message);
0968 if (e.alreadyPrinted()) {
0969 LogAbsolute("Additional Exceptions") << message;
0970 }
0971 }
0972 if (!exceptionMessageFiles_.empty()) {
0973 e.addAdditionalInfo(exceptionMessageFiles_);
0974 if (e.alreadyPrinted()) {
0975 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
0976 }
0977 }
0978 throw;
0979 }
0980 return epSuccess;
0981 }
0982
0983 void EventProcessor::readFile() {
0984 FDEBUG(1) << " \treadFile\n";
0985 size_t size = preg_->size();
0986 SendSourceTerminationSignalIfException sentry(actReg_.get());
0987
0988 if (streamRunActive_ > 0) {
0989 streamRunStatus_[0]->runPrincipal()->preReadFile();
0990 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
0991 }
0992
0993 if (streamLumiActive_ > 0) {
0994 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
0995 }
0996
0997 fb_ = input_->readFile();
0998 if (size < preg_->size()) {
0999 principalCache_.adjustIndexesAfterProductRegistryAddition();
1000 }
1001 principalCache_.adjustEventsToNewProductRegistry(preg());
1002 if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
1003 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1004 }
1005 sentry.completedSuccessfully();
1006 }
1007
1008 void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1009 if (fileBlockValid()) {
1010 SendSourceTerminationSignalIfException sentry(actReg_.get());
1011 input_->closeFile(fb_.get(), cleaningUpAfterException);
1012 sentry.completedSuccessfully();
1013 }
1014 FDEBUG(1) << "\tcloseInputFile\n";
1015 }
1016
1017 void EventProcessor::openOutputFiles() {
1018 if (fileBlockValid()) {
1019 schedule_->openOutputFiles(*fb_);
1020 for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
1021 }
1022 FDEBUG(1) << "\topenOutputFiles\n";
1023 }
1024
1025 void EventProcessor::closeOutputFiles() {
1026 schedule_->closeOutputFiles();
1027 for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
1028 processBlockHelper_->clearAfterOutputFilesClose();
1029 FDEBUG(1) << "\tcloseOutputFiles\n";
1030 }
1031
1032 void EventProcessor::respondToOpenInputFile() {
1033 if (fileBlockValid()) {
1034 for_all(subProcesses_,
1035 [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1036 schedule_->respondToOpenInputFile(*fb_);
1037 for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1038 }
1039 FDEBUG(1) << "\trespondToOpenInputFile\n";
1040 }
1041
1042 void EventProcessor::respondToCloseInputFile() {
1043 if (fileBlockValid()) {
1044 schedule_->respondToCloseInputFile(*fb_);
1045 for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1046 }
1047 FDEBUG(1) << "\trespondToCloseInputFile\n";
1048 }
1049
1050 void EventProcessor::startingNewLoop() {
1051 shouldWeStop_ = false;
1052
1053
1054 if (looper_ && looperBeginJobRun_) {
1055 looper_->doStartingNewLoop();
1056 }
1057 FDEBUG(1) << "\tstartingNewLoop\n";
1058 }
1059
1060 bool EventProcessor::endOfLoop() {
1061 if (looper_) {
1062 ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1063 looper_->setModuleChanger(&changer);
1064 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1065 looper_->setModuleChanger(nullptr);
1066 if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1067 return true;
1068 else
1069 return false;
1070 }
1071 FDEBUG(1) << "\tendOfLoop\n";
1072 return true;
1073 }
1074
1075 void EventProcessor::rewindInput() {
1076 input_->repeat();
1077 input_->rewind();
1078 FDEBUG(1) << "\trewind\n";
1079 }
1080
1081 void EventProcessor::prepareForNextLoop() {
1082 looper_->prepareForNextLoop(esp_.get());
1083 FDEBUG(1) << "\tprepareForNextLoop\n";
1084 }
1085
1086 bool EventProcessor::shouldWeCloseOutput() const {
1087 FDEBUG(1) << "\tshouldWeCloseOutput\n";
1088 if (!subProcesses_.empty()) {
1089 for (auto const& subProcess : subProcesses_) {
1090 if (subProcess.shouldWeCloseOutput()) {
1091 return true;
1092 }
1093 }
1094 return false;
1095 }
1096 return schedule_->shouldWeCloseOutput();
1097 }
1098
1099 void EventProcessor::doErrorStuff() {
1100 FDEBUG(1) << "\tdoErrorStuff\n";
1101 LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1102 << "and went to the error state\n"
1103 << "Will attempt to terminate processing normally\n"
1104 << "(IF using the looper the next loop will be attempted)\n"
1105 << "This likely indicates a bug in an input module or corrupted input or both\n";
1106 }
1107
1108 void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1109 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1110 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1111
1112 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1113 FinalWaitingTask globalWaitTask{taskGroup_};
1114
1115 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1116 beginGlobalTransitionAsync<Traits>(
1117 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1118
1119 globalWaitTask.wait();
1120 beginProcessBlockSucceeded = true;
1121 }
1122
1123 void EventProcessor::inputProcessBlocks() {
1124 input_->fillProcessBlockHelper();
1125 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1126 while (input_->nextProcessBlock(processBlockPrincipal)) {
1127 readProcessBlock(processBlockPrincipal);
1128
1129 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1130 FinalWaitingTask globalWaitTask{taskGroup_};
1131
1132 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1133 beginGlobalTransitionAsync<Traits>(
1134 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1135
1136 globalWaitTask.wait();
1137
1138 FinalWaitingTask writeWaitTask{taskGroup_};
1139 writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input);
1140 writeWaitTask.wait();
1141
1142 processBlockPrincipal.clearPrincipal();
1143 for (auto& s : subProcesses_) {
1144 s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1145 }
1146 }
1147 }
1148
1149 void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1150 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1151
1152 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1153 FinalWaitingTask globalWaitTask{taskGroup_};
1154
1155 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1156 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1157 *schedule_,
1158 transitionInfo,
1159 serviceToken_,
1160 subProcesses_,
1161 cleaningUpAfterException);
1162 globalWaitTask.wait();
1163
1164 if (beginProcessBlockSucceeded) {
1165 FinalWaitingTask writeWaitTask{taskGroup_};
1166 writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New);
1167 writeWaitTask.wait();
1168 }
1169
1170 processBlockPrincipal.clearPrincipal();
1171 for (auto& s : subProcesses_) {
1172 s.clearProcessBlockPrincipal(ProcessBlockType::New);
1173 }
1174 }
1175
1176 InputSource::ItemType EventProcessor::processRuns() {
1177 FinalWaitingTask waitTask{taskGroup_};
1178 assert(lastTransitionType() == InputSource::ItemType::IsRun);
1179 if (streamRunActive_ == 0) {
1180 assert(streamLumiActive_ == 0);
1181
1182 beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1183 WaitingTaskHolder{taskGroup_, &waitTask});
1184 } else {
1185 assert(streamRunActive_ == preallocations_.numberOfStreams());
1186
1187 auto runStatus = streamRunStatus_[0];
1188
1189 while (lastTransitionType() == InputSource::ItemType::IsRun and
1190 runStatus->runPrincipal()->run() == input_->run() and
1191 runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1192 readAndMergeRun(*runStatus);
1193 nextTransitionType();
1194 }
1195
1196 setNeedToCallNext(false);
1197
1198 WaitingTaskHolder holder{taskGroup_, &waitTask};
1199 runStatus->setHolderOfTaskInProcessRuns(holder);
1200 if (streamLumiActive_ > 0) {
1201 assert(streamLumiActive_ == preallocations_.numberOfStreams());
1202 continueLumiAsync(std::move(holder));
1203 } else {
1204 handleNextItemAfterMergingRunEntries(std::move(runStatus), std::move(holder));
1205 }
1206 }
1207 waitTask.wait();
1208 return lastTransitionType();
1209 }
1210
// Starts processing of a new run asynchronously.
//
// iSync:   IOV sync value for the run; used to set up the EventSetup.
// iHolder: task notified on completion or failure. If it has already failed,
//          the function returns without doing anything.
//
// Flow as visible in this function:
//   1. emit esSyncIOVQueuingSignal_ and ask espController_ to run or queue
//      the EventSetup for iSync;
//   2. push a task onto runQueue_ (pushAndPause); the Resumer it receives is
//      stored in the RunProcessingStatus;
//   3. from there push a task onto the source's serial queue chain which reads
//      the run, lets the source do its begin-run work, runs the looper's
//      beginOfJob if that has not happened yet, merges further run entries,
//      runs the global begin-run transition and the looper's begin-run hooks;
//   4. finally install the global end-run task in the status and queue
//      streamBeginRunAsync on every stream queue.
// The failure paths release the begin resources of the status and resume
// queueWhichWaitsForIOVsToFinish_; those that run after the Resumer has been
// stored (except the two stream-queue paths, which set exceptionRunStatus_)
// also resume the global run queue.
1211 void EventProcessor::beginRunAsync(IOVSyncValue const& iSync, WaitingTaskHolder iHolder) {
1212 if (iHolder.taskHasFailed()) {
1213 return;
1214 }
1215
1216 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1217
1218 auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1219 // NOTE(review): the first step captures the local 'status' by reference; that is only safe if the step runs before this function returns — confirm.
1220 chain::first([this, &status, iSync](auto nextTask) {
1221 espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1222 nextTask,
1223 status->endIOVWaitingTasks(),
1224 status->eventSetupImpls(),
1225 queueWhichWaitsForIOVsToFinish_,
1226 actReg_.get(),
1227 serviceToken_,
1228 forceESCacheClearOnNewRun_);
1229 }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1230 CMS_SA_ALLOW try {
1231 if (iException) {
1232 WaitingTaskHolder copyHolder(nextTask);
1233 copyHolder.doneWaiting(*iException);
1234 // The failure is only recorded here; the task pushed to runQueue_ below checks taskHasFailed() and does the cleanup.
1235 }
1236 ServiceRegistry::Operate operate(serviceToken_);
1237
1238 runQueue_->pushAndPause(
1239 *nextTask.group(),
1240 [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1241 CMS_SA_ALLOW try {
1242 if (postRunQueueTask.taskHasFailed()) {
1243 status->resetBeginResources();
1244 queueWhichWaitsForIOVsToFinish_.resume();
1245 return;
1246 }
1247
1248 status->setResumer(std::move(iResumer));
1249
1250 sourceResourcesAcquirer_.serialQueueChain().push(
1251 *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1252 CMS_SA_ALLOW try {
1253 ServiceRegistry::Operate operate(serviceToken_);
1254
1255 if (postSourceTask.taskHasFailed()) {
1256 status->resetBeginResources();
1257 queueWhichWaitsForIOVsToFinish_.resume();
1258 status->resumeGlobalRunQueue();
1259 return;
1260 }
1261 // Read the run and store its principal in the status, then let the source do its begin-run work.
1262 status->setRunPrincipal(readRun());
1263
1264 RunPrincipal& runPrincipal = *status->runPrincipal();
1265 {
1266 SendSourceTerminationSignalIfException sentry(actReg_.get());
1267 input_->doBeginRun(runPrincipal, &processContext_);
1268 sentry.completedSuccessfully();
1269 }
1270
1271 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1272 if (looper_ && looperBeginJobRun_ == false) {
1273 looper_->copyInfo(ScheduleInfo(schedule_.get()));
1274 // The looper's beginOfJob is run via a private task_group that is waited on right here.
1275 oneapi::tbb::task_group group;
1276 FinalWaitingTask waitTask{group};
1277 using namespace edm::waiting_task::chain;
1278 chain::first([this, &es](auto nextTask) {
1279 looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1280 }) | then([this, &es](auto nextTask) mutable {
1281 looper_->beginOfJob(es);
1282 looperBeginJobRun_ = true;
1283 looper_->doStartingNewLoop();
1284 }) | runLast(WaitingTaskHolder(group, &waitTask));
1285 waitTask.wait();
1286 }
1287
1288 using namespace edm::waiting_task::chain;
1289 chain::first([this, status](auto nextTask) mutable {
1290 CMS_SA_ALLOW try {
1291 if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1292 readAndMergeRunEntriesAsync(status, nextTask);
1293 } else {
1294 setNeedToCallNext(true);
1295 }
1296 } catch (...) {
1297 status->setStopBeforeProcessingRun(true);
1298 nextTask.doneWaiting(std::current_exception());
1299 }
1300 }) | then([this, status, &es](auto nextTask) {
1301 if (status->stopBeforeProcessingRun()) {
1302 return;
1303 }
1304 RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1305 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1306 beginGlobalTransitionAsync<Traits>(
1307 nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1308 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1309 if (status->stopBeforeProcessingRun()) {
1310 return;
1311 }
1312 looper_->prefetchAsync(
1313 nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1314 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1315 if (status->stopBeforeProcessingRun()) {
1316 return;
1317 }
1318 ServiceRegistry::Operate operateLooper(serviceToken_);
1319 looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1320 }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1321 if (iException) {
1322 WaitingTaskHolder copyHolder(holder);
1323 copyHolder.doneWaiting(*iException);
1324 } else {
1325 status->globalBeginDidSucceed();
1326 }
1327
1328 if (status->stopBeforeProcessingRun()) {
1329 // The flag was set by the catch around the run-entry merging above: release everything and stop here.
1330 status->resetBeginResources();
1331 queueWhichWaitsForIOVsToFinish_.resume();
1332 status->resumeGlobalRunQueue();
1333 return;
1334 }
1335 CMS_SA_ALLOW try {
1336 // This task is stored in the status as the global end-run holder.
1337 // NOTE(review): presumably it fires once every stream has ended the run and the contained lumis have ended — confirm.
1338 auto globalEndRunTask =
1339 edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1340 WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1341 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1342 globalEndRunAsync(std::move(taskHolder), std::move(status));
1343 });
1344 status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1345 } catch (...) {
1346 status->resetBeginResources();
1347 queueWhichWaitsForIOVsToFinish_.resume();
1348 status->resumeGlobalRunQueue();
1349 holder.doneWaiting(std::current_exception());
1350 return;
1351 }
1352
1353 // NOTE(review): from here on the run is presumably expected to be ended through endRunAsync — confirm.
1354
1355 ServiceRegistry::Operate operate(serviceToken_);
1356
1357 // PauseQueueSentry is handed streamQueuesInserter_ for the rest of this scope.
1358 // NOTE(review): presumably this pauses the inserter queue so that the stream begin-run tasks queued
1359 // below are ordered ahead of later work — confirm.
1360 PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1361
1362 CMS_SA_ALLOW try {
1363 streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
1364 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1365 CMS_SA_ALLOW try {
1366 streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
1367 streamBeginRunAsync(i, std::move(status), std::move(holder));
1368 });
1369 } catch (...) {
1370 if (status->streamFinishedBeginRun()) {
1371 WaitingTaskHolder copyHolder(holder);
1372 copyHolder.doneWaiting(std::current_exception());
1373 status->resetBeginResources();
1374 queueWhichWaitsForIOVsToFinish_.resume();
1375 exceptionRunStatus_ = status;
1376 }
1377 }
1378 }
1379 });
1380 } catch (...) {
1381 WaitingTaskHolder copyHolder(holder);
1382 copyHolder.doneWaiting(std::current_exception());
1383 status->resetBeginResources();
1384 queueWhichWaitsForIOVsToFinish_.resume();
1385 exceptionRunStatus_ = status;
1386 }
1387 handleNextItemAfterMergingRunEntries(status, holder);
1388 }) | runLast(postSourceTask);
1389 } catch (...) {
1390 status->resetBeginResources();
1391 queueWhichWaitsForIOVsToFinish_.resume();
1392 status->resumeGlobalRunQueue();
1393 postSourceTask.doneWaiting(std::current_exception());
1394 }
1395 });
1396 } catch (...) {
1397 status->resetBeginResources();
1398 queueWhichWaitsForIOVsToFinish_.resume();
1399 status->resumeGlobalRunQueue();
1400 postRunQueueTask.doneWaiting(std::current_exception());
1401 }
1402 });
1403 } catch (...) {
1404 status->resetBeginResources();
1405 queueWhichWaitsForIOVsToFinish_.resume();
1406 nextTask.doneWaiting(std::current_exception());
1407 }
1408 }) | chain::runLast(std::move(iHolder));
1409 }
1410
// Runs the stream begin-run transition for one stream.
//
// iStream: index of the stream; its queue is paused for the duration of the run.
// status:  run status, stored in streamRunStatus_[iStream].
// iHolder: task notified when the stream transition is done or has failed.
//
// The stream transition itself is only started when the global begin-run
// succeeded. In every case releaseBeginRunResources(iStream) is called.
1411 void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1412 std::shared_ptr<RunProcessingStatus> status,
1413 WaitingTaskHolder iHolder) noexcept {
1414 // The function is noexcept and the next three statements sit outside the try block, so they must not throw.
1415 streamQueues_[iStream].pause();
1416 ++streamRunActive_;
1417 streamRunStatus_[iStream] = std::move(status);
1418
1419 CMS_SA_ALLOW try {
1420 using namespace edm::waiting_task::chain;
1421 chain::first([this, iStream](auto nextTask) {
1422 RunProcessingStatus& rs = *streamRunStatus_[iStream];
1423 if (rs.didGlobalBeginSucceed()) {
1424 RunTransitionInfo transitionInfo(
1425 *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1426 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1427 beginStreamTransitionAsync<Traits>(
1428 std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1429 }
1430 }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1431 if (exceptionFromBeginStreamRun) {
1432 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1433 }
1434 releaseBeginRunResources(iStream);
1435 }) | runLast(iHolder);
1436 } catch (...) {
1437 releaseBeginRunResources(iStream);
1438 iHolder.doneWaiting(std::current_exception());
1439 }
1440 }
1441
1442 void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1443 auto& status = streamRunStatus_[iStream];
1444 if (status->streamFinishedBeginRun()) {
1445 status->resetBeginResources();
1446 queueWhichWaitsForIOVsToFinish_.resume();
1447 }
1448 streamQueues_[iStream].resume();
1449 }
1450
// Starts the asynchronous end of a run.
//
// iRunStatus: status of the run being ended; its end time is set here.
// iHolder:    task notified about failures and about completion of the chain.
//
// Steps visible here: build the end-of-run IOV sync value (maximum lumi and
// event numbers, run end time), emit esSyncIOVQueuingSignal_, set up the
// EventSetup for that value, then queue streamEndRunAsync on every stream
// queue. If the last source transition is IsRun, the next run is started
// right away through beginRunAsync.
1451 void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1452 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1453 iRunStatus->setEndTime();
1454 IOVSyncValue ts(
1455 EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1456 runPrincipal.endTime());
1457 CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1458 WaitingTaskHolder copyHolder(iHolder);
1459 copyHolder.doneWaiting(std::current_exception());
1460 }
1461 // NOTE(review): the first step captures the locals iRunStatus and ts by reference; that is only safe if the step runs before this function returns — confirm.
1462 chain::first([this, &iRunStatus, &ts](auto nextTask) {
1463 espController_->runOrQueueEventSetupForInstanceAsync(ts,
1464 nextTask,
1465 iRunStatus->endIOVWaitingTasksEndRun(),
1466 iRunStatus->eventSetupImplsEndRun(),
1467 queueWhichWaitsForIOVsToFinish_,
1468 actReg_.get(),
1469 serviceToken_);
1470 }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1471 if (iException) {
1472 iRunStatus->setEndingEventSetupSucceeded(false);
1473 handleEndRunExceptions(*iException, nextTask);
1474 }
1475 ServiceRegistry::Operate operate(serviceToken_);
1476 streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1477 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1478 CMS_SA_ALLOW try {
1479 streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1480 streamQueues_[i].pause();
1481 streamEndRunAsync(std::move(nextTask), i);
1482 });
1483 } catch (...) {
1484 WaitingTaskHolder copyHolder(nextTask);
1485 copyHolder.doneWaiting(std::current_exception());
1486 }
1487 }
1488 });
1489 // When the source already delivered the next Run, start it now.
1490 if (lastTransitionType() == InputSource::ItemType::IsRun) {
1491 CMS_SA_ALLOW try {
1492 beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1493 } catch (...) {
1494 WaitingTaskHolder copyHolder(nextTask);
1495 copyHolder.doneWaiting(std::current_exception());
1496 }
1497 }
1498 }) | chain::runLast(std::move(iHolder));
1499 }
1500
1501 void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1502 if (holder.taskHasFailed()) {
1503 setExceptionMessageRuns();
1504 } else {
1505 WaitingTaskHolder tmp(holder);
1506 tmp.doneWaiting(iException);
1507 }
1508 }
1509
// Runs the global end-run work for a run as a task chain.
//
// iTask:      task notified when the chain is done; a failed iTask makes the
//             transition run in "cleaning up after exception" mode.
// iRunStatus: status of the run; moved into the last step of the chain.
//
// Chain steps, each guarded by the conditions shown in the code:
//   - global end-run transition (only if the ending EventSetup succeeded);
//   - looper prefetch and doEndRun (looper present and EventSetup succeeded);
//   - write the run (global begin and ending EventSetup both succeeded);
//   - final step: postWriteRun, report any exception, then release the run
//     principal, resume the queues and release the end resources.
// NOTE(review): the first step captures the local pointer 'eventSetupImpls' by reference; that is only safe
// if the step runs before this function returns — confirm.
1510 void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1511 auto& runPrincipal = *(iRunStatus->runPrincipal());
1512 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1513 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1514 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1515 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1516 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1517
1518 MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1519 using namespace edm::waiting_task::chain;
1520 chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1521 auto nextTask) {
1522 if (endingEventSetupSucceeded) {
1523 RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1524 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1525 endGlobalTransitionAsync<Traits>(
1526 std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1527 }
1528 }) |
1529 ifThen(looper_ && endingEventSetupSucceeded,
1530 [this, &runPrincipal, &es](auto nextTask) {
1531 looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1532 }) |
1533 ifThen(looper_ && endingEventSetupSucceeded,
1534 [this, &runPrincipal, &es](auto nextTask) {
1535 ServiceRegistry::Operate operate(serviceToken_);
1536 looper_->doEndRun(runPrincipal, es, &processContext_);
1537 }) |
1538 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1539 [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1540 mergeableRunProductMetadata->preWriteRun();
1541 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1542 }) |
1543 then([status = std::move(iRunStatus),
1544 this,
1545 didGlobalBeginSucceed,
1546 mergeableRunProductMetadata,
1547 endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1548 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1549 mergeableRunProductMetadata->postWriteRun();
1550 }
1551 if (iException) {
1552 handleEndRunExceptions(*iException, nextTask);
1553 }
1554 ServiceRegistry::Operate operate(serviceToken_);
1555
1556 std::exception_ptr ptr;
1557
1558 // Each cleanup step below runs in its own try/catch so that a failure
1559 // in one step does not skip the others; only the first exception
1560 // raised during cleanup is kept in ptr.
1561 CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1562 if (not ptr) {
1563 ptr = std::current_exception();
1564 }
1565 }
1566 CMS_SA_ALLOW try {
1567 status->resumeGlobalRunQueue();
1568 queueWhichWaitsForIOVsToFinish_.resume();
1569 } catch (...) {
1570 if (not ptr) {
1571 ptr = std::current_exception();
1572 }
1573 }
1574 CMS_SA_ALLOW try {
1575 status->resetEndResources();
1576 status.reset();
1577 } catch (...) {
1578 if (not ptr) {
1579 ptr = std::current_exception();
1580 }
1581 }
1582 // A cleanup exception is reported only if the chain itself did not already fail.
1583 if (ptr && !iException) {
1584 handleEndRunExceptions(ptr, nextTask);
1585 }
1586 }) |
1587 runLast(std::move(iTask));
1588 }
1589
// Runs the stream end-run transition for one stream.
//
// iTask:        task that receives failures via handleEndRunExceptions.
// iStreamIndex: index of the stream.
//
// If the stream has no run status, only exceptionRunStatus_ is consulted:
// when it reports streamFinishedRun(), its global end-run holder is released.
// NOTE(review): exceptionRunStatus_ is dereferenced without a null check on that path — presumably it is
// always set when a stream has no status here; confirm.
// Otherwise a completion task is created that, once the stream transition is
// done, releases the global end-run holder when streamFinishedRun() is true,
// clears the stream's status and resumes the stream queue.
// NOTE(review): when the status exists but the transition condition below is false, runDoneTaskHolder is
// never moved from; presumably its destruction at scope exit triggers runDoneTask — confirm.
1590 void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1591 CMS_SA_ALLOW try {
1592 if (!streamRunStatus_[iStreamIndex]) {
1593 if (exceptionRunStatus_->streamFinishedRun()) {
1594 exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1595 exceptionRunStatus_.reset();
1596 }
1597 return;
1598 }
1599
1600 auto runDoneTask =
1601 edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1602 if (iException) {
1603 handleEndRunExceptions(*iException, iTask);
1604 }
1605
1606 auto runStatus = streamRunStatus_[iStreamIndex];
1607
1608 // Order matters: the per-stream status is reset before the stream queue is resumed.
1609 // NOTE(review): presumably this avoids a race with the next task on that queue — confirm.
1610 if (runStatus->streamFinishedRun()) {
1611 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1612 }
1613 streamRunStatus_[iStreamIndex].reset();
1614 --streamRunActive_;
1615 streamQueues_[iStreamIndex].resume();
1616 });
1617
1618 WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1619
1620 auto runStatus = streamRunStatus_[iStreamIndex].get();
1621 // The stream transition only runs when both the global begin-run and the ending EventSetup succeeded.
1622 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1623 EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1624 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1625 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1626
1627 auto& runPrincipal = *runStatus->runPrincipal();
1628 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1629 RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1630 endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1631 *schedule_,
1632 iStreamIndex,
1633 transitionInfo,
1634 serviceToken_,
1635 subProcesses_,
1636 cleaningUpAfterException);
1637 }
1638 } catch (...) {
1639 handleEndRunExceptions(std::current_exception(), iTask);
1640 }
1641 }
1641
1642 void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1643 if (streamRunActive_ > 0) {
1644 FinalWaitingTask waitTask{taskGroup_};
1645
1646 auto runStatus = streamRunStatus_[0].get();
1647 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1648 WaitingTaskHolder holder{taskGroup_, &waitTask};
1649 runStatus->setHolderOfTaskInProcessRuns(holder);
1650 lastSourceTransition_ = InputSource::ItemType::IsStop;
1651 endRunAsync(streamRunStatus_[0], std::move(holder));
1652 waitTask.wait();
1653 }
1654 }
1655
1656 void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1657 std::shared_ptr<RunProcessingStatus> iRunStatus,
1658 edm::WaitingTaskHolder iHolder) {
1659 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1660
1661 auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1662 chain::first([this, &iSync, &status](auto nextTask) {
1663 espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1664 nextTask,
1665 status->endIOVWaitingTasks(),
1666 status->eventSetupImpls(),
1667 queueWhichWaitsForIOVsToFinish_,
1668 actReg_.get(),
1669 serviceToken_);
1670 }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1671 CMS_SA_ALLOW try {
1672
1673 if (iException) {
1674 WaitingTaskHolder copyHolder(nextTask);
1675 copyHolder.doneWaiting(*iException);
1676 }
1677
1678 lumiQueue_->pushAndPause(
1679 *nextTask.group(),
1680 [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1681 CMS_SA_ALLOW try {
1682 if (postLumiQueueTask.taskHasFailed()) {
1683 status->resetResources();
1684 queueWhichWaitsForIOVsToFinish_.resume();
1685 endRunAsync(iRunStatus, postLumiQueueTask);
1686 return;
1687 }
1688
1689 status->setResumer(std::move(iResumer));
1690
1691 sourceResourcesAcquirer_.serialQueueChain().push(
1692 *postLumiQueueTask.group(),
1693 [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1694 CMS_SA_ALLOW try {
1695 ServiceRegistry::Operate operate(serviceToken_);
1696
1697 if (postSourceTask.taskHasFailed()) {
1698 status->resetResources();
1699 queueWhichWaitsForIOVsToFinish_.resume();
1700 endRunAsync(iRunStatus, postSourceTask);
1701 return;
1702 }
1703
1704 status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1705
1706 LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1707 {
1708 SendSourceTerminationSignalIfException sentry(actReg_.get());
1709 input_->doBeginLumi(lumiPrincipal, &processContext_);
1710 sentry.completedSuccessfully();
1711 }
1712
1713 Service<RandomNumberGenerator> rng;
1714 if (rng.isAvailable()) {
1715 LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1716 rng->preBeginLumi(lb);
1717 }
1718
1719 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1720
1721 using namespace edm::waiting_task::chain;
1722 chain::first([this, status](auto nextTask) mutable {
1723 if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1724 readAndMergeLumiEntriesAsync(std::move(status), std::move(nextTask));
1725 } else {
1726 setNeedToCallNext(true);
1727 }
1728 }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1729 LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1730 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1731 beginGlobalTransitionAsync<Traits>(
1732 nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1733 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1734 looper_->prefetchAsync(
1735 nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1736 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1737 ServiceRegistry::Operate operateLooper(serviceToken_);
1738 looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1739 }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1740 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1741
1742 if (iException) {
1743 WaitingTaskHolder copyHolder(holder);
1744 copyHolder.doneWaiting(*iException);
1745 globalEndLumiAsync(holder, status);
1746 endRunAsync(iRunStatus, holder);
1747 } else {
1748 status->globalBeginDidSucceed();
1749
1750 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1751 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1752
1753 streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1754 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1755 streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1756 if (!status->shouldStreamStartLumi()) {
1757 return;
1758 }
1759 streamQueues_[i].pause();
1760
1761 auto& event = principalCache_.eventPrincipal(i);
1762 auto eventSetupImpls = &status->eventSetupImpls();
1763 auto lp = status->lumiPrincipal().get();
1764 streamLumiStatus_[i] = std::move(status);
1765 ++streamLumiActive_;
1766 event.setLuminosityBlockPrincipal(lp);
1767 LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1768 using namespace edm::waiting_task::chain;
1769 chain::first([this, i, &transitionInfo](auto nextTask) {
1770 beginStreamTransitionAsync<Traits>(std::move(nextTask),
1771 *schedule_,
1772 i,
1773 transitionInfo,
1774 serviceToken_,
1775 subProcesses_);
1776 }) |
1777 then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1778 auto nextTask) {
1779 if (exceptionFromBeginStreamLumi) {
1780 WaitingTaskHolder copyHolder(nextTask);
1781 copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1782 }
1783 handleNextEventForStreamAsync(std::move(nextTask), i);
1784 }) |
1785 runLast(std::move(holder));
1786 });
1787 }
1788 });
1789 }
1790 }) | runLast(postSourceTask);
1791 } catch (...) {
1792 status->resetResources();
1793 queueWhichWaitsForIOVsToFinish_.resume();
1794 WaitingTaskHolder copyHolder(postSourceTask);
1795 copyHolder.doneWaiting(std::current_exception());
1796 endRunAsync(iRunStatus, postSourceTask);
1797 }
1798 });
1799 } catch (...) {
1800 status->resetResources();
1801 queueWhichWaitsForIOVsToFinish_.resume();
1802 WaitingTaskHolder copyHolder(postLumiQueueTask);
1803 copyHolder.doneWaiting(std::current_exception());
1804 endRunAsync(iRunStatus, postLumiQueueTask);
1805 }
1806 });
1807 } catch (...) {
1808 status->resetResources();
1809 queueWhichWaitsForIOVsToFinish_.resume();
1810 WaitingTaskHolder copyHolder(nextTask);
1811 copyHolder.doneWaiting(std::current_exception());
1812 endRunAsync(iRunStatus, nextTask);
1813 }
1814 }) | chain::runLast(std::move(iHolder));
1815 }
1816
1817 void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1818 chain::first([this](auto nextTask) {
1819
1820 auto status = streamLumiStatus_[0];
1821 status->setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kProcessing);
1822
1823 while (lastTransitionType() == InputSource::ItemType::IsLumi and
1824 status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1825 readAndMergeLumi(*status);
1826 nextTransitionType();
1827 }
1828 }) | chain::then([this](auto nextTask) mutable {
1829 unsigned int streamIndex = 0;
1830 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1831 for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1832 arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1833 }
1834 nextTask.group()->run(
1835 [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1836 }) | chain::runLast(std::move(iHolder));
1837 }
1838
1839 void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1840 if (holder.taskHasFailed()) {
1841 setExceptionMessageLumis();
1842 } else {
1843 WaitingTaskHolder tmp(holder);
1844 tmp.doneWaiting(iException);
1845 }
1846 }
1847
1848 void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1849 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1850
1851
1852 auto& lp = *(iLumiStatus->lumiPrincipal());
1853 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1854 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1855 EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1856 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1857
1858 using namespace edm::waiting_task::chain;
1859 chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1860 IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1861
1862 LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1863 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1864 endGlobalTransitionAsync<Traits>(
1865 std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1866 }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1867
1868 if (didGlobalBeginSucceed) {
1869 writeLumiAsync(std::move(nextTask), lumiPrincipal);
1870 }
1871 }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1872 looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1873 }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1874
1875 ServiceRegistry::Operate operate(serviceToken_);
1876 looper_->doEndLuminosityBlock(lp, es, &processContext_);
1877 }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1878 if (iException) {
1879 handleEndLumiExceptions(*iException, nextTask);
1880 }
1881 ServiceRegistry::Operate operate(serviceToken_);
1882
1883 std::exception_ptr ptr;
1884
1885
1886
1887
1888
1889 CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1890 if (not ptr) {
1891 ptr = std::current_exception();
1892 }
1893 }
1894
1895 CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1896 if (not ptr) {
1897 ptr = std::current_exception();
1898 }
1899 }
1900
1901 CMS_SA_ALLOW try {
1902 status->resetResources();
1903 status->globalEndRunHolderDoneWaiting();
1904 status.reset();
1905 } catch (...) {
1906 if (not ptr) {
1907 ptr = std::current_exception();
1908 }
1909 }
1910
1911 if (ptr && !iException) {
1912 handleEndLumiExceptions(ptr, nextTask);
1913 }
1914 }) | runLast(std::move(iTask));
1915 }
1916
// Ends the luminosity block on one stream.
//
// Starts the stream-end transition for stream iStreamIndex via
// endStreamTransitionAsync and attaches a completion task `t` to it. That task
// releases this stream's lumi status slot, resumes the stream's queue and, when
// status->streamFinishedLumi() returns true, starts globalEndLumiAsync.
//
// iTask:        holder that receives exceptions and is passed on to
//               globalEndLumiAsync.
// iStreamIndex: index into streamLumiStatus_ / streamQueues_.
1917 void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
// Completion task; it is wrapped in lumiDoneTask below and handed to the
// stream-end transition (presumably run once that transition finishes).
1918 auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
// Keep our own shared_ptr copy: the slot in streamLumiStatus_ is reset below
// but the status is still needed afterwards.
1919 auto status = streamLumiStatus_[iStreamIndex];
1920 if (iException) {
1921 handleEndLumiExceptions(*iException, iTask);
1922 }
1923
1924
// The slot is cleared and the active-stream counter decremented BEFORE the
// stream queue is resumed.
// NOTE(review): this ordering looks deliberate (avoids work resumed on the
// queue seeing a stale status) -- confirm before reordering.
1925 streamLumiStatus_[iStreamIndex].reset();
1926 --streamLumiActive_;
1927 streamQueues_[iStreamIndex].resume();
1928
1929
// Presumably true only for the last stream to finish this lumi -- verify in
// LuminosityBlockProcessingStatus. In that case the global end is started.
1930 if (status->streamFinishedLumi()) {
1931 globalEndLumiAsync(iTask, std::move(status));
1932 }
1933 });
1934
1935 edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1936
1937
1938
// Only a raw pointer is taken here (no additional shared_ptr owner), so this
// function does not extend the lifetime of the status object.
1939 auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1940 lumiStatus->setEndTime();
1941
1942 EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1943 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
// Cleanup mode is on if the status says so or if iTask's task already failed.
1944 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1945
1946 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1947 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1948 LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
// lumiDoneTask (and with it `t`) is moved into the transition.
1949 endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1950 *schedule_,
1951 iStreamIndex,
1952 transitionInfo,
1953 serviceToken_,
1954 subProcesses_,
1955 cleaningUpAfterException);
1956 }
1957
1958 void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1959 if (streamRunActive_ == 0) {
1960 assert(streamLumiActive_ == 0);
1961 } else {
1962 assert(streamRunActive_ == preallocations_.numberOfStreams());
1963 if (streamLumiActive_ > 0) {
1964 FinalWaitingTask globalWaitTask{taskGroup_};
1965 assert(streamLumiActive_ == preallocations_.numberOfStreams());
1966 streamLumiStatus_[0]->noMoreEventsInLumi();
1967 streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1968 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1969 streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1970 }
1971 globalWaitTask.wait();
1972 }
1973 }
1974 }
1975
1976 void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1977 SendSourceTerminationSignalIfException sentry(actReg_.get());
1978 input_->readProcessBlock(processBlockPrincipal);
1979 sentry.completedSuccessfully();
1980 }
1981
1982 std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1983 auto rp = principalCache_.getAvailableRunPrincipalPtr();
1984 assert(rp);
1985 rp->setAux(*input_->runAuxiliary());
1986 {
1987 SendSourceTerminationSignalIfException sentry(actReg_.get());
1988 input_->readRun(*rp, *historyAppender_);
1989 sentry.completedSuccessfully();
1990 }
1991 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1992 return rp;
1993 }
1994
1995 void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
1996 RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1997
1998 bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1999 assert(runOK);
2000 runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
2001 {
2002 SendSourceTerminationSignalIfException sentry(actReg_.get());
2003 input_->readAndMergeRun(runPrincipal);
2004 sentry.completedSuccessfully();
2005 }
2006 }
2007
2008 std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
2009 auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
2010 assert(lbp);
2011 lbp->setAux(*input_->luminosityBlockAuxiliary());
2012 {
2013 SendSourceTerminationSignalIfException sentry(actReg_.get());
2014 input_->readLuminosityBlock(*lbp, *historyAppender_);
2015 sentry.completedSuccessfully();
2016 }
2017 lbp->setRunPrincipal(std::move(rp));
2018 return lbp;
2019 }
2020
2021 void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
2022 auto& lumiPrincipal = *iStatus.lumiPrincipal();
2023 assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
2024 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2025 input_->processHistoryRegistry().reducedProcessHistoryID(
2026 input_->luminosityBlockAuxiliary()->processHistoryID()));
2027 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2028 assert(lumiOK);
2029 lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2030 {
2031 SendSourceTerminationSignalIfException sentry(actReg_.get());
2032 input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2033 sentry.completedSuccessfully();
2034 }
2035 }
2036
2037 void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
2038 using namespace edm::waiting_task;
2039 chain::first([&](auto nextTask) {
2040 ServiceRegistry::Operate op(serviceToken_);
2041 schedule_->writeProcessBlockAsync(
2042 nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2043 }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2044 ServiceRegistry::Operate op(serviceToken_);
2045 for (auto& s : subProcesses_) {
2046 s.writeProcessBlockAsync(nextTask, processBlockType);
2047 }
2048 }) | chain::runLast(std::move(task));
2049 }
2050
2051 void EventProcessor::writeRunAsync(WaitingTaskHolder task,
2052 RunPrincipal const& runPrincipal,
2053 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2054 using namespace edm::waiting_task;
2055 if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2056 chain::first([&](auto nextTask) {
2057 ServiceRegistry::Operate op(serviceToken_);
2058 schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2059 }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2060 ServiceRegistry::Operate op(serviceToken_);
2061 for (auto& s : subProcesses_) {
2062 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2063 }
2064 }) | chain::runLast(std::move(task));
2065 }
2066 }
2067
2068 void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
2069 for (auto& s : subProcesses_) {
2070 s.clearRunPrincipal(*iStatus.runPrincipal());
2071 }
2072 iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2073 iStatus.runPrincipal()->clearPrincipal();
2074 }
2075
2076 void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
2077 using namespace edm::waiting_task;
2078 if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2079 chain::first([&](auto nextTask) {
2080 ServiceRegistry::Operate op(serviceToken_);
2081
2082 lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2083 schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2084 }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2085 ServiceRegistry::Operate op(serviceToken_);
2086 for (auto& s : subProcesses_) {
2087 s.writeLumiAsync(nextTask, lumiPrincipal);
2088 }
2089 }) | chain::lastTask(std::move(task));
2090 }
2091 }
2092
2093 void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
2094 for (auto& s : subProcesses_) {
2095 s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2096 }
2097 iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2098 iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2099 iStatus.lumiPrincipal()->clearPrincipal();
2100 }
2101
2102 void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2103 WaitingTaskHolder iHolder) {
2104 auto group = iHolder.group();
2105 sourceResourcesAcquirer_.serialQueueChain().push(
2106 *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2107 CMS_SA_ALLOW try {
2108 ServiceRegistry::Operate operate(serviceToken_);
2109
2110 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2111
2112 nextTransitionType();
2113 setNeedToCallNext(false);
2114
2115 while (lastTransitionType() == InputSource::ItemType::IsRun and
2116 status->runPrincipal()->run() == input_->run() and
2117 status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2118 if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2119 status->setStopBeforeProcessingRun(true);
2120 return;
2121 }
2122 readAndMergeRun(*status);
2123 if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2124 setNeedToCallNext(true);
2125 return;
2126 }
2127 nextTransitionType();
2128 }
2129 } catch (...) {
2130 status->setStopBeforeProcessingRun(true);
2131 holder.doneWaiting(std::current_exception());
2132 }
2133 });
2134 }
2135
2136 void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2137 WaitingTaskHolder iHolder) {
2138 auto group = iHolder.group();
2139 sourceResourcesAcquirer_.serialQueueChain().push(
2140 *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2141 CMS_SA_ALLOW try {
2142 ServiceRegistry::Operate operate(serviceToken_);
2143
2144 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2145
2146 nextTransitionType();
2147 setNeedToCallNext(false);
2148
2149 while (lastTransitionType() == InputSource::ItemType::IsLumi and
2150 iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2151 readAndMergeLumi(*iLumiStatus);
2152 if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2153 setNeedToCallNext(true);
2154 return;
2155 }
2156 nextTransitionType();
2157 }
2158 } catch (...) {
2159 holder.doneWaiting(std::current_exception());
2160 }
2161 });
2162 }
2163
// Decides what to do after the run entries for iRunStatus's run were merged.
//
// Step 1 advances the source via nextTransitionTypeAsync, but only when
// needToCallNext() is true. Step 2 then dispatches on lastTransitionType():
//   * IsFile: signals the run's holderOfTaskInProcessRuns() and stops here;
//   * IsLumi (and no failure so far): starts the lumi with beginLumiAsync;
//   * anything else, or a failure: ends the run with endRunAsync.
// iHolder terminates the chain.
2164 void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2165 WaitingTaskHolder iHolder) {
2166 chain::first([this, iRunStatus](auto nextTask) mutable {
2167 if (needToCallNext()) {
2168 nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2169 }
// This step takes the exception pointer explicitly, so it runs even when the
// previous step failed.
2170 }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2171 ServiceRegistry::Operate operate(serviceToken_);
2172 if (iException) {
// Deliver the exception through a copy; nextTask itself is still used below.
2173 WaitingTaskHolder copyHolder(nextTask);
2174 copyHolder.doneWaiting(*iException);
2175 }
2176 if (lastTransitionType() == InputSource::ItemType::IsFile) {
// Signalled with an empty exception_ptr; neither a lumi nor the end of the
// run is started from here in this case.
2177 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2178 return;
2179 }
2180 if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2181 CMS_SA_ALLOW try {
2182 beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2183 input_->luminosityBlockAuxiliary()->beginTime()),
2184 iRunStatus,
2185 nextTask);
2186 return;
2187 } catch (...) {
// A synchronous failure of beginLumiAsync is recorded on nextTask and control
// falls through to endRunAsync below.
2188 WaitingTaskHolder copyHolder(nextTask);
2189 copyHolder.doneWaiting(std::current_exception());
2190 }
2191 }
2192
2193
// Reached when the next item is neither a file nor a startable lumi, or when
// starting the lumi threw.
2194 endRunAsync(iRunStatus, std::move(nextTask));
2195 }) | chain::runLast(std::move(iHolder));
2196 }
2197
2198 bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2199 unsigned int iStreamIndex,
2200 LuminosityBlockProcessingStatus& iStatus) {
2201
2202
2203
2204 if (iTask.taskHasFailed()) {
2205
2206
2207
2208 if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2209 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2210 }
2211 return false;
2212 }
2213
2214
2215 if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2216 return false;
2217 }
2218
2219
2220 if (shouldWeStop()) {
2221 lastSourceTransition_ = InputSource::ItemType::IsStop;
2222 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2223 return false;
2224 }
2225
2226 ServiceRegistry::Operate operate(serviceToken_);
2227
2228
2229
2230
2231 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2232
2233
2234
2235
2236 InputSource::ItemType itemType = needToCallNext() ? nextTransitionType() : lastTransitionType();
2237 setNeedToCallNext(true);
2238
2239 if (InputSource::ItemType::IsEvent != itemType) {
2240
2241
2242
2243 if (InputSource::ItemType::IsStop == itemType or InputSource::ItemType::IsLumi == itemType or
2244 (InputSource::ItemType::IsRun == itemType and
2245 (iStatus.lumiPrincipal()->run() != input_->run() or
2246 iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2247 if (itemType == InputSource::ItemType::IsLumi &&
2248 iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2249 throw Exception(errors::LogicError)
2250 << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2251 << "but the next lumi entry has the same lumi number.\n"
2252 << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2253 }
2254 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2255 } else {
2256 iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2257 }
2258 return false;
2259 }
2260 readEvent(iStreamIndex);
2261 return true;
2262 }
2263
// Drives one stream through the events of the current luminosity block.
//
// If the stream's lumi status says the next lumi was already started (or the
// run ended), the stream simply ends its lumi. Otherwise a task is pushed onto
// the source's serial queue chain that tries to read one event:
//   * event read: process it asynchronously, then call this function again
//     for the same stream;
//   * state kStopLumi: the first stream to get here starts the next lumi or
//     ends the run, and every stream ends its lumi;
//   * otherwise (asserted to be kPauseForFileTransition): signal the run's
//     holderOfTaskInProcessRuns() if it has a task.
2264 void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2265 if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2266 streamEndLumiAsync(iTask, iStreamIndex);
2267 return;
2268 }
// Read the group before iTask is moved into the lambda below.
2269 auto group = iTask.group();
2270 sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2271 CMS_SA_ALLOW try {
// Raw pointer only; this task does not take shared ownership of the status.
2272 auto status = streamLumiStatus_[iStreamIndex].get();
2273 ServiceRegistry::Operate operate(serviceToken_);
2274
2275 if (readNextEventForStream(iTask, iStreamIndex, *status)) {
// Task that receives the outcome of processing this event (it is handed to
// processEventAsync below).
2276 auto recursionTask =
2277 make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2278 if (iEventException) {
2279 WaitingTaskHolder copyHolder(iTask);
2280 copyHolder.doneWaiting(*iEventException);
2281
2282
2283
// No early return here: the call below still runs after the exception was
// recorded on iTask.
2284 }
2285 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2286 });
2287
2288 processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2289 } else {
2290
2291 if (status->eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi) {
// Only the first stream to arrive performs the hand-over to the next lumi or
// to the end of the run; startNextLumiOrEndRun() presumably sets the flag
// tested here -- verify in LuminosityBlockProcessingStatus.
2292 if (not status->haveStartedNextLumiOrEndedRun()) {
2293 status->noMoreEventsInLumi();
2294 status->startNextLumiOrEndRun();
2295 if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
2296 CMS_SA_ALLOW try {
2297 beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2298 input_->luminosityBlockAuxiliary()->beginTime()),
2299 streamRunStatus_[iStreamIndex],
2300 iTask);
2301 } catch (...) {
// A synchronous failure of beginLumiAsync is recorded on iTask and the run
// is ended instead.
2302 WaitingTaskHolder copyHolder(iTask);
2303 copyHolder.doneWaiting(std::current_exception());
2304 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2305 }
2306 } else {
2307
// Next item is not a lumi, or iTask's task already failed: end the run.
2308 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2309 }
2310 }
// Every stream, first or not, ends the current lumi.
2311 streamEndLumiAsync(iTask, iStreamIndex);
2312 } else {
2313 assert(status->eventProcessingState() ==
2314 LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2315 auto runStatus = streamRunStatus_[iStreamIndex].get();
2316
// Signal the holder only if it still has a task (presumably another stream
// may already have done so -- TODO confirm).
2317 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2318 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2319 }
2320 }
2321 }
2322 } catch (...) {
// Record the exception on iTask, then re-enter this function.
// NOTE(review): the re-entry presumably ends the lumi/run through the failed
// task path of readNextEventForStream -- confirm.
2323 WaitingTaskHolder copyHolder(iTask);
2324 copyHolder.doneWaiting(std::current_exception());
2325 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2326 }
2327 });
2328 }
2329
2330 void EventProcessor::readEvent(unsigned int iStreamIndex) {
2331
2332 auto& event = principalCache_.eventPrincipal(iStreamIndex);
2333 StreamContext streamContext(event.streamID(), &processContext_);
2334
2335 SendSourceTerminationSignalIfException sentry(actReg_.get());
2336 input_->readEvent(event, streamContext);
2337
2338 streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2339 streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2340 sentry.completedSuccessfully();
2341
2342 FDEBUG(1) << "\treadEvent\n";
2343 }
2344
2345 void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2346 iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2347 }
2348
2349 namespace {
2350 struct ClearEventGuard {
2351 ClearEventGuard(edm::ActivityRegistry& iReg, edm::StreamContext const& iContext)
2352 : act_(iReg), context_(iContext) {
2353 iReg.preClearEventSignal_.emit(iContext);
2354 }
2355 ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2356 edm::ActivityRegistry& act_;
2357 edm::StreamContext const& context_;
2358 };
2359 }
2360
2361 void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2362 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2363
2364 ServiceRegistry::Operate operate(serviceToken_);
2365 Service<RandomNumberGenerator> rng;
2366 if (rng.isAvailable()) {
2367 Event ev(*pep, ModuleDescription(), nullptr);
2368 rng->postEventRead(ev);
2369 }
2370
2371 EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2372 using namespace edm::waiting_task::chain;
2373 chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2374 EventTransitionInfo info(*pep, es);
2375 schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2376 }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2377 for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2378 subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2379 }
2380 }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2381
2382 ServiceRegistry::Operate operateLooper(serviceToken_);
2383 processEventWithLooper(*pep, iStreamIndex);
2384 }) | then([this, pep](auto nextTask) {
2385 FDEBUG(1) << "\tprocessEvent\n";
2386 StreamContext streamContext(pep->streamID(),
2387 StreamContext::Transition::kEvent,
2388 pep->id(),
2389 pep->runPrincipal().index(),
2390 pep->luminosityBlockPrincipal().index(),
2391 pep->time(),
2392 &processContext_);
2393 ClearEventGuard guard(*this->actReg_.get(), streamContext);
2394 pep->clearEventPrincipal();
2395 }) | runLast(iHolder);
2396 }
2397
2398 void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2399 bool randomAccess = input_->randomAccess();
2400 ProcessingController::ForwardState forwardState = input_->forwardState();
2401 ProcessingController::ReverseState reverseState = input_->reverseState();
2402 ProcessingController pc(forwardState, reverseState, randomAccess);
2403
2404 EDLooperBase::Status status = EDLooperBase::kContinue;
2405 do {
2406 StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2407 EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2408 status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2409
2410 bool succeeded = true;
2411 if (randomAccess) {
2412 if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2413 input_->skipEvents(-2);
2414 } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2415 succeeded = input_->goToEvent(pc.specifiedEventTransition());
2416 }
2417 }
2418 pc.setLastOperationSucceeded(succeeded);
2419 } while (!pc.lastOperationSucceeded());
2420 if (status != EDLooperBase::kContinue) {
2421 shouldWeStop_ = true;
2422 }
2423 }
2424
2425 bool EventProcessor::shouldWeStop() const {
2426 FDEBUG(1) << "\tshouldWeStop\n";
2427 if (shouldWeStop_)
2428 return true;
2429 if (!subProcesses_.empty()) {
2430 for (auto const& subProcess : subProcesses_) {
2431 if (subProcess.terminate()) {
2432 return true;
2433 }
2434 }
2435 return false;
2436 }
2437 return schedule_->terminate();
2438 }
2439
2440 void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2441
2442 void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2443
2444 void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2445
2446 bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2447 bool expected = false;
2448 if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2449 deferredExceptionPtr_ = iException;
2450 return true;
2451 }
2452 return false;
2453 }
2454
2455 void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2456 cms::Exception ex("ModulesSynchingOnLumis");
2457 ex << "The framework is configured to use at least two streams, but the following modules\n"
2458 << "require synchronizing on LuminosityBlock boundaries:";
2459 bool found = false;
2460 for (auto worker : schedule_->allWorkers()) {
2461 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2462 found = true;
2463 ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2464 }
2465 }
2466 if (found) {
2467 ex << "\n\nThe situation can be fixed by either\n"
2468 << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2469 << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2470 throw ex;
2471 }
2472 }
2473
2474 void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
2475 std::unique_ptr<LogSystem> s;
2476 for (auto worker : schedule_->allWorkers()) {
2477 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2478 if (not s) {
2479 s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2480 (*s) << "The following modules require synchronizing on Run boundaries:";
2481 }
2482 (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2483 }
2484 }
2485 }
2486 }