File indexing completed on 2022-06-03 00:59:02
0001 #include "FWCore/Framework/interface/EventProcessor.h"
0002 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0003 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0004 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0005 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0008
0009 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0010 #include "FWCore/Framework/src/CommonParams.h"
0011 #include "FWCore/Framework/interface/EDLooperBase.h"
0012 #include "FWCore/Framework/interface/EventPrincipal.h"
0013 #include "FWCore/Framework/interface/EventSetupProvider.h"
0014 #include "FWCore/Framework/interface/EventSetupRecord.h"
0015 #include "FWCore/Framework/interface/FileBlock.h"
0016 #include "FWCore/Framework/interface/HistoryAppender.h"
0017 #include "FWCore/Framework/interface/InputSourceDescription.h"
0018 #include "FWCore/Framework/interface/IOVSyncValue.h"
0019 #include "FWCore/Framework/interface/LooperFactory.h"
0020 #include "FWCore/Framework/interface/LuminosityBlock.h"
0021 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0022 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0023 #include "FWCore/Framework/interface/ModuleChanger.h"
0024 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0025 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0026 #include "FWCore/Framework/interface/ProcessingController.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/Schedule.h"
0029 #include "FWCore/Framework/interface/ScheduleInfo.h"
0030 #include "FWCore/Framework/interface/ScheduleItems.h"
0031 #include "FWCore/Framework/interface/SubProcess.h"
0032 #include "FWCore/Framework/interface/Event.h"
0033 #include "FWCore/Framework/interface/ESRecordsToProxyIndices.h"
0034 #include "FWCore/Framework/src/Breakpoints.h"
0035 #include "FWCore/Framework/interface/EventSetupsController.h"
0036 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0037 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0038 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0039 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0040 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0041 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0042
0043 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0044
0045 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0046 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0047 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0048 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0049 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0050 #include "FWCore/ParameterSet/interface/Registry.h"
0051 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0052
0053 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0054 #include "FWCore/ServiceRegistry/interface/Service.h"
0055 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0056 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0057
0058 #include "FWCore/Concurrency/interface/WaitingTask.h"
0059 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0060 #include "FWCore/Concurrency/interface/chain_first.h"
0061
0062 #include "FWCore/Utilities/interface/Algorithms.h"
0063 #include "FWCore/Utilities/interface/DebugMacros.h"
0064 #include "FWCore/Utilities/interface/EDMException.h"
0065 #include "FWCore/Utilities/interface/Exception.h"
0066 #include "FWCore/Utilities/interface/ConvertException.h"
0067 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0068 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0069 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0070 #include "FWCore/Utilities/interface/StreamID.h"
0071 #include "FWCore/Utilities/interface/RootHandlers.h"
0072 #include "FWCore/Utilities/interface/propagate_const.h"
0073 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0074
0075 #include "MessageForSource.h"
0076 #include "MessageForParent.h"
0077 #include "LuminosityBlockProcessingStatus.h"
0078
0079 #include "boost/range/adaptor/reversed.hpp"
0080
0081 #include <cassert>
0082 #include <exception>
0083 #include <iomanip>
0084 #include <iostream>
0085 #include <utility>
0086 #include <sstream>
0087
0088 #include <sys/ipc.h>
0089 #include <sys/msg.h>
0090
0091 #include "oneapi/tbb/task.h"
0092
0093
0094 #ifndef __APPLE__
0095 #include <sched.h>
0096 #endif
0097
0098 namespace {
0099
0100
0101
0102
0103 class SendSourceTerminationSignalIfException {
0104 public:
0105 SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
0106 ~SendSourceTerminationSignalIfException() {
0107 if (reg_) {
0108 reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
0109 }
0110 }
0111 void completedSuccessfully() { reg_ = nullptr; }
0112
0113 private:
0114 edm::ActivityRegistry* reg_;
0115 };
0116 }
0117
0118 namespace edm {
0119
0120 namespace chain = waiting_task::chain;
0121
0122
0123 std::unique_ptr<InputSource> makeInput(ParameterSet& params,
0124 CommonParams const& common,
0125 std::shared_ptr<ProductRegistry> preg,
0126 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0127 std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0128 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0129 std::shared_ptr<ActivityRegistry> areg,
0130 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0131 PreallocationConfiguration const& allocations) {
0132 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0133 if (main_input == nullptr) {
0134 throw Exception(errors::Configuration)
0135 << "There must be exactly one source in the configuration.\n"
0136 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0137 }
0138
0139 std::string modtype(main_input->getParameter<std::string>("@module_type"));
0140
0141 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0142 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0143 ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0144 filler->fill(descriptions);
0145
0146 try {
0147 convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
0148 } catch (cms::Exception& iException) {
0149 std::ostringstream ost;
0150 ost << "Validating configuration of input source of type " << modtype;
0151 iException.addContext(ost.str());
0152 throw;
0153 }
0154
0155 main_input->registerIt();
0156
0157
0158
0159
0160
0161
0162
0163 ModuleDescription md(main_input->id(),
0164 main_input->getParameter<std::string>("@module_type"),
0165 "source",
0166 processConfiguration.get(),
0167 ModuleDescription::getUniqueID());
0168
0169 InputSourceDescription isdesc(md,
0170 preg,
0171 branchIDListHelper,
0172 processBlockHelper,
0173 thinnedAssociationsHelper,
0174 areg,
0175 common.maxEventsInput_,
0176 common.maxLumisInput_,
0177 common.maxSecondsUntilRampdown_,
0178 allocations);
0179
0180 areg->preSourceConstructionSignal_(md);
0181 std::unique_ptr<InputSource> input;
0182 try {
0183
0184 std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
0185 convertException::wrap([&]() {
0186 input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0187 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
0188 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
0189 });
0190 } catch (cms::Exception& iException) {
0191 std::ostringstream ost;
0192 ost << "Constructing input source of type " << modtype;
0193 iException.addContext(ost.str());
0194 throw;
0195 }
0196 return input;
0197 }
0198
0199
0200 void validateLooper(ParameterSet& pset) {
0201 auto const modtype = pset.getParameter<std::string>("@module_type");
0202 auto const moduleLabel = pset.getParameter<std::string>("@module_label");
0203 auto filler = ParameterSetDescriptionFillerPluginFactory::get()->create(modtype);
0204 ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0205 filler->fill(descriptions);
0206 try {
0207 edm::convertException::wrap([&]() { descriptions.validate(pset, moduleLabel); });
0208 } catch (cms::Exception& iException) {
0209 iException.addContext(
0210 fmt::format("Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
0211 throw;
0212 }
0213 }
0214
0215 std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
0216 eventsetup::EventSetupProvider& cp,
0217 ParameterSet& params,
0218 std::vector<std::string> const& loopers) {
0219 std::shared_ptr<EDLooperBase> vLooper;
0220
0221 assert(1 == loopers.size());
0222
0223 for (auto const& looperName : loopers) {
0224 ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
0225 validateLooper(*providerPSet);
0226 providerPSet->registerIt();
0227 vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
0228 }
0229 return vLooper;
0230 }
0231
0232
0233 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0234 ServiceToken const& iToken,
0235 serviceregistry::ServiceLegacy iLegacy,
0236 std::vector<std::string> const& defaultServices,
0237 std::vector<std::string> const& forcedServices)
0238 : actReg_(),
0239 preg_(),
0240 branchIDListHelper_(),
0241 serviceToken_(),
0242 input_(),
0243 espController_(new eventsetup::EventSetupsController),
0244 esp_(),
0245 act_table_(),
0246 processConfiguration_(),
0247 schedule_(),
0248 subProcesses_(),
0249 historyAppender_(new HistoryAppender),
0250 fb_(),
0251 looper_(),
0252 deferredExceptionPtrIsSet_(false),
0253 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0254 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0255 principalCache_(),
0256 beginJobCalled_(false),
0257 shouldWeStop_(false),
0258 fileModeNoMerge_(false),
0259 exceptionMessageFiles_(),
0260 exceptionMessageRuns_(false),
0261 exceptionMessageLumis_(false),
0262 forceLooperToEnd_(false),
0263 looperBeginJobRun_(false),
0264 forceESCacheClearOnNewRun_(false),
0265 eventSetupDataToExcludeFromPrefetching_() {
0266 auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0267 processDesc->addServices(defaultServices, forcedServices);
0268 init(processDesc, iToken, iLegacy);
0269 }
0270
0271 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0272 std::vector<std::string> const& defaultServices,
0273 std::vector<std::string> const& forcedServices)
0274 : actReg_(),
0275 preg_(),
0276 branchIDListHelper_(),
0277 serviceToken_(),
0278 input_(),
0279 espController_(new eventsetup::EventSetupsController),
0280 esp_(),
0281 act_table_(),
0282 processConfiguration_(),
0283 schedule_(),
0284 subProcesses_(),
0285 historyAppender_(new HistoryAppender),
0286 fb_(),
0287 looper_(),
0288 deferredExceptionPtrIsSet_(false),
0289 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0290 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0291 principalCache_(),
0292 beginJobCalled_(false),
0293 shouldWeStop_(false),
0294 fileModeNoMerge_(false),
0295 exceptionMessageFiles_(),
0296 exceptionMessageRuns_(false),
0297 exceptionMessageLumis_(false),
0298 forceLooperToEnd_(false),
0299 looperBeginJobRun_(false),
0300 forceESCacheClearOnNewRun_(false),
0301 eventSetupDataToExcludeFromPrefetching_() {
0302 auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0303 processDesc->addServices(defaultServices, forcedServices);
0304 init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
0305 }
0306
0307 EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0308 ServiceToken const& token,
0309 serviceregistry::ServiceLegacy legacy)
0310 : actReg_(),
0311 preg_(),
0312 branchIDListHelper_(),
0313 serviceToken_(),
0314 input_(),
0315 espController_(new eventsetup::EventSetupsController),
0316 esp_(),
0317 act_table_(),
0318 processConfiguration_(),
0319 schedule_(),
0320 subProcesses_(),
0321 historyAppender_(new HistoryAppender),
0322 fb_(),
0323 looper_(),
0324 deferredExceptionPtrIsSet_(false),
0325 sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0326 sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0327 principalCache_(),
0328 beginJobCalled_(false),
0329 shouldWeStop_(false),
0330 fileModeNoMerge_(false),
0331 exceptionMessageFiles_(),
0332 exceptionMessageRuns_(false),
0333 exceptionMessageLumis_(false),
0334 forceLooperToEnd_(false),
0335 looperBeginJobRun_(false),
0336 forceESCacheClearOnNewRun_(false),
0337 eventSetupDataToExcludeFromPrefetching_() {
0338 init(processDesc, token, legacy);
0339 }
0340
0341 void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
0342 ServiceToken const& iToken,
0343 serviceregistry::ServiceLegacy iLegacy) {
0344
0345
0346
0347 ParentageRegistry::instance()->insertMapped(Parentage());
0348
0349
0350 ParameterSet().registerIt();
0351
0352 std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
0353
0354
0355 auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
0356 bool const hasSubProcesses = !subProcessVParameterSet.empty();
0357
0358
0359
0360
0361 validateTopLevelParameterSets(parameterSet.get());
0362
0363
0364 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
0365 auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
0366 if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
0367 throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
0368 << fileMode << ".\n"
0369 << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
0370 } else {
0371 fileModeNoMerge_ = (fileMode == "NOMERGE");
0372 }
0373 forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
0374 ensureAvailableAccelerators(*parameterSet);
0375
0376
0377 unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
0378
0379
0380
0381 assert(nThreads != 0);
0382
0383 unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
0384 if (nStreams == 0) {
0385 nStreams = nThreads;
0386 }
0387 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
0388 if (nConcurrentRuns != 1) {
0389 throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
0390 << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
0391 }
0392 unsigned int nConcurrentLumis =
0393 optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
0394 if (nConcurrentLumis == 0) {
0395 nConcurrentLumis = 2;
0396 }
0397 if (nConcurrentLumis > nStreams) {
0398 nConcurrentLumis = nStreams;
0399 }
0400 std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
0401 if (!loopers.empty()) {
0402
0403 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
0404 edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
0405 "of concurrent runs, and the number of concurrent lumis "
0406 "are all being reset to 1. Loopers cannot currently support "
0407 "values greater than 1.";
0408 nStreams = 1;
0409 nConcurrentLumis = 1;
0410 nConcurrentRuns = 1;
0411 }
0412 }
0413 bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
0414 if (dumpOptions) {
0415 dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
0416 } else {
0417 if (nThreads > 1 or nStreams > 1) {
0418 edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
0419 }
0420 }
0421
0422
0423 unsigned int maxConcurrentIOVs = nConcurrentLumis;
0424
0425
0426
0427
0428
0429
0430
0431
0432
0433
0434
0435
0436
0437 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
0438
0439 printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
0440 deleteNonConsumedUnscheduledModules_ =
0441 optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
0442
0443
0444 if (not hasSubProcesses) {
0445 branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
0446 }
0447
0448
0449 ScheduleItems items;
0450
0451
0452 auto& serviceSets = processDesc->getServicesPSets();
0453 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
0454 serviceToken_ = items.addCPRandTNS(*parameterSet, token);
0455
0456
0457 ServiceRegistry::Operate operate(serviceToken_);
0458
0459 CMS_SA_ALLOW try {
0460 if (nStreams > 1) {
0461 edm::Service<RootHandlers> handler;
0462 handler->willBeUsingThreads();
0463 }
0464
0465
0466 std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
0467
0468
0469 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
0470 esp_ = espController_->makeProvider(
0471 *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
0472
0473
0474 if (!loopers.empty()) {
0475 looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
0476 looper_->setActionTable(items.act_table_.get());
0477 looper_->attachTo(*items.actReg_);
0478
0479
0480 deleteNonConsumedUnscheduledModules_ = false;
0481 }
0482
0483 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0484
0485 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
0486 streamQueues_.resize(nStreams);
0487 streamLumiStatus_.resize(nStreams);
0488
0489 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0490
0491
0492 input_ = makeInput(*parameterSet,
0493 *common,
0494 items.preg(),
0495 items.branchIDListHelper(),
0496 get_underlying_safe(processBlockHelper_),
0497 items.thinnedAssociationsHelper(),
0498 items.actReg_,
0499 items.processConfiguration(),
0500 preallocations_);
0501
0502
0503 schedule_ =
0504 items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
0505
0506
0507 act_table_ = std::move(items.act_table_);
0508 actReg_ = items.actReg_;
0509 preg_ = items.preg();
0510 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0511 branchIDListHelper_ = items.branchIDListHelper();
0512 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0513 processConfiguration_ = items.processConfiguration();
0514 processContext_.setProcessConfiguration(processConfiguration_.get());
0515 principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
0516
0517 FDEBUG(2) << parameterSet << std::endl;
0518
0519 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0520 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0521
0522 auto ep = std::make_shared<EventPrincipal>(preg(),
0523 branchIDListHelper(),
0524 thinnedAssociationsHelper(),
0525 *processConfiguration_,
0526 historyAppender_.get(),
0527 index,
0528 true ,
0529 &*processBlockHelper_);
0530 principalCache_.insert(std::move(ep));
0531 }
0532
0533 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0534 auto lp =
0535 std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
0536 principalCache_.insert(std::move(lp));
0537 }
0538
0539 {
0540 auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0541 principalCache_.insert(std::move(pb));
0542
0543 auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0544 principalCache_.insertForInput(std::move(pbForInput));
0545 }
0546
0547
0548 subProcesses_.reserve(subProcessVParameterSet.size());
0549 for (auto& subProcessPSet : subProcessVParameterSet) {
0550 subProcesses_.emplace_back(subProcessPSet,
0551 *parameterSet,
0552 preg(),
0553 branchIDListHelper(),
0554 *processBlockHelper_,
0555 *thinnedAssociationsHelper_,
0556 SubProcessParentageHelper(),
0557 *espController_,
0558 *actReg_,
0559 token,
0560 serviceregistry::kConfigurationOverrides,
0561 preallocations_,
0562 &processContext_);
0563 }
0564 } catch (...) {
0565
0566
0567 espController_ = nullptr;
0568 esp_ = nullptr;
0569 schedule_ = nullptr;
0570 input_ = nullptr;
0571 looper_ = nullptr;
0572 actReg_ = nullptr;
0573 throw;
0574 }
0575 }
0576
0577 EventProcessor::~EventProcessor() {
0578
0579 ServiceToken token = getToken();
0580 ServiceRegistry::Operate op(token);
0581
0582
0583
0584 espController_ = nullptr;
0585 esp_ = nullptr;
0586 schedule_ = nullptr;
0587 input_ = nullptr;
0588 looper_ = nullptr;
0589 actReg_ = nullptr;
0590
0591 pset::Registry::instance()->clear();
0592 ParentageRegistry::instance()->clear();
0593 }
0594
0595 void EventProcessor::taskCleanup() {
0596 edm::FinalWaitingTask task;
0597 espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
0598 taskGroup_.wait();
0599 assert(task.done());
0600 }
0601
0602 void EventProcessor::beginJob() {
0603 if (beginJobCalled_)
0604 return;
0605 beginJobCalled_ = true;
0606 bk::beginJob();
0607
0608
0609
0610 ServiceRegistry::Operate operate(serviceToken_);
0611
0612 service::SystemBounds bounds(preallocations_.numberOfStreams(),
0613 preallocations_.numberOfLuminosityBlocks(),
0614 preallocations_.numberOfRuns(),
0615 preallocations_.numberOfThreads());
0616 actReg_->preallocateSignal_(bounds);
0617 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0618 pathsAndConsumesOfModules_.initialize(schedule_.get(), preg());
0619
0620 std::vector<ModuleProcessName> consumedBySubProcesses;
0621 for_all(subProcesses_,
0622 [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
0623 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0624 if (consumedBySubProcesses.empty()) {
0625 consumedBySubProcesses = std::move(c);
0626 } else if (not c.empty()) {
0627 std::vector<ModuleProcessName> tmp;
0628 tmp.reserve(consumedBySubProcesses.size() + c.size());
0629 std::merge(consumedBySubProcesses.begin(),
0630 consumedBySubProcesses.end(),
0631 c.begin(),
0632 c.end(),
0633 std::back_inserter(tmp));
0634 std::swap(consumedBySubProcesses, tmp);
0635 }
0636 });
0637
0638
0639 checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, printDependencies_);
0640 if (deleteNonConsumedUnscheduledModules_) {
0641 if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
0642 not unusedModules.empty()) {
0643 pathsAndConsumesOfModules_.removeModules(unusedModules);
0644
0645 edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
0646 l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0647 "and "
0648 "therefore they are deleted before beginJob transition.";
0649 for (auto const& description : unusedModules) {
0650 l << "\n " << description->moduleLabel();
0651 }
0652 });
0653 for (auto const& description : unusedModules) {
0654 schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0655 }
0656 }
0657 }
0658
0659
0660
0661 if (not branchesToDeleteEarly_.empty()) {
0662 schedule_->initializeEarlyDelete(branchesToDeleteEarly_, *preg_);
0663 decltype(branchesToDeleteEarly_)().swap(branchesToDeleteEarly_);
0664 }
0665
0666 actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
0667
0668 if (preallocations_.numberOfLuminosityBlocks() > 1) {
0669 throwAboutModulesRequiringLuminosityBlockSynchronization();
0670 }
0671 warnAboutLegacyModules();
0672
0673
0674
0675
0676
0677
0678
0679
0680
0681
0682
0683
0684
0685
0686
0687 try {
0688 convertException::wrap([&]() { input_->doBeginJob(); });
0689 } catch (cms::Exception& ex) {
0690 ex.addContext("Calling beginJob for the source");
0691 throw;
0692 }
0693 espController_->finishConfiguration();
0694 schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
0695 if (looper_) {
0696 constexpr bool mustPrefetchMayGet = true;
0697 auto const processBlockLookup = preg_->productLookup(InProcess);
0698 auto const runLookup = preg_->productLookup(InRun);
0699 auto const lumiLookup = preg_->productLookup(InLumi);
0700 auto const eventLookup = preg_->productLookup(InEvent);
0701 looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
0702 looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
0703 looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
0704 looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
0705 looper_->updateLookup(esp_->recordsToProxyIndices());
0706 }
0707
0708 for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0709 actReg_->postBeginJobSignal_();
0710
0711 FinalWaitingTask last;
0712 oneapi::tbb::task_group group;
0713 using namespace edm::waiting_task::chain;
0714 first([this](auto nextTask) {
0715 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0716 first([i, this](auto nextTask) {
0717 ServiceRegistry::Operate operate(serviceToken_);
0718 schedule_->beginStream(i);
0719 }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
0720 ServiceRegistry::Operate operate(serviceToken_);
0721 for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
0722 }) | lastTask(nextTask);
0723 }
0724 }) | runLast(WaitingTaskHolder(group, &last));
0725 group.wait();
0726 if (last.exceptionPtr()) {
0727 std::rethrow_exception(*last.exceptionPtr());
0728 }
0729 }
0730
0731 void EventProcessor::endJob() {
0732
0733 ExceptionCollector c(
0734 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0735
0736
0737 ServiceRegistry::Operate operate(serviceToken_);
0738
0739 using namespace edm::waiting_task::chain;
0740
0741 edm::FinalWaitingTask waitTask;
0742 oneapi::tbb::task_group group;
0743
0744 {
0745
0746 edm::WaitingTaskHolder taskHolder(group, &waitTask);
0747 std::mutex collectorMutex;
0748 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0749 first([this, i, &c, &collectorMutex](auto nextTask) {
0750 std::exception_ptr ep;
0751 try {
0752 ServiceRegistry::Operate operate(serviceToken_);
0753 this->schedule_->endStream(i);
0754 } catch (...) {
0755 ep = std::current_exception();
0756 }
0757 if (ep) {
0758 std::lock_guard<std::mutex> l(collectorMutex);
0759 c.call([&ep]() { std::rethrow_exception(ep); });
0760 }
0761 }) | then([this, i, &c, &collectorMutex](auto nextTask) {
0762 for (auto& subProcess : subProcesses_) {
0763 first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
0764 std::exception_ptr ep;
0765 try {
0766 ServiceRegistry::Operate operate(serviceToken_);
0767 subProcess.doEndStream(i);
0768 } catch (...) {
0769 ep = std::current_exception();
0770 }
0771 if (ep) {
0772 std::lock_guard<std::mutex> l(collectorMutex);
0773 c.call([&ep]() { std::rethrow_exception(ep); });
0774 }
0775 }) | lastTask(nextTask);
0776 }
0777 }) | lastTask(taskHolder);
0778 }
0779 }
0780 group.wait();
0781
0782 auto actReg = actReg_.get();
0783 c.call([actReg]() { actReg->preEndJobSignal_(); });
0784 schedule_->endJob(c);
0785 for (auto& subProcess : subProcesses_) {
0786 c.call(std::bind(&SubProcess::doEndJob, &subProcess));
0787 }
0788 c.call(std::bind(&InputSource::doEndJob, input_.get()));
0789 if (looper_) {
0790 c.call(std::bind(&EDLooperBase::endOfJob, looper()));
0791 }
0792 c.call([actReg]() { actReg->postEndJobSignal_(); });
0793 if (c.hasThrown()) {
0794 c.rethrow();
0795 }
0796 }
0797
0798 ServiceToken EventProcessor::getToken() { return serviceToken_; }
0799
0800 std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
0801 return schedule_->getAllModuleDescriptions();
0802 }
0803
0804 int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
0805
0806 int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
0807
0808 int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
0809
0810 void EventProcessor::clearCounters() { schedule_->clearCounters(); }
0811
0812 namespace {
0813 #include "TransitionProcessors.icc"
0814 }
0815
0816 bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
0817 bool returnValue = false;
0818
0819
0820 if (shutdown_flag.load(std::memory_order_acquire)) {
0821 returnValue = true;
0822 returnCode = epSignal;
0823 }
0824 return returnValue;
0825 }
0826
0827 InputSource::ItemType EventProcessor::nextTransitionType() {
0828 if (deferredExceptionPtrIsSet_.load()) {
0829 lastSourceTransition_ = InputSource::IsStop;
0830 return InputSource::IsStop;
0831 }
0832
0833 SendSourceTerminationSignalIfException sentry(actReg_.get());
0834 InputSource::ItemType itemType;
0835
0836 do {
0837 itemType = input_->nextItemType();
0838 } while (itemType == InputSource::IsSynchronize);
0839
0840 lastSourceTransition_ = itemType;
0841 sentry.completedSuccessfully();
0842
0843 StatusCode returnCode = epSuccess;
0844
0845 if (checkForAsyncStopRequest(returnCode)) {
0846 actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
0847 lastSourceTransition_ = InputSource::IsStop;
0848 }
0849
0850 return lastSourceTransition_;
0851 }
0852
0853 std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
0854 return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
0855 }
0856
0857 edm::LuminosityBlockNumber_t EventProcessor::nextLuminosityBlockID() { return input_->luminosityBlock(); }
0858
0859 EventProcessor::StatusCode EventProcessor::runToCompletion() {
0860 beginJob();
0861
0862
0863 ServiceRegistry::Operate operate(serviceToken_);
0864
0865 try {
0866 FilesProcessor fp(fileModeNoMerge_);
0867
0868 convertException::wrap([&]() {
0869 bool firstTime = true;
0870 do {
0871 if (not firstTime) {
0872 prepareForNextLoop();
0873 rewindInput();
0874 } else {
0875 firstTime = false;
0876 }
0877 startingNewLoop();
0878
0879 auto trans = fp.processFiles(*this);
0880
0881 fp.normalEnd();
0882
0883 if (deferredExceptionPtrIsSet_.load()) {
0884 std::rethrow_exception(deferredExceptionPtr_);
0885 }
0886 if (trans != InputSource::IsStop) {
0887
0888 doErrorStuff();
0889
0890 throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
0891 }
0892 } while (not endOfLoop());
0893 });
0894
0895 }
0896 catch (cms::Exception& e) {
0897 if (exceptionMessageLumis_) {
0898 std::string message(
0899 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
0900 e.addAdditionalInfo(message);
0901 if (e.alreadyPrinted()) {
0902 LogAbsolute("Additional Exceptions") << message;
0903 }
0904 }
0905 if (exceptionMessageRuns_) {
0906 std::string message(
0907 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
0908 e.addAdditionalInfo(message);
0909 if (e.alreadyPrinted()) {
0910 LogAbsolute("Additional Exceptions") << message;
0911 }
0912 }
0913 if (!exceptionMessageFiles_.empty()) {
0914 e.addAdditionalInfo(exceptionMessageFiles_);
0915 if (e.alreadyPrinted()) {
0916 LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
0917 }
0918 }
0919 throw;
0920 }
0921 return epSuccess;
0922 }
0923
0924 void EventProcessor::readFile() {
0925 FDEBUG(1) << " \treadFile\n";
0926 size_t size = preg_->size();
0927 SendSourceTerminationSignalIfException sentry(actReg_.get());
0928
0929 principalCache_.preReadFile();
0930
0931 fb_ = input_->readFile();
0932 if (size < preg_->size()) {
0933 principalCache_.adjustIndexesAfterProductRegistryAddition();
0934 }
0935 principalCache_.adjustEventsToNewProductRegistry(preg());
0936 if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
0937 fb_->setNotFastClonable(FileBlock::ParallelProcesses);
0938 }
0939 sentry.completedSuccessfully();
0940 }
0941
0942 void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
0943 if (fileBlockValid()) {
0944 SendSourceTerminationSignalIfException sentry(actReg_.get());
0945 input_->closeFile(fb_.get(), cleaningUpAfterException);
0946 sentry.completedSuccessfully();
0947 }
0948 FDEBUG(1) << "\tcloseInputFile\n";
0949 }
0950
0951 void EventProcessor::openOutputFiles() {
0952 if (fileBlockValid()) {
0953 schedule_->openOutputFiles(*fb_);
0954 for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
0955 }
0956 FDEBUG(1) << "\topenOutputFiles\n";
0957 }
0958
0959 void EventProcessor::closeOutputFiles() {
0960 schedule_->closeOutputFiles();
0961 for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
0962 processBlockHelper_->clearAfterOutputFilesClose();
0963 FDEBUG(1) << "\tcloseOutputFiles\n";
0964 }
0965
0966 void EventProcessor::respondToOpenInputFile() {
0967 if (fileBlockValid()) {
0968 for_all(subProcesses_,
0969 [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
0970 schedule_->respondToOpenInputFile(*fb_);
0971 for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
0972 }
0973 FDEBUG(1) << "\trespondToOpenInputFile\n";
0974 }
0975
0976 void EventProcessor::respondToCloseInputFile() {
0977 if (fileBlockValid()) {
0978 schedule_->respondToCloseInputFile(*fb_);
0979 for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
0980 }
0981 FDEBUG(1) << "\trespondToCloseInputFile\n";
0982 }
0983
0984 void EventProcessor::startingNewLoop() {
0985 shouldWeStop_ = false;
0986
0987
0988 if (looper_ && looperBeginJobRun_) {
0989 looper_->doStartingNewLoop();
0990 }
0991 FDEBUG(1) << "\tstartingNewLoop\n";
0992 }
0993
0994 bool EventProcessor::endOfLoop() {
0995 if (looper_) {
0996 ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
0997 looper_->setModuleChanger(&changer);
0998 EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
0999 looper_->setModuleChanger(nullptr);
1000 if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1001 return true;
1002 else
1003 return false;
1004 }
1005 FDEBUG(1) << "\tendOfLoop\n";
1006 return true;
1007 }
1008
1009 void EventProcessor::rewindInput() {
1010 input_->repeat();
1011 input_->rewind();
1012 FDEBUG(1) << "\trewind\n";
1013 }
1014
1015 void EventProcessor::prepareForNextLoop() {
1016 looper_->prepareForNextLoop(esp_.get());
1017 FDEBUG(1) << "\tprepareForNextLoop\n";
1018 }
1019
1020 bool EventProcessor::shouldWeCloseOutput() const {
1021 FDEBUG(1) << "\tshouldWeCloseOutput\n";
1022 if (!subProcesses_.empty()) {
1023 for (auto const& subProcess : subProcesses_) {
1024 if (subProcess.shouldWeCloseOutput()) {
1025 return true;
1026 }
1027 }
1028 return false;
1029 }
1030 return schedule_->shouldWeCloseOutput();
1031 }
1032
1033 void EventProcessor::doErrorStuff() {
1034 FDEBUG(1) << "\tdoErrorStuff\n";
1035 LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1036 << "and went to the error state\n"
1037 << "Will attempt to terminate processing normally\n"
1038 << "(IF using the looper the next loop will be attempted)\n"
1039 << "This likely indicates a bug in an input module or corrupted input or both\n";
1040 }
1041
1042 void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1043 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1044 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1045
1046 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1047 FinalWaitingTask globalWaitTask;
1048
1049 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1050 beginGlobalTransitionAsync<Traits>(
1051 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1052
1053 do {
1054 taskGroup_.wait();
1055 } while (not globalWaitTask.done());
1056
1057 if (globalWaitTask.exceptionPtr() != nullptr) {
1058 std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1059 }
1060 beginProcessBlockSucceeded = true;
1061 }
1062
1063 void EventProcessor::inputProcessBlocks() {
1064 input_->fillProcessBlockHelper();
1065 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1066 while (input_->nextProcessBlock(processBlockPrincipal)) {
1067 readProcessBlock(processBlockPrincipal);
1068
1069 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1070 FinalWaitingTask globalWaitTask;
1071
1072 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1073 beginGlobalTransitionAsync<Traits>(
1074 WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1075
1076 do {
1077 taskGroup_.wait();
1078 } while (not globalWaitTask.done());
1079 if (globalWaitTask.exceptionPtr() != nullptr) {
1080 std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1081 }
1082
1083 FinalWaitingTask writeWaitTask;
1084 writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input);
1085 do {
1086 taskGroup_.wait();
1087 } while (not writeWaitTask.done());
1088 if (writeWaitTask.exceptionPtr()) {
1089 std::rethrow_exception(*writeWaitTask.exceptionPtr());
1090 }
1091
1092 processBlockPrincipal.clearPrincipal();
1093 for (auto& s : subProcesses_) {
1094 s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1095 }
1096 }
1097 }
1098
1099 void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1100 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1101
1102 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1103 FinalWaitingTask globalWaitTask;
1104
1105 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1106 endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1107 *schedule_,
1108 transitionInfo,
1109 serviceToken_,
1110 subProcesses_,
1111 cleaningUpAfterException);
1112 do {
1113 taskGroup_.wait();
1114 } while (not globalWaitTask.done());
1115 if (globalWaitTask.exceptionPtr() != nullptr) {
1116 std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1117 }
1118
1119 if (beginProcessBlockSucceeded) {
1120 FinalWaitingTask writeWaitTask;
1121 writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New);
1122 do {
1123 taskGroup_.wait();
1124 } while (not writeWaitTask.done());
1125 if (writeWaitTask.exceptionPtr()) {
1126 std::rethrow_exception(*writeWaitTask.exceptionPtr());
1127 }
1128 }
1129
1130 processBlockPrincipal.clearPrincipal();
1131 for (auto& s : subProcesses_) {
1132 s.clearProcessBlockPrincipal(ProcessBlockType::New);
1133 }
1134 }
1135
1136 void EventProcessor::beginRun(ProcessHistoryID const& phid,
1137 RunNumber_t run,
1138 bool& globalBeginSucceeded,
1139 bool& eventSetupForInstanceSucceeded) {
1140 globalBeginSucceeded = false;
1141 RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1142 {
1143 SendSourceTerminationSignalIfException sentry(actReg_.get());
1144
1145 input_->doBeginRun(runPrincipal, &processContext_);
1146 sentry.completedSuccessfully();
1147 }
1148
1149 IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1150 if (forceESCacheClearOnNewRun_) {
1151 espController_->forceCacheClear();
1152 }
1153 {
1154 SendSourceTerminationSignalIfException sentry(actReg_.get());
1155 actReg_->preESSyncIOVSignal_.emit(ts);
1156 synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
1157 actReg_->postESSyncIOVSignal_.emit(ts);
1158 eventSetupForInstanceSucceeded = true;
1159 sentry.completedSuccessfully();
1160 }
1161 auto const& es = esp_->eventSetupImpl();
1162 if (looper_ && looperBeginJobRun_ == false) {
1163 looper_->copyInfo(ScheduleInfo(schedule_.get()));
1164
1165 FinalWaitingTask waitTask;
1166 using namespace edm::waiting_task::chain;
1167 chain::first([this, &es](auto nextTask) {
1168 looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1169 }) | then([this, &es](auto nextTask) mutable {
1170 looper_->beginOfJob(es);
1171 looperBeginJobRun_ = true;
1172 looper_->doStartingNewLoop();
1173 }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1174
1175 do {
1176 taskGroup_.wait();
1177 } while (not waitTask.done());
1178 if (waitTask.exceptionPtr() != nullptr) {
1179 std::rethrow_exception(*(waitTask.exceptionPtr()));
1180 }
1181 }
1182 {
1183 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1184 FinalWaitingTask globalWaitTask;
1185
1186 using namespace edm::waiting_task::chain;
1187 chain::first([&runPrincipal, &es, this](auto waitTask) {
1188 RunTransitionInfo transitionInfo(runPrincipal, es);
1189 beginGlobalTransitionAsync<Traits>(
1190 std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1191 }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1192 globalBeginSucceeded = true;
1193 FDEBUG(1) << "\tbeginRun " << run << "\n";
1194 }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1195 looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1196 }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1197 looper_->doBeginRun(runPrincipal, es, &processContext_);
1198 }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1199
1200 do {
1201 taskGroup_.wait();
1202 } while (not globalWaitTask.done());
1203 if (globalWaitTask.exceptionPtr() != nullptr) {
1204 std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1205 }
1206 }
1207 {
1208
1209 FinalWaitingTask streamLoopWaitTask;
1210
1211 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1212
1213 RunTransitionInfo transitionInfo(runPrincipal, es);
1214 beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1215 *schedule_,
1216 preallocations_.numberOfStreams(),
1217 transitionInfo,
1218 serviceToken_,
1219 subProcesses_);
1220 do {
1221 taskGroup_.wait();
1222 } while (not streamLoopWaitTask.done());
1223 if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1224 std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1225 }
1226 }
1227 FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1228 if (looper_) {
1229
1230 }
1231 }
1232
1233 void EventProcessor::endUnfinishedRun(ProcessHistoryID const& phid,
1234 RunNumber_t run,
1235 bool globalBeginSucceeded,
1236 bool cleaningUpAfterException,
1237 bool eventSetupForInstanceSucceeded) {
1238 if (eventSetupForInstanceSucceeded) {
1239
1240 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1241
1242 if (globalBeginSucceeded) {
1243 RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1244 if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
1245 FinalWaitingTask t;
1246 MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1247 mergeableRunProductMetadata->preWriteRun();
1248 writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1249 do {
1250 taskGroup_.wait();
1251 } while (not t.done());
1252 mergeableRunProductMetadata->postWriteRun();
1253 if (t.exceptionPtr()) {
1254 std::rethrow_exception(*t.exceptionPtr());
1255 }
1256 }
1257 }
1258 }
1259 deleteRunFromCache(phid, run);
1260 }
1261
1262 void EventProcessor::endRun(ProcessHistoryID const& phid,
1263 RunNumber_t run,
1264 bool globalBeginSucceeded,
1265 bool cleaningUpAfterException) {
1266 RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1267 runPrincipal.setEndTime(input_->timestamp());
1268
1269 IOVSyncValue ts(
1270 EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1271 runPrincipal.endTime());
1272 {
1273 SendSourceTerminationSignalIfException sentry(actReg_.get());
1274 actReg_->preESSyncIOVSignal_.emit(ts);
1275 synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
1276 actReg_->postESSyncIOVSignal_.emit(ts);
1277 sentry.completedSuccessfully();
1278 }
1279 auto const& es = esp_->eventSetupImpl();
1280 if (globalBeginSucceeded) {
1281
1282 FinalWaitingTask streamLoopWaitTask;
1283
1284 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1285
1286 RunTransitionInfo transitionInfo(runPrincipal, es);
1287 endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1288 *schedule_,
1289 preallocations_.numberOfStreams(),
1290 transitionInfo,
1291 serviceToken_,
1292 subProcesses_,
1293 cleaningUpAfterException);
1294 do {
1295 taskGroup_.wait();
1296 } while (not streamLoopWaitTask.done());
1297 if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1298 std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1299 }
1300 }
1301 FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1302 if (looper_) {
1303
1304 }
1305 {
1306 FinalWaitingTask globalWaitTask;
1307
1308 using namespace edm::waiting_task::chain;
1309 chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1310 RunTransitionInfo transitionInfo(runPrincipal, es);
1311 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1312 endGlobalTransitionAsync<Traits>(
1313 std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1314 }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1315 looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1316 }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1317 looper_->doEndRun(runPrincipal, es, &processContext_);
1318 }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1319
1320 do {
1321 taskGroup_.wait();
1322 } while (not globalWaitTask.done());
1323 if (globalWaitTask.exceptionPtr() != nullptr) {
1324 std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1325 }
1326 }
1327 FDEBUG(1) << "\tendRun " << run << "\n";
1328 }
1329
1330 InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1331 FinalWaitingTask waitTask;
1332 if (streamLumiActive_ > 0) {
1333 assert(streamLumiActive_ == preallocations_.numberOfStreams());
1334
1335 continueLumiAsync(WaitingTaskHolder{taskGroup_, &waitTask});
1336 } else {
1337 beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1338 input_->luminosityBlockAuxiliary()->beginTime()),
1339 iRunResource,
1340 WaitingTaskHolder{taskGroup_, &waitTask});
1341 }
1342 do {
1343 taskGroup_.wait();
1344 } while (not waitTask.done());
1345
1346 if (waitTask.exceptionPtr() != nullptr) {
1347 std::rethrow_exception(*(waitTask.exceptionPtr()));
1348 }
1349 return lastTransitionType();
1350 }
1351
1352 void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1353 std::shared_ptr<void> const& iRunResource,
1354 edm::WaitingTaskHolder iHolder) {
1355 if (iHolder.taskHasFailed()) {
1356 return;
1357 }
1358
1359 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1360
1361
1362
1363
1364
1365
1366
1367
1368 auto status =
1369 std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1370 chain::first([&](auto nextTask) {
1371 auto asyncEventSetup = [](ActivityRegistry* actReg,
1372 auto* espController,
1373 auto& queue,
1374 WaitingTaskHolder task,
1375 auto& status,
1376 IOVSyncValue const& iSync) {
1377 queue.pause();
1378 CMS_SA_ALLOW try {
1379 SendSourceTerminationSignalIfException sentry(actReg);
1380
1381
1382
1383
1384 actReg->preESSyncIOVSignal_.emit(iSync);
1385 espController->eventSetupForInstanceAsync(
1386 iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1387 sentry.completedSuccessfully();
1388 } catch (...) {
1389 task.doneWaiting(std::current_exception());
1390 }
1391 };
1392 if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1393
1394
1395
1396 auto group = nextTask.group();
1397 queueWhichWaitsForIOVsToFinish_.push(
1398 *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1399 asyncEventSetup(
1400 actReg_.get(), espController_.get(), queueWhichWaitsForIOVsToFinish_, std::move(task), status, iSync);
1401 });
1402 } else {
1403 asyncEventSetup(
1404 actReg_.get(), espController_.get(), queueWhichWaitsForIOVsToFinish_, std::move(nextTask), status, iSync);
1405 }
1406 }) | chain::then([this, status, iSync](std::exception_ptr const* iPtr, auto nextTask) {
1407 actReg_->postESSyncIOVSignal_.emit(iSync);
1408
1409 auto copyTask = nextTask;
1410 if (iPtr) {
1411 nextTask.doneWaiting(*iPtr);
1412 }
1413 auto group = copyTask.group();
1414 lumiQueue_->pushAndPause(
1415 *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1416 if (task.taskHasFailed()) {
1417 status->resetResources();
1418 return;
1419 }
1420
1421 status->setResumer(std::move(iResumer));
1422
1423 auto group = task.group();
1424 sourceResourcesAcquirer_.serialQueueChain().push(
1425 *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1426
1427 ServiceRegistry::Operate operate(serviceToken_);
1428
1429 CMS_SA_ALLOW try {
1430 readLuminosityBlock(*status);
1431
1432 LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1433 {
1434 SendSourceTerminationSignalIfException sentry(actReg_.get());
1435
1436 input_->doBeginLumi(lumiPrincipal, &processContext_);
1437 sentry.completedSuccessfully();
1438 }
1439
1440 Service<RandomNumberGenerator> rng;
1441 if (rng.isAvailable()) {
1442 LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1443 rng->preBeginLumi(lb);
1444 }
1445
1446 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1447
1448 using namespace edm::waiting_task::chain;
1449 chain::first([this, status, &lumiPrincipal](auto nextTask) {
1450 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1451 {
1452 LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1453 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1454 beginGlobalTransitionAsync<Traits>(
1455 nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1456 }
1457 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1458 looper_->prefetchAsync(
1459 nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1460 }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1461 status->globalBeginDidSucceed();
1462
1463 ServiceRegistry::Operate operateLooper(serviceToken_);
1464 looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1465 }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1466 if (iPtr) {
1467 status->resetResources();
1468 holder.doneWaiting(*iPtr);
1469 } else {
1470 if (not looper_) {
1471 status->globalBeginDidSucceed();
1472 }
1473 EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1474 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1475
1476 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1477 streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1478 streamQueues_[i].pause();
1479
1480 auto& event = principalCache_.eventPrincipal(i);
1481
1482
1483
1484 auto eventSetupImpls = &status->eventSetupImpls();
1485 auto lp = status->lumiPrincipal().get();
1486 streamLumiStatus_[i] = std::move(status);
1487 ++streamLumiActive_;
1488 event.setLuminosityBlockPrincipal(lp);
1489 LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1490 using namespace edm::waiting_task::chain;
1491 chain::first([this, i, &transitionInfo](auto nextTask) {
1492 beginStreamTransitionAsync<Traits>(
1493 std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1494 }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1495 if (exceptionFromBeginStreamLumi) {
1496 WaitingTaskHolder tmp(nextTask);
1497 tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1498 streamEndLumiAsync(nextTask, i);
1499 } else {
1500 handleNextEventForStreamAsync(std::move(nextTask), i);
1501 }
1502 }) | runLast(holder);
1503 });
1504 }
1505 }
1506 }) | runLast(postQueueTask);
1507
1508 } catch (...) {
1509 status->resetResources();
1510 postQueueTask.doneWaiting(std::current_exception());
1511 }
1512 });
1513 });
1514 }) | chain::runLast(std::move(iHolder));
1515 }
1516
1517 void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1518 {
1519
1520 auto status = streamLumiStatus_[0];
1521 status->needToContinueLumi();
1522 status->startProcessingEvents();
1523 }
1524
1525 unsigned int streamIndex = 0;
1526 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1527 for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1528 arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1529 }
1530 iHolder.group()->run(
1531 [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1532 }
1533
1534 void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1535 if (setDeferredException(*iPtr)) {
1536 WaitingTaskHolder tmp(holder);
1537 tmp.doneWaiting(*iPtr);
1538 } else {
1539 setExceptionMessageLumis();
1540 }
1541 }
1542
1543 void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1544 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1545
1546
1547 auto& lp = *(iLumiStatus->lumiPrincipal());
1548 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1549 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1550 EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1551 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1552
1553 using namespace edm::waiting_task::chain;
1554 chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1555 IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1556
1557 LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1558 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1559 endGlobalTransitionAsync<Traits>(
1560 std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1561 }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1562
1563 if (didGlobalBeginSucceed) {
1564 writeLumiAsync(std::move(nextTask), lumiPrincipal);
1565 }
1566 }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1567 looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1568 }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1569
1570 ServiceRegistry::Operate operate(serviceToken_);
1571 looper_->doEndLuminosityBlock(lp, es, &processContext_);
1572 }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1573 std::exception_ptr ptr;
1574 if (iPtr) {
1575 ptr = *iPtr;
1576 }
1577 ServiceRegistry::Operate operate(serviceToken_);
1578
1579
1580
1581
1582
1583 CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1584 if (not ptr) {
1585 ptr = std::current_exception();
1586 }
1587 }
1588
1589 CMS_SA_ALLOW try {
1590 status->resumeGlobalLumiQueue();
1591 queueWhichWaitsForIOVsToFinish_.resume();
1592 } catch (...) {
1593 if (not ptr) {
1594 ptr = std::current_exception();
1595 }
1596 }
1597
1598 CMS_SA_ALLOW try {
1599
1600
1601
1602 status->resetResources();
1603 status.reset();
1604 } catch (...) {
1605 if (not ptr) {
1606 ptr = std::current_exception();
1607 }
1608 }
1609
1610 if (ptr) {
1611 handleEndLumiExceptions(&ptr, nextTask);
1612 }
1613 }) | runLast(std::move(iTask));
1614 }
1615
1616 void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1617 auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1618 if (iPtr) {
1619 handleEndLumiExceptions(iPtr, iTask);
1620 }
1621 auto status = streamLumiStatus_[iStreamIndex];
1622
1623 streamLumiStatus_[iStreamIndex].reset();
1624 --streamLumiActive_;
1625 streamQueues_[iStreamIndex].resume();
1626
1627
1628 if (status->streamFinishedLumi()) {
1629 globalEndLumiAsync(iTask, std::move(status));
1630 }
1631 });
1632
1633 edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1634
1635
1636
1637 auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1638 lumiStatus->setEndTime();
1639
1640 EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1641
1642 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1643 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1644
1645 if (lumiStatus->didGlobalBeginSucceed()) {
1646 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1647 IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1648 lumiPrincipal.endTime());
1649 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1650 LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1651 endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1652 *schedule_,
1653 iStreamIndex,
1654 transitionInfo,
1655 serviceToken_,
1656 subProcesses_,
1657 cleaningUpAfterException);
1658 }
1659 }
1660
1661 void EventProcessor::endUnfinishedLumi() {
1662 if (streamLumiActive_.load() > 0) {
1663 FinalWaitingTask globalWaitTask;
1664 {
1665 WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1666 for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1667 if (streamLumiStatus_[i]) {
1668 streamEndLumiAsync(globalTaskHolder, i);
1669 }
1670 }
1671 }
1672 do {
1673 taskGroup_.wait();
1674 } while (not globalWaitTask.done());
1675 if (globalWaitTask.exceptionPtr() != nullptr) {
1676 std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1677 }
1678 }
1679 }
1680
1681 void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1682 SendSourceTerminationSignalIfException sentry(actReg_.get());
1683 input_->readProcessBlock(processBlockPrincipal);
1684 sentry.completedSuccessfully();
1685 }
1686
1687 std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1688 if (principalCache_.hasRunPrincipal()) {
1689 throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1690 << "Illegal attempt to insert run into cache\n"
1691 << "Contact a Framework Developer\n";
1692 }
1693 auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1694 preg(),
1695 *processConfiguration_,
1696 historyAppender_.get(),
1697 0,
1698 true,
1699 &mergeableRunProductProcesses_);
1700 {
1701 SendSourceTerminationSignalIfException sentry(actReg_.get());
1702 input_->readRun(*rp, *historyAppender_);
1703 sentry.completedSuccessfully();
1704 }
1705 assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1706 principalCache_.insert(rp);
1707 return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1708 }
1709
1710 std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1711 principalCache_.merge(input_->runAuxiliary(), preg());
1712 auto runPrincipal = principalCache_.runPrincipalPtr();
1713 {
1714 SendSourceTerminationSignalIfException sentry(actReg_.get());
1715 input_->readAndMergeRun(*runPrincipal);
1716 sentry.completedSuccessfully();
1717 }
1718 assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1719 return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1720 }
1721
1722 void EventProcessor::readLuminosityBlock(LuminosityBlockProcessingStatus& iStatus) {
1723 if (!principalCache_.hasRunPrincipal()) {
1724 throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1725 << "Illegal attempt to insert lumi into cache\n"
1726 << "Run is invalid\n"
1727 << "Contact a Framework Developer\n";
1728 }
1729 auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
1730 assert(lbp);
1731 lbp->setAux(*input_->luminosityBlockAuxiliary());
1732 {
1733 SendSourceTerminationSignalIfException sentry(actReg_.get());
1734 input_->readLuminosityBlock(*lbp, *historyAppender_);
1735 sentry.completedSuccessfully();
1736 }
1737 lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1738 iStatus.lumiPrincipal() = std::move(lbp);
1739 }
1740
1741 int EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
1742 auto& lumiPrincipal = *iStatus.lumiPrincipal();
1743 assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1744 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1745 input_->processHistoryRegistry().reducedProcessHistoryID(
1746 input_->luminosityBlockAuxiliary()->processHistoryID()));
1747 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1748 assert(lumiOK);
1749 lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1750 {
1751 SendSourceTerminationSignalIfException sentry(actReg_.get());
1752 input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1753 sentry.completedSuccessfully();
1754 }
1755 return input_->luminosityBlock();
1756 }
1757
1758 void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
1759 using namespace edm::waiting_task;
1760 chain::first([&](auto nextTask) {
1761 ServiceRegistry::Operate op(serviceToken_);
1762 schedule_->writeProcessBlockAsync(
1763 nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1764 }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1765 ServiceRegistry::Operate op(serviceToken_);
1766 for (auto& s : subProcesses_) {
1767 s.writeProcessBlockAsync(nextTask, processBlockType);
1768 }
1769 }) | chain::runLast(std::move(task));
1770 }
1771
1772 void EventProcessor::writeRunAsync(WaitingTaskHolder task,
1773 ProcessHistoryID const& phid,
1774 RunNumber_t run,
1775 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1776 using namespace edm::waiting_task;
1777 chain::first([&](auto nextTask) {
1778 ServiceRegistry::Operate op(serviceToken_);
1779 schedule_->writeRunAsync(nextTask,
1780 principalCache_.runPrincipal(phid, run),
1781 &processContext_,
1782 actReg_.get(),
1783 mergeableRunProductMetadata);
1784 }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1785 ServiceRegistry::Operate op(serviceToken_);
1786 for (auto& s : subProcesses_) {
1787 s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1788 }
1789 }) | chain::runLast(std::move(task));
1790 }
1791
1792 void EventProcessor::deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run) {
1793 principalCache_.deleteRun(phid, run);
1794 for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1795 FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1796 }
1797
1798 void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
1799 using namespace edm::waiting_task;
1800 if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
1801 chain::first([&](auto nextTask) {
1802 ServiceRegistry::Operate op(serviceToken_);
1803
1804 lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1805 schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1806 }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1807 ServiceRegistry::Operate op(serviceToken_);
1808 for (auto& s : subProcesses_) {
1809 s.writeLumiAsync(nextTask, lumiPrincipal);
1810 }
1811 }) | chain::lastTask(std::move(task));
1812 }
1813 }
1814
1815 void EventProcessor::deleteLumiFromCache(LuminosityBlockProcessingStatus& iStatus) {
1816 for (auto& s : subProcesses_) {
1817 s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1818 }
1819 iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
1820 iStatus.lumiPrincipal()->clearPrincipal();
1821
1822 }
1823
1824 bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iStatus) {
1825 if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1826 iStatus.endLumi();
1827 return false;
1828 }
1829
1830 if (iStatus.wasEventProcessingStopped()) {
1831 return false;
1832 }
1833
1834 if (shouldWeStop()) {
1835 lastSourceTransition_ = InputSource::IsStop;
1836 iStatus.stopProcessingEvents();
1837 iStatus.endLumi();
1838 return false;
1839 }
1840
1841 ServiceRegistry::Operate operate(serviceToken_);
1842
1843 CMS_SA_ALLOW try {
1844
1845
1846
1847 std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1848
1849 auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1850 if (InputSource::IsLumi == itemType) {
1851 iStatus.haveContinuedLumi();
1852 while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1853 iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1854 readAndMergeLumi(iStatus);
1855 itemType = nextTransitionType();
1856 }
1857 if (InputSource::IsLumi == itemType) {
1858 iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1859 input_->luminosityBlockAuxiliary()->beginTime()));
1860 }
1861 }
1862 if (InputSource::IsEvent != itemType) {
1863 iStatus.stopProcessingEvents();
1864
1865
1866
1867
1868 if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1869 (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1870 iStatus.endLumi();
1871 }
1872 return false;
1873 }
1874 readEvent(iStreamIndex);
1875 } catch (...) {
1876 bool expected = false;
1877 if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1878 deferredExceptionPtr_ = std::current_exception();
1879 iStatus.endLumi();
1880 }
1881 return false;
1882 }
1883 return true;
1884 }
1885
1886 void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1887 sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1888 ServiceRegistry::Operate operate(serviceToken_);
1889
1890
1891 auto status = streamLumiStatus_[iStreamIndex].get();
1892
1893 CMS_SA_ALLOW try {
1894 if (readNextEventForStream(iStreamIndex, *status)) {
1895 auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1896 if (iPtr) {
1897
1898
1899 bool expected = false;
1900 if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1901
1902
1903 deferredExceptionPtr_ = *iPtr;
1904 WaitingTaskHolder tempHolder(iTask);
1905 tempHolder.doneWaiting(*iPtr);
1906 }
1907 streamEndLumiAsync(std::move(iTask), iStreamIndex);
1908
1909 return;
1910 }
1911 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1912 });
1913
1914 processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1915 } else {
1916
1917 if (status->isLumiEnding()) {
1918 if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1919 status->startNextLumi();
1920 beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1921 }
1922 streamEndLumiAsync(std::move(iTask), iStreamIndex);
1923 } else {
1924 iTask.doneWaiting(std::exception_ptr{});
1925 }
1926 }
1927 } catch (...) {
1928
1929
1930 if (streamLumiStatus_[iStreamIndex]) {
1931 streamEndLumiAsync(iTask, iStreamIndex);
1932 }
1933 bool expected = false;
1934 if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1935 auto e = std::current_exception();
1936 deferredExceptionPtr_ = e;
1937 iTask.doneWaiting(e);
1938 }
1939 }
1940 });
1941 }
1942
1943 void EventProcessor::readEvent(unsigned int iStreamIndex) {
1944
1945 auto& event = principalCache_.eventPrincipal(iStreamIndex);
1946 StreamContext streamContext(event.streamID(), &processContext_);
1947
1948 SendSourceTerminationSignalIfException sentry(actReg_.get());
1949 input_->readEvent(event, streamContext);
1950
1951 streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1952 sentry.completedSuccessfully();
1953
1954 FDEBUG(1) << "\treadEvent\n";
1955 }
1956
1957 void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1958 iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1959 }
1960
1961 void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1962 auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1963
1964 ServiceRegistry::Operate operate(serviceToken_);
1965 Service<RandomNumberGenerator> rng;
1966 if (rng.isAvailable()) {
1967 Event ev(*pep, ModuleDescription(), nullptr);
1968 rng->postEventRead(ev);
1969 }
1970
1971 EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1972 using namespace edm::waiting_task::chain;
1973 chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1974 EventTransitionInfo info(*pep, es);
1975 schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1976 }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1977 for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1978 subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1979 }
1980 }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1981
1982 ServiceRegistry::Operate operateLooper(serviceToken_);
1983 processEventWithLooper(*pep, iStreamIndex);
1984 }) | then([pep](auto nextTask) {
1985 FDEBUG(1) << "\tprocessEvent\n";
1986 pep->clearEventPrincipal();
1987 }) | runLast(iHolder);
1988 }
1989
1990 void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1991 bool randomAccess = input_->randomAccess();
1992 ProcessingController::ForwardState forwardState = input_->forwardState();
1993 ProcessingController::ReverseState reverseState = input_->reverseState();
1994 ProcessingController pc(forwardState, reverseState, randomAccess);
1995
1996 EDLooperBase::Status status = EDLooperBase::kContinue;
1997 do {
1998 StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1999 EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2000 status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2001
2002 bool succeeded = true;
2003 if (randomAccess) {
2004 if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2005 input_->skipEvents(-2);
2006 } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2007 succeeded = input_->goToEvent(pc.specifiedEventTransition());
2008 }
2009 }
2010 pc.setLastOperationSucceeded(succeeded);
2011 } while (!pc.lastOperationSucceeded());
2012 if (status != EDLooperBase::kContinue) {
2013 shouldWeStop_ = true;
2014 lastSourceTransition_ = InputSource::IsStop;
2015 }
2016 }
2017
2018 bool EventProcessor::shouldWeStop() const {
2019 FDEBUG(1) << "\tshouldWeStop\n";
2020 if (shouldWeStop_)
2021 return true;
2022 if (!subProcesses_.empty()) {
2023 for (auto const& subProcess : subProcesses_) {
2024 if (subProcess.terminate()) {
2025 return true;
2026 }
2027 }
2028 return false;
2029 }
2030 return schedule_->terminate();
2031 }
2032
2033 void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2034
2035 void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2036
2037 void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2038
2039 bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2040 bool expected = false;
2041 if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2042 deferredExceptionPtr_ = iException;
2043 return true;
2044 }
2045 return false;
2046 }
2047
2048 void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2049 cms::Exception ex("ModulesSynchingOnLumis");
2050 ex << "The framework is configured to use at least two streams, but the following modules\n"
2051 << "require synchronizing on LuminosityBlock boundaries:";
2052 bool found = false;
2053 for (auto worker : schedule_->allWorkers()) {
2054 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2055 found = true;
2056 ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2057 }
2058 }
2059 if (found) {
2060 ex << "\n\nThe situation can be fixed by either\n"
2061 << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2062 << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2063 throw ex;
2064 }
2065 }
2066
2067 void EventProcessor::warnAboutLegacyModules() const {
2068 std::unique_ptr<LogSystem> s;
2069 for (auto worker : schedule_->allWorkers()) {
2070 if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2071 if (not s) {
2072 s = std::make_unique<LogSystem>("LegacyModules");
2073 (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2074 "is going to end soon. These modules need to be converted to have type\n"
2075 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2076 }
2077 (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2078 }
2079 }
2080 }
2081 }