Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:03

0001 #include "FWCore/Framework/interface/EventProcessor.h"
0002 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0003 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0004 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0005 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 
0008 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0009 #include "FWCore/Framework/src/CommonParams.h"
0010 #include "FWCore/Framework/interface/EDLooperBase.h"
0011 #include "FWCore/Framework/interface/EventPrincipal.h"
0012 #include "FWCore/Framework/interface/EventSetupProvider.h"
0013 #include "FWCore/Framework/interface/EventSetupRecord.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/Framework/interface/HistoryAppender.h"
0016 #include "FWCore/Framework/interface/InputSourceDescription.h"
0017 #include "FWCore/Framework/interface/IOVSyncValue.h"
0018 #include "FWCore/Framework/interface/LooperFactory.h"
0019 #include "FWCore/Framework/interface/LuminosityBlock.h"
0020 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0021 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0022 #include "FWCore/Framework/interface/ModuleChanger.h"
0023 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0024 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0025 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ProcessingController.h"
0028 #include "FWCore/Framework/interface/RunPrincipal.h"
0029 #include "FWCore/Framework/interface/Schedule.h"
0030 #include "FWCore/Framework/interface/ScheduleInfo.h"
0031 #include "FWCore/Framework/interface/ScheduleItems.h"
0032 #include "FWCore/Framework/interface/Event.h"
0033 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0034 #include "FWCore/Framework/src/Breakpoints.h"
0035 #include "FWCore/Framework/interface/EventSetupsController.h"
0036 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0037 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0038 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0039 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0040 #include "FWCore/Framework/interface/TriggerNamesService.h"
0041 #include "FWCore/Framework/src/SendSourceTerminationSignalIfException.h"
0042 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0043 
0044 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0045 
0046 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0047 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0048 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0049 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0050 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0051 #include "FWCore/ParameterSet/interface/Registry.h"
0052 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0053 
0054 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0055 #include "FWCore/AbstractServices/interface/RootHandlers.h"
0056 
0057 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0058 #include "FWCore/ServiceRegistry/interface/Service.h"
0059 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0060 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0061 
0062 #include "FWCore/Concurrency/interface/WaitingTask.h"
0063 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0064 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0065 #include "FWCore/Concurrency/interface/chain_first.h"
0066 
0067 #include "FWCore/Utilities/interface/Algorithms.h"
0068 #include "FWCore/Utilities/interface/DebugMacros.h"
0069 #include "FWCore/Utilities/interface/EDMException.h"
0070 #include "FWCore/Utilities/interface/Exception.h"
0071 #include "FWCore/Utilities/interface/ConvertException.h"
0072 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0073 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0074 #include "FWCore/Utilities/interface/StreamID.h"
0075 #include "FWCore/Utilities/interface/propagate_const.h"
0076 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0077 
0078 #include "MessageForSource.h"
0079 #include "MessageForParent.h"
0080 #include "LuminosityBlockProcessingStatus.h"
0081 #include "RunProcessingStatus.h"
0082 
0083 #include "boost/range/adaptor/reversed.hpp"
0084 
0085 #include <cassert>
0086 #include <exception>
0087 #include <iomanip>
0088 #include <iostream>
0089 #include <utility>
0090 #include <sstream>
0091 
0092 #include <sys/ipc.h>
0093 #include <sys/msg.h>
0094 
0095 #include "oneapi/tbb/task.h"
0096 #include "oneapi/tbb/task_arena.h"
0097 
0098 //Used for CPU affinity
0099 #ifndef __APPLE__
0100 #include <sched.h>
0101 #endif
0102 
0103 namespace {
0104   class PauseQueueSentry {
0105   public:
0106     PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
0107     ~PauseQueueSentry() { queue_.resume(); }
0108 
0109   private:
0110     edm::SerialTaskQueue& queue_;
0111   };
0112 }  // namespace
0113 
0114 namespace edm {
0115 
0116   namespace chain = waiting_task::chain;
0117 
0118   // ---------------------------------------------------------------
0119   std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0120                                          ParameterSet& params,
0121                                          CommonParams const& common,
0122                                          std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0123                                          std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0124                                          std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0125                                          std::shared_ptr<ActivityRegistry> areg,
0126                                          std::shared_ptr<ProcessConfiguration const> processConfiguration,
0127                                          PreallocationConfiguration const& allocations) {
0128     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0129     if (main_input == nullptr) {
0130       throw Exception(errors::Configuration)
0131           << "There must be exactly one source in the configuration.\n"
0132           << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0133     }
0134 
0135     std::string modtype(main_input->getParameter<std::string>("@module_type"));
0136 
0137     std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0138         ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0139     ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0140     filler->fill(descriptions);
0141 
0142     try {
0143       convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
0144     } catch (cms::Exception& iException) {
0145       std::ostringstream ost;
0146       ost << "Validating configuration of input source of type " << modtype;
0147       iException.addContext(ost.str());
0148       throw;
0149     }
0150 
0151     main_input->registerIt();
0152 
0153     // Fill in "ModuleDescription", in case the input source produces
0154     // any EDProducts, which would be registered in the ProductRegistry.
0155     // Also fill in the process history item for this process.
0156     // There is no module label for the unnamed input source, so
0157     // just use "source".
0158     // Only the tracked parameters belong in the process configuration.
0159     ModuleDescription md(main_input->id(),
0160                          main_input->getParameter<std::string>("@module_type"),
0161                          "source",
0162                          processConfiguration.get(),
0163                          moduleIndex);
0164 
0165     InputSourceDescription isdesc(md,
0166                                   branchIDListHelper,
0167                                   processBlockHelper,
0168                                   thinnedAssociationsHelper,
0169                                   areg,
0170                                   common.maxEventsInput_,
0171                                   common.maxLumisInput_,
0172                                   common.maxSecondsUntilRampdown_,
0173                                   allocations);
0174 
0175     areg->preSourceConstructionSignal_(md);
0176     std::unique_ptr<InputSource> input;
0177     try {
0178       //even if we have an exception, send the signal
0179       std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
0180       convertException::wrap([&]() {
0181         input = InputSourceFactory::get()->makeInputSource(*main_input, isdesc);
0182         input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
0183         input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
0184       });
0185     } catch (cms::Exception& iException) {
0186       std::ostringstream ost;
0187       ost << "Constructing input source of type " << modtype;
0188       iException.addContext(ost.str());
0189       throw;
0190     }
0191     return input;
0192   }
0193 
0194   // ---------------------------------------------------------------
0195   std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
0196                                            eventsetup::EventSetupProvider& cp,
0197                                            ParameterSet& params,
0198                                            std::vector<std::string> const& loopers) {
0199     std::shared_ptr<EDLooperBase> vLooper;
0200 
0201     assert(1 == loopers.size());
0202 
0203     for (auto const& looperName : loopers) {
0204       ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
0205       // Unlikely we would ever need the ModuleTypeResolver in Looper
0206       vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
0207     }
0208     return vLooper;
0209   }
0210 
0211   // ---------------------------------------------------------------
0212   EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,  //std::string const& config,
0213                                  ServiceToken const& iToken,
0214                                  serviceregistry::ServiceLegacy iLegacy,
0215                                  std::vector<std::string> const& defaultServices,
0216                                  std::vector<std::string> const& forcedServices)
0217       : actReg_(),
0218         preg_(),
0219         branchIDListHelper_(),
0220         serviceToken_(),
0221         input_(),
0222         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0223         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0224         esp_(),
0225         act_table_(),
0226         processConfiguration_(),
0227         schedule_(),
0228         historyAppender_(new HistoryAppender),
0229         fb_(),
0230         looper_(),
0231         deferredExceptionPtrIsSet_(false),
0232         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0233         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0234         principalCache_(),
0235         beginJobCalled_(false),
0236         shouldWeStop_(false),
0237         fileModeNoMerge_(false),
0238         exceptionMessageFiles_(),
0239         exceptionMessageRuns_(false),
0240         exceptionMessageLumis_(false),
0241         forceLooperToEnd_(false),
0242         looperBeginJobRun_(false),
0243         forceESCacheClearOnNewRun_(false),
0244         eventSetupDataToExcludeFromPrefetching_() {
0245     auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0246     processDesc->addServices(defaultServices, forcedServices);
0247     init(processDesc, iToken, iLegacy);
0248   }
0249 
0250   EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,  //std::string const& config,
0251                                  std::vector<std::string> const& defaultServices,
0252                                  std::vector<std::string> const& forcedServices)
0253       : actReg_(),
0254         preg_(),
0255         branchIDListHelper_(),
0256         serviceToken_(),
0257         input_(),
0258         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0259         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0260         esp_(),
0261         act_table_(),
0262         processConfiguration_(),
0263         schedule_(),
0264         historyAppender_(new HistoryAppender),
0265         fb_(),
0266         looper_(),
0267         deferredExceptionPtrIsSet_(false),
0268         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0269         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0270         principalCache_(),
0271         beginJobCalled_(false),
0272         shouldWeStop_(false),
0273         fileModeNoMerge_(false),
0274         exceptionMessageFiles_(),
0275         exceptionMessageRuns_(false),
0276         exceptionMessageLumis_(false),
0277         forceLooperToEnd_(false),
0278         looperBeginJobRun_(false),
0279         forceESCacheClearOnNewRun_(false),
0280         eventSetupDataToExcludeFromPrefetching_() {
0281     auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0282     processDesc->addServices(defaultServices, forcedServices);
0283     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
0284   }
0285 
0286   EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0287                                  ServiceToken const& token,
0288                                  serviceregistry::ServiceLegacy legacy)
0289       : actReg_(),
0290         preg_(),
0291         branchIDListHelper_(),
0292         serviceToken_(),
0293         input_(),
0294         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
0295         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0296         esp_(),
0297         act_table_(),
0298         processConfiguration_(),
0299         schedule_(),
0300         historyAppender_(new HistoryAppender),
0301         fb_(),
0302         looper_(),
0303         deferredExceptionPtrIsSet_(false),
0304         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0305         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0306         principalCache_(),
0307         beginJobCalled_(false),
0308         shouldWeStop_(false),
0309         fileModeNoMerge_(false),
0310         exceptionMessageFiles_(),
0311         exceptionMessageRuns_(false),
0312         exceptionMessageLumis_(false),
0313         forceLooperToEnd_(false),
0314         looperBeginJobRun_(false),
0315         forceESCacheClearOnNewRun_(false),
0316         eventSetupDataToExcludeFromPrefetching_() {
0317     init(processDesc, token, legacy);
0318   }
0319 
0320   void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
0321                             ServiceToken const& iToken,
0322                             serviceregistry::ServiceLegacy iLegacy) {
0323     //std::cerr << processDesc->dump() << std::endl;
0324 
0325     // register the empty parentage vector , once and for all
0326     ParentageRegistry::instance()->insertMapped(Parentage());
0327 
0328     // register the empty parameter set, once and for all.
0329     ParameterSet().registerIt();
0330 
0331     std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
0332 
0333     // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
0334     // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
0335     // set in here if the parameters were not explicitly set.
0336     validateTopLevelParameterSets(parameterSet.get());
0337 
0338     // Now set some parameters specific to the main process.
0339     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
0340     auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
0341     if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
0342       throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
0343           << fileMode << ".\n"
0344           << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
0345     } else {
0346       fileModeNoMerge_ = (fileMode == "NOMERGE");
0347     }
0348     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
0349     ensureAvailableAccelerators(*parameterSet);
0350 
0351     //threading
0352     unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
0353 
0354     // Even if numberOfThreads was set to zero in the Python configuration, the code
0355     // in cmsRun.cpp should have reset it to something else.
0356     assert(nThreads != 0);
0357 
0358     unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
0359     if (nStreams == 0) {
0360       nStreams = nThreads;
0361     }
0362     unsigned int nConcurrentLumis =
0363         optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
0364     if (nConcurrentLumis == 0) {
0365       nConcurrentLumis = 2;
0366     }
0367     if (nConcurrentLumis > nStreams) {
0368       nConcurrentLumis = nStreams;
0369     }
0370     unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
0371     if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
0372       nConcurrentRuns = nConcurrentLumis;
0373     }
0374     std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
0375     if (!loopers.empty()) {
0376       //For now loopers make us run only 1 transition at a time
0377       if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
0378         edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
0379                                                 "of concurrent runs, and the number of concurrent lumis "
0380                                                 "are all being reset to 1. Loopers cannot currently support "
0381                                                 "values greater than 1.";
0382         nStreams = 1;
0383         nConcurrentLumis = 1;
0384         nConcurrentRuns = 1;
0385       }
0386     }
0387     bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
0388     if (dumpOptions) {
0389       dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
0390     } else {
0391       if (nThreads > 1 or nStreams > 1) {
0392         edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
0393       }
0394     }
0395 
0396     // The number of concurrent IOVs is configured individually for each record in
0397     // the class NumberOfConcurrentIOVs to values less than or equal to this.
0398     // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
0399     // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
0400     // concurrent run past the first in use cases where IOVs change within a run.
0401     unsigned int maxConcurrentIOVs =
0402         3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
0403 
0404     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
0405 
0406     printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
0407     deleteNonConsumedUnscheduledModules_ =
0408         optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
0409 
0410     branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
0411 
0412     if (not branchesToDeleteEarly_.empty()) {
0413       auto referencePSets =
0414           optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
0415       for (auto const& pset : referencePSets) {
0416         auto product = pset.getParameter<std::string>("product");
0417         auto references = pset.getParameter<std::vector<std::string>>("references");
0418         for (auto const& ref : references) {
0419           referencesToBranches_.emplace(product, ref);
0420         }
0421       }
0422       modulesToIgnoreForDeleteEarly_ =
0423           optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
0424     }
0425 
0426     // Now do general initialization
0427     ScheduleItems items;
0428 
0429     //initialize the services
0430     auto& serviceSets = processDesc->getServicesPSets();
0431     ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy);
0432     serviceToken_ = items.addTNS(*parameterSet, token);
0433 
0434     //make the services available
0435     ServiceRegistry::Operate operate(serviceToken_);
0436 
0437     CMS_SA_ALLOW try {
0438       if (nThreads > 1) {
0439         edm::Service<RootHandlers> handler;
0440         handler->willBeUsingThreads();
0441       }
0442 
0443       // intialize miscellaneous items
0444       std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
0445 
0446       // intialize the event setup provider
0447       ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
0448       esp_ = espController_->makeProvider(
0449           *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
0450 
0451       // initialize the looper, if any
0452       if (!loopers.empty()) {
0453         looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
0454         looper_->setActionTable(items.act_table_.get());
0455         looper_->attachTo(*items.actReg_);
0456 
0457         // in presence of looper do not delete modules
0458         deleteNonConsumedUnscheduledModules_ = false;
0459       }
0460 
0461       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0462 
0463       runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
0464       lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
0465       streamQueues_.resize(nStreams);
0466       streamRunStatus_.resize(nStreams);
0467       streamLumiStatus_.resize(nStreams);
0468 
0469       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0470 
0471       {
0472         std::optional<ScheduleItems::MadeModules> madeModules;
0473 
0474         //setup input and modules concurrently
0475         tbb::task_group group;
0476 
0477         // initialize the input source
0478         auto sourceID = ModuleDescription::getUniqueID();
0479 
0480         group.run([&, this]() {
0481           // initialize the Schedule
0482           ServiceRegistry::Operate operate(serviceToken_);
0483           auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0484           madeModules =
0485               items.initModules(*parameterSet, tns, preallocations_, &processContext_, moduleTypeResolverMaker_.get());
0486         });
0487 
0488         group.run([&, this]() {
0489           ServiceRegistry::Operate operate(serviceToken_);
0490           input_ = makeInput(sourceID,
0491                              *parameterSet,
0492                              *common,
0493                              items.branchIDListHelper(),
0494                              get_underlying_safe(processBlockHelper_),
0495                              items.thinnedAssociationsHelper(),
0496                              items.actReg_,
0497                              items.processConfiguration(),
0498                              preallocations_);
0499         });
0500 
0501         group.wait();
0502         items.preg()->addFromInput(input_->productRegistry());
0503         {
0504           auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0505           schedule_ = items.finishSchedule(
0506               std::move(*madeModules), *parameterSet, tns, preallocations_, &processContext_, *processBlockHelper_);
0507         }
0508       }
0509 
0510       // set the data members
0511       act_table_ = std::move(items.act_table_);
0512       actReg_ = items.actReg_;
0513       preg_ = std::make_shared<ProductRegistry>(items.preg()->moveTo());
0514       mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0515       branchIDListHelper_ = items.branchIDListHelper();
0516       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0517       processConfiguration_ = items.processConfiguration();
0518       processContext_.setProcessConfiguration(processConfiguration_.get());
0519 
0520       {
0521         edm::Service<edm::JobReport> jr;
0522         if (jr.isAvailable()) {
0523           ProcessConfiguration reduced = *processConfiguration_;
0524           reduced.reduce();
0525           jr->reportProcess(reduced.processName(), reduced.id(), reduced.parameterSetID());
0526         }
0527       }
0528 
0529       FDEBUG(2) << parameterSet << std::endl;
0530 
0531       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0532       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0533         // Reusable event principal
0534         auto ep = std::make_shared<EventPrincipal>(preg(),
0535                                                    productResolversFactory::makePrimary,
0536                                                    branchIDListHelper(),
0537                                                    thinnedAssociationsHelper(),
0538                                                    *processConfiguration_,
0539                                                    historyAppender_.get(),
0540                                                    index,
0541                                                    &*processBlockHelper_);
0542         principalCache_.insert(std::move(ep));
0543       }
0544 
0545       for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0546         auto rp = std::make_unique<RunPrincipal>(preg(),
0547                                                  productResolversFactory::makePrimary,
0548                                                  *processConfiguration_,
0549                                                  historyAppender_.get(),
0550                                                  index,
0551                                                  &mergeableRunProductProcesses_);
0552         principalCache_.insert(std::move(rp));
0553       }
0554 
0555       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0556         auto lp = std::make_unique<LuminosityBlockPrincipal>(
0557             preg(), productResolversFactory::makePrimary, *processConfiguration_, historyAppender_.get(), index);
0558         principalCache_.insert(std::move(lp));
0559       }
0560 
0561       {
0562         auto pb = std::make_unique<ProcessBlockPrincipal>(
0563             preg(), productResolversFactory::makePrimary, *processConfiguration_);
0564         principalCache_.insert(std::move(pb));
0565 
0566         auto pbForInput = std::make_unique<ProcessBlockPrincipal>(
0567             preg(), productResolversFactory::makePrimary, *processConfiguration_);
0568         principalCache_.insertForInput(std::move(pbForInput));
0569       }
0570     } catch (...) {
0571       //in case of an exception, make sure Services are available
0572       // during the following destructors
0573       espController_ = nullptr;
0574       esp_ = nullptr;
0575       schedule_ = nullptr;
0576       input_ = nullptr;
0577       looper_ = nullptr;
0578       actReg_ = nullptr;
0579       throw;
0580     }
0581   }
0582 
0583   EventProcessor::~EventProcessor() {
0584     // Make the services available while everything is being deleted.
0585     ServiceToken token = getToken();
0586     ServiceRegistry::Operate op(token);
0587 
0588     // manually destroy all these thing that may need the services around
0589     // propagate_const<T> has no reset() function
0590     espController_ = nullptr;
0591     esp_ = nullptr;
0592     schedule_ = nullptr;
0593     input_ = nullptr;
0594     looper_ = nullptr;
0595     actReg_ = nullptr;
0596 
0597     pset::Registry::instance()->clear();
0598     ParentageRegistry::instance()->clear();
0599   }
0600 
0601   void EventProcessor::taskCleanup() {
0602     edm::FinalWaitingTask task{taskGroup_};
0603     espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
0604     task.waitNoThrow();
0605     assert(task.done());
0606   }
0607 
0608   void EventProcessor::beginJob() {
0609     if (beginJobCalled_)
0610       return;
0611     beginJobCalled_ = true;
0612     bk::beginJob();
0613 
0614     ServiceRegistry::Operate operate(serviceToken_);
0615 
0616     service::SystemBounds bounds(preallocations_.numberOfStreams(),
0617                                  preallocations_.numberOfLuminosityBlocks(),
0618                                  preallocations_.numberOfRuns(),
0619                                  preallocations_.numberOfThreads());
0620     actReg_->preallocateSignal_(bounds);
0621     schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0622 
0623     PathsAndConsumesOfModules pathsAndConsumesOfModules;
0624     pathsAndConsumesOfModules.initialize(schedule_.get(), preg());
0625 
0626     // Note: all these may throw
0627     checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, printDependencies_);
0628     if (deleteNonConsumedUnscheduledModules_) {
0629       if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules);
0630           not unusedModules.empty()) {
0631         pathsAndConsumesOfModules.removeModules(unusedModules);
0632 
0633         edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
0634           l << "The following modules are not in any Path or EndPath, nor is their output consumed by any other "
0635                "module, "
0636                "and therefore they are deleted before the beginJob transition.";
0637           for (auto const& description : unusedModules) {
0638             l << "\n " << description->moduleLabel();
0639           }
0640         });
0641         for (auto const& description : unusedModules) {
0642           schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0643         }
0644       }
0645     }
0646     // Initialize after the deletion of non-consumed unscheduled
0647     // modules to avoid non-consumed non-run modules to keep the
0648     // products unnecessarily alive
0649     if (not branchesToDeleteEarly_.empty()) {
0650       auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
0651       auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
0652       auto referencesToBranches = std::move(referencesToBranches_);
0653       schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
0654     }
0655 
0656     if (preallocations_.numberOfLuminosityBlocks() > 1) {
0657       throwAboutModulesRequiringLuminosityBlockSynchronization();
0658     }
0659     if (preallocations_.numberOfRuns() > 1) {
0660       warnAboutModulesRequiringRunSynchronization();
0661     }
0662 
0663     //NOTE:  This implementation assumes 'Job' means one call
0664     // the EventProcessor::run
0665     // If it really means once per 'application' then this code will
0666     // have to be changed.
0667     // Also have to deal with case where have 'run' then new Module
0668     // added and do 'run'
0669     // again.  In that case the newly added Module needs its 'beginJob'
0670     // to be called.
0671 
0672     //NOTE: in future we should have a beginOfJob for looper that takes no arguments
0673     //  For now we delay calling beginOfJob until first beginOfRun
0674     //if(looper_) {
0675     //   looper_->beginOfJob(es);
0676     //}
0677     espController_->finishConfiguration();
0678 
0679     eventsetup::ESRecordsToProductResolverIndices esRecordsToProductResolverIndices = esp_->recordsToResolverIndices();
0680 
0681     actReg_->eventSetupConfigurationSignal_(esRecordsToProductResolverIndices, processContext_);
0682     try {
0683       convertException::wrap([&]() { input_->doBeginJob(*preg_); });
0684     } catch (cms::Exception& ex) {
0685       ex.addContext("Calling beginJob for the source");
0686       throw;
0687     }
0688 
0689     beginJobStartedModules_ = true;
0690 
0691     // If we execute the beginJob transition for any module then we execute it
0692     // for all of the modules. We save the first exception and rethrow that
0693     // after they all complete.
0694     std::exception_ptr firstException;
0695     CMS_SA_ALLOW try {
0696       schedule_->beginJob(
0697           *preg_, esRecordsToProductResolverIndices, *processBlockHelper_, processContext_.processName());
0698     } catch (...) {
0699       firstException = std::current_exception();
0700     }
0701     if (looper_ && !firstException) {
0702       CMS_SA_ALLOW try {
0703         constexpr bool mustPrefetchMayGet = true;
0704         auto const processBlockLookup = preg_->productLookup(InProcess);
0705         auto const runLookup = preg_->productLookup(InRun);
0706         auto const lumiLookup = preg_->productLookup(InLumi);
0707         auto const eventLookup = preg_->productLookup(InEvent);
0708         looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
0709         looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
0710         looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
0711         looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
0712         looper_->updateLookup(esRecordsToProductResolverIndices);
0713       } catch (...) {
0714         firstException = std::current_exception();
0715       }
0716     }
0717     if (firstException) {
0718       std::rethrow_exception(firstException);
0719     }
0720     pathsAndConsumesOfModules.initializeForEventSetup(*esp_);
0721     actReg_->lookupInitializationCompleteSignal_(pathsAndConsumesOfModules, processContext_);
0722     schedule_->releaseMemoryPostLookupSignal();
0723 
0724     beginJobSucceeded_ = true;
0725     beginStreams();
0726   }
0727 
0728   void EventProcessor::beginStreams() {
0729     // This will process streams concurrently, but not modules in the
0730     // same stream.
0731     oneapi::tbb::task_group group;
0732     FinalWaitingTask finalWaitingTask{group};
0733     using namespace edm::waiting_task::chain;
0734     {
0735       WaitingTaskHolder taskHolder(group, &finalWaitingTask);
0736       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0737         first([this, i](auto nextTask) {
0738           std::exception_ptr exceptionPtr;
0739           {
0740             ServiceRegistry::Operate operate(serviceToken_);
0741             CMS_SA_ALLOW try { schedule_->beginStream(i); } catch (...) {
0742               exceptionPtr = std::current_exception();
0743             }
0744           }
0745           nextTask.doneWaiting(exceptionPtr);
0746         }) | lastTask(taskHolder);
0747       }
0748     }
0749     finalWaitingTask.wait();
0750   }
0751 
0752   void EventProcessor::endStreams(ExceptionCollector& collector) noexcept {
0753     std::mutex collectorMutex;
0754 
0755     // This will process streams concurrently, but not modules in the
0756     // same stream.
0757     oneapi::tbb::task_group group;
0758     FinalWaitingTask finalWaitingTask{group};
0759     using namespace edm::waiting_task::chain;
0760     {
0761       WaitingTaskHolder taskHolder(group, &finalWaitingTask);
0762       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0763         first([this, i, &collector, &collectorMutex](auto nextTask) {
0764           {
0765             ServiceRegistry::Operate operate(serviceToken_);
0766             schedule_->endStream(i, collector, collectorMutex);
0767           }
0768         }) | lastTask(taskHolder);
0769       }
0770     }
0771     finalWaitingTask.waitNoThrow();
0772   }
0773 
0774   void EventProcessor::endJob() {
0775     // Collects exceptions, so we don't throw before all operations are performed.
0776     ExceptionCollector c(
0777         "Multiple exceptions were thrown while executing endStream and endJob. An exception message follows for "
0778         "each.\n");
0779 
0780     //make the services available
0781     ServiceRegistry::Operate operate(serviceToken_);
0782 
0783     if (beginJobSucceeded_) {
0784       endStreams(c);
0785     }
0786 
0787     if (beginJobStartedModules_) {
0788       schedule_->endJob(c);
0789       c.call(std::bind(&InputSource::doEndJob, input_.get()));
0790       if (looper_) {
0791         c.call(std::bind(&EDLooperBase::endOfJob, looper()));
0792       }
0793       if (c.hasThrown()) {
0794         c.rethrow();
0795       }
0796     }
0797   }
0798 
0799   ServiceToken EventProcessor::getToken() { return serviceToken_; }
0800 
0801   std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
0802     return schedule_->getAllModuleDescriptions();
0803   }
0804 
0805   int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
0806 
0807   int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
0808 
0809   int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
0810 
0811   void EventProcessor::clearCounters() { schedule_->clearCounters(); }
0812 
0813   namespace {
0814 #include "TransitionProcessors.icc"
0815   }
0816 
0817   bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
0818     bool returnValue = false;
0819 
0820     // Look for a shutdown signal
0821     if (shutdown_flag.load(std::memory_order_acquire)) {
0822       returnValue = true;
0823       edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
0824       edm::Service<edm::JobReport> jr;
0825       jr->reportShutdownSignal();
0826       returnCode = epSignal;
0827     }
0828     return returnValue;
0829   }
0830 
0831   namespace {
0832     struct SourceNextGuard {
0833       SourceNextGuard(edm::ActivityRegistry& iReg) : act_(iReg) { iReg.preSourceNextTransitionSignal_.emit(); }
0834       ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
0835       edm::ActivityRegistry& act_;
0836     };
0837   }  // namespace
0838 
0839   InputSource::ItemTypeInfo EventProcessor::nextTransitionType() {
0840     SendSourceTerminationSignalIfException sentry(actReg_.get());
0841     InputSource::ItemTypeInfo itemTypeInfo;
0842     {
0843       SourceNextGuard guard(*actReg_.get());
0844       //For now, do nothing with InputSource::IsSynchronize
0845       do {
0846         itemTypeInfo = input_->nextItemType();
0847       } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
0848     }
0849     lastSourceTransition_ = itemTypeInfo;
0850     sentry.completedSuccessfully();
0851 
0852     StatusCode returnCode = epSuccess;
0853 
0854     if (checkForAsyncStopRequest(returnCode)) {
0855       actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
0856       lastSourceTransition_ = InputSource::ItemType::IsStop;
0857     }
0858 
0859     return lastSourceTransition_;
0860   }
0861 
0862   void EventProcessor::nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
0863                                                WaitingTaskHolder nextTask) {
0864     auto group = nextTask.group();
0865     sourceResourcesAcquirer_.serialQueueChain().push(
0866         *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
0867           CMS_SA_ALLOW try {
0868             ServiceRegistry::Operate operate(serviceToken_);
0869             std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
0870             nextTransitionType();
0871             if (lastTransitionType() == InputSource::ItemType::IsRun &&
0872                 runStatus->runPrincipal()->run() == input_->run() &&
0873                 runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
0874               throw Exception(errors::LogicError)
0875                   << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
0876                   << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
0877                   << "This is probably a bug in the InputSource. Please report to the Core group.\n";
0878             }
0879           } catch (...) {
0880             nextHolder.doneWaiting(std::current_exception());
0881           }
0882         });
0883   }
0884 
0885   EventProcessor::StatusCode EventProcessor::runToCompletion() {
0886     beginJob();  //make sure this was called
0887 
0888     // make the services available
0889     ServiceRegistry::Operate operate(serviceToken_);
0890     actReg_->beginProcessingSignal_();
0891     auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
0892     std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
0893     try {
0894       FilesProcessor fp(fileModeNoMerge_);
0895 
0896       convertException::wrap([&]() {
0897         bool firstTime = true;
0898         do {
0899           if (not firstTime) {
0900             prepareForNextLoop();
0901             rewindInput();
0902           } else {
0903             firstTime = false;
0904           }
0905           startingNewLoop();
0906 
0907           auto trans = fp.processFiles(*this);
0908 
0909           fp.normalEnd();
0910 
0911           if (deferredExceptionPtrIsSet_.load()) {
0912             std::rethrow_exception(deferredExceptionPtr_);
0913           }
0914           if (trans != InputSource::ItemType::IsStop) {
0915             //problem with the source
0916             doErrorStuff();
0917 
0918             throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
0919           }
0920         } while (not endOfLoop());
0921       });  // convertException::wrap
0922 
0923     }  // Try block
0924     catch (cms::Exception& e) {
0925       if (exceptionMessageLumis_) {
0926         std::string message(
0927             "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
0928         e.addAdditionalInfo(message);
0929         if (e.alreadyPrinted()) {
0930           LogAbsolute("Additional Exceptions") << message;
0931         }
0932       }
0933       if (exceptionMessageRuns_) {
0934         std::string message(
0935             "Another exception was caught while trying to clean up runs after the primary fatal exception.");
0936         e.addAdditionalInfo(message);
0937         if (e.alreadyPrinted()) {
0938           LogAbsolute("Additional Exceptions") << message;
0939         }
0940       }
0941       if (!exceptionMessageFiles_.empty()) {
0942         e.addAdditionalInfo(exceptionMessageFiles_);
0943         if (e.alreadyPrinted()) {
0944           LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
0945         }
0946       }
0947       throw;
0948     }
0949     return epSuccess;
0950   }
0951 
0952   void EventProcessor::readFile() {
0953     FDEBUG(1) << " \treadFile\n";
0954     SendSourceTerminationSignalIfException sentry(actReg_.get());
0955 
0956     if (streamRunActive_ > 0) {
0957       //deals with data structures that allows merged Run products to be split on Lumi boundaries then
0958       // in later processes reintegrated.
0959       streamRunStatus_[0]->runPrincipal()->preReadFile();
0960     }
0961 
0962     auto oldCacheID = input_->productRegistry().cacheIdentifier();
0963     fb_ = input_->readFile();
0964     //incase the input's registry changed
0965     if (input_->productRegistry().cacheIdentifier() != oldCacheID) {
0966       auto temp = std::make_shared<edm::ProductRegistry>(*preg_);
0967       temp->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
0968       preg_ = std::move(temp);
0969     }
0970     if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
0971       fb_->setNotFastClonable(FileBlock::ParallelProcesses);
0972     }
0973     sentry.completedSuccessfully();
0974   }
0975 
0976   void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
0977     if (fileBlockValid()) {
0978       SendSourceTerminationSignalIfException sentry(actReg_.get());
0979       input_->closeFile(fb_.get(), cleaningUpAfterException);
0980       sentry.completedSuccessfully();
0981     }
0982     FDEBUG(1) << "\tcloseInputFile\n";
0983   }
0984 
0985   void EventProcessor::openOutputFiles() {
0986     if (fileBlockValid()) {
0987       schedule_->openOutputFiles(*fb_);
0988     }
0989     FDEBUG(1) << "\topenOutputFiles\n";
0990   }
0991 
0992   void EventProcessor::closeOutputFiles() {
0993     schedule_->closeOutputFiles();
0994     processBlockHelper_->clearAfterOutputFilesClose();
0995     FDEBUG(1) << "\tcloseOutputFiles\n";
0996   }
0997 
0998   void EventProcessor::respondToOpenInputFile() {
0999     if (fileBlockValid()) {
1000       schedule_->respondToOpenInputFile(*fb_);
1001     }
1002     FDEBUG(1) << "\trespondToOpenInputFile\n";
1003   }
1004 
1005   void EventProcessor::respondToCloseInputFile() {
1006     if (fileBlockValid()) {
1007       schedule_->respondToCloseInputFile(*fb_);
1008     }
1009     FDEBUG(1) << "\trespondToCloseInputFile\n";
1010   }
1011 
1012   void EventProcessor::startingNewLoop() {
1013     shouldWeStop_ = false;
1014     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1015     // until after we've called beginOfJob
1016     if (looper_ && looperBeginJobRun_) {
1017       looper_->doStartingNewLoop();
1018     }
1019     FDEBUG(1) << "\tstartingNewLoop\n";
1020   }
1021 
1022   bool EventProcessor::endOfLoop() {
1023     if (looper_) {
1024       SignallingProductRegistryFiller sReg(*preg());
1025       ModuleChanger changer(schedule_.get(), &sReg, esp_->recordsToResolverIndices());
1026       looper_->setModuleChanger(&changer);
1027       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1028       looper_->setModuleChanger(nullptr);
1029       if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1030         return true;
1031       else
1032         return false;
1033     }
1034     FDEBUG(1) << "\tendOfLoop\n";
1035     return true;
1036   }
1037 
1038   void EventProcessor::rewindInput() {
1039     input_->repeat();
1040     input_->rewind();
1041     FDEBUG(1) << "\trewind\n";
1042   }
1043 
1044   void EventProcessor::prepareForNextLoop() {
1045     looper_->prepareForNextLoop(esp_.get());
1046     FDEBUG(1) << "\tprepareForNextLoop\n";
1047   }
1048 
1049   bool EventProcessor::shouldWeCloseOutput() const {
1050     FDEBUG(1) << "\tshouldWeCloseOutput\n";
1051     return schedule_->shouldWeCloseOutput();
1052   }
1053 
1054   void EventProcessor::doErrorStuff() {
1055     FDEBUG(1) << "\tdoErrorStuff\n";
1056     LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1057                              << "and went to the error state\n"
1058                              << "Will attempt to terminate processing normally\n"
1059                              << "(IF using the looper the next loop will be attempted)\n"
1060                              << "This likely indicates a bug in an input module or corrupted input or both\n";
1061   }
1062 
1063   void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1064     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1065     processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1066 
1067     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1068     FinalWaitingTask globalWaitTask{taskGroup_};
1069 
1070     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1071     schedule_->processOneGlobalAsync<Traits>(
1072         WaitingTaskHolder(taskGroup_, &globalWaitTask), transitionInfo, serviceToken_);
1073 
1074     globalWaitTask.wait();
1075     beginProcessBlockSucceeded = true;
1076   }
1077 
1078   void EventProcessor::inputProcessBlocks() {
1079     input_->fillProcessBlockHelper();
1080     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1081     while (input_->nextProcessBlock(processBlockPrincipal)) {
1082       readProcessBlock(processBlockPrincipal);
1083 
1084       using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1085       FinalWaitingTask globalWaitTask{taskGroup_};
1086 
1087       ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1088       schedule_->processOneGlobalAsync<Traits>(
1089           WaitingTaskHolder(taskGroup_, &globalWaitTask), transitionInfo, serviceToken_);
1090 
1091       globalWaitTask.wait();
1092 
1093       FinalWaitingTask writeWaitTask{taskGroup_};
1094       writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input);
1095       writeWaitTask.wait();
1096 
1097       processBlockPrincipal.clearPrincipal();
1098     }
1099   }
1100 
1101   void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1102     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1103 
1104     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1105     FinalWaitingTask globalWaitTask{taskGroup_};
1106 
1107     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1108     schedule_->processOneGlobalAsync<Traits>(
1109         WaitingTaskHolder(taskGroup_, &globalWaitTask), transitionInfo, serviceToken_, cleaningUpAfterException);
1110     globalWaitTask.wait();
1111 
1112     if (beginProcessBlockSucceeded) {
1113       FinalWaitingTask writeWaitTask{taskGroup_};
1114       writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New);
1115       writeWaitTask.wait();
1116     }
1117 
1118     processBlockPrincipal.clearPrincipal();
1119   }
1120 
1121   InputSource::ItemType EventProcessor::processRuns() {
1122     FinalWaitingTask waitTask{taskGroup_};
1123     assert(lastTransitionType() == InputSource::ItemType::IsRun);
1124     if (streamRunActive_ == 0) {
1125       assert(streamLumiActive_ == 0);
1126 
1127       beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1128                     WaitingTaskHolder{taskGroup_, &waitTask});
1129     } else {
1130       assert(streamRunActive_ == preallocations_.numberOfStreams());
1131 
1132       auto runStatus = streamRunStatus_[0];
1133 
1134       while (lastTransitionType() == InputSource::ItemType::IsRun and
1135              runStatus->runPrincipal()->run() == input_->run() and
1136              runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1137         readAndMergeRun(*runStatus);
1138         nextTransitionType();
1139       }
1140 
1141       setNeedToCallNext(false);
1142 
1143       WaitingTaskHolder holder{taskGroup_, &waitTask};
1144       runStatus->setHolderOfTaskInProcessRuns(holder);
1145       if (streamLumiActive_ > 0) {
1146         assert(streamLumiActive_ == preallocations_.numberOfStreams());
1147         continueLumiAsync(std::move(holder));
1148       } else {
1149         handleNextItemAfterMergingRunEntries(std::move(runStatus), std::move(holder));
1150       }
1151     }
1152     waitTask.wait();
1153     return lastTransitionType();
1154   }
1155 
1156   void EventProcessor::beginRunAsync(IOVSyncValue const& iSync, WaitingTaskHolder iHolder) {
1157     if (iHolder.taskHasFailed()) {
1158       return;
1159     }
1160 
1161     actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1162 
1163     auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1164 
1165     chain::first([this, &status, iSync](auto nextTask) {
1166       espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1167                                                            nextTask,
1168                                                            status->endIOVWaitingTasks(),
1169                                                            status->eventSetupImpls(),
1170                                                            queueWhichWaitsForIOVsToFinish_,
1171                                                            actReg_.get(),
1172                                                            serviceToken_,
1173                                                            forceESCacheClearOnNewRun_);
1174     }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1175       CMS_SA_ALLOW try {
1176         if (iException) {
1177           WaitingTaskHolder copyHolder(nextTask);
1178           copyHolder.doneWaiting(*iException);
1179           // Finish handling the exception in the task pushed to runQueue_
1180         }
1181         ServiceRegistry::Operate operate(serviceToken_);
1182 
1183         runQueue_->pushAndPause(
1184             *nextTask.group(),
1185             [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1186               CMS_SA_ALLOW try {
1187                 if (postRunQueueTask.taskHasFailed()) {
1188                   status->resetBeginResources();
1189                   queueWhichWaitsForIOVsToFinish_.resume();
1190                   return;
1191                 }
1192 
1193                 status->setResumer(std::move(iResumer));
1194 
1195                 sourceResourcesAcquirer_.serialQueueChain().push(
1196                     *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1197                       CMS_SA_ALLOW try {
1198                         ServiceRegistry::Operate operate(serviceToken_);
1199 
1200                         if (postSourceTask.taskHasFailed()) {
1201                           status->resetBeginResources();
1202                           queueWhichWaitsForIOVsToFinish_.resume();
1203                           status->resumeGlobalRunQueue();
1204                           return;
1205                         }
1206 
1207                         {
1208                           std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1209                           status->setRunPrincipal(readRun());
1210 
1211                           RunPrincipal& runPrincipal = *status->runPrincipal();
1212                           {
1213                             SendSourceTerminationSignalIfException sentry(actReg_.get());
1214                             input_->doBeginRun(runPrincipal, &processContext_);
1215                             sentry.completedSuccessfully();
1216                           }
1217                         }
1218 
1219                         EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1220                         if (looper_ && looperBeginJobRun_ == false) {
1221                           looper_->copyInfo(ScheduleInfo(schedule_.get()));
1222 
1223                           oneapi::tbb::task_group group;
1224                           FinalWaitingTask waitTask{group};
1225                           using namespace edm::waiting_task::chain;
1226                           chain::first([this, &es](auto nextTask) {
1227                             looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1228                           }) | then([this, &es](auto nextTask) mutable {
1229                             looper_->beginOfJob(es);
1230                             looperBeginJobRun_ = true;
1231                             looper_->doStartingNewLoop();
1232                           }) | runLast(WaitingTaskHolder(group, &waitTask));
1233                           waitTask.wait();
1234                         }
1235 
1236                         using namespace edm::waiting_task::chain;
1237                         chain::first([this, status](auto nextTask) mutable {
1238                           CMS_SA_ALLOW try {
1239                             if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1240                               readAndMergeRunEntriesAsync(status, nextTask);
1241                             } else {
1242                               setNeedToCallNext(true);
1243                             }
1244                           } catch (...) {
1245                             status->setStopBeforeProcessingRun(true);
1246                             nextTask.doneWaiting(std::current_exception());
1247                           }
1248                         }) | then([this, status, &es](auto nextTask) {
1249                           if (status->stopBeforeProcessingRun()) {
1250                             return;
1251                           }
1252                           RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1253                           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1254                           schedule_->processOneGlobalAsync<Traits>(nextTask, transitionInfo, serviceToken_);
1255                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1256                           if (status->stopBeforeProcessingRun()) {
1257                             return;
1258                           }
1259                           looper_->prefetchAsync(
1260                               nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1261                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1262                           if (status->stopBeforeProcessingRun()) {
1263                             return;
1264                           }
1265                           ServiceRegistry::Operate operateLooper(serviceToken_);
1266                           looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1267                         }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1268                           if (iException) {
1269                             WaitingTaskHolder copyHolder(holder);
1270                             copyHolder.doneWaiting(*iException);
1271                           } else {
1272                             status->globalBeginDidSucceed();
1273                           }
1274 
1275                           if (status->stopBeforeProcessingRun()) {
1276                             // We just quit now if there was a failure when merging runs
1277                             status->resetBeginResources();
1278                             queueWhichWaitsForIOVsToFinish_.resume();
1279                             status->resumeGlobalRunQueue();
1280                             return;
1281                           }
1282                           CMS_SA_ALLOW try {
1283                             // Under normal circumstances, this task runs after endRun has completed for all streams
1284                             // and global endLumi has completed for all lumis contained in this run
1285                             auto globalEndRunTask =
1286                                 edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1287                                   WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1288                                   status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1289                                   globalEndRunAsync(std::move(taskHolder), std::move(status));
1290                                 });
1291                             status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1292                           } catch (...) {
1293                             status->resetBeginResources();
1294                             queueWhichWaitsForIOVsToFinish_.resume();
1295                             status->resumeGlobalRunQueue();
1296                             holder.doneWaiting(std::current_exception());
1297                             return;
1298                           }
1299 
1300                           // After this point we are committed to end the run via endRunAsync
1301 
1302                           ServiceRegistry::Operate operate(serviceToken_);
1303 
1304                           // The only purpose of the pause is to cause stream begin run to execute before
1305                           // global begin lumi in the single threaded case (maintains consistency with
1306                           // the order that existed before concurrent runs were implemented).
1307                           PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1308 
1309                           CMS_SA_ALLOW try {
1310                             streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
1311                               for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1312                                 CMS_SA_ALLOW try {
1313                                   streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
1314                                     streamBeginRunAsync(i, std::move(status), std::move(holder));
1315                                   });
1316                                 } catch (...) {
1317                                   if (status->streamFinishedBeginRun()) {
1318                                     WaitingTaskHolder copyHolder(holder);
1319                                     copyHolder.doneWaiting(std::current_exception());
1320                                     status->resetBeginResources();
1321                                     queueWhichWaitsForIOVsToFinish_.resume();
1322                                     exceptionRunStatus_ = status;
1323                                   }
1324                                 }
1325                               }
1326                             });
1327                           } catch (...) {
1328                             WaitingTaskHolder copyHolder(holder);
1329                             copyHolder.doneWaiting(std::current_exception());
1330                             status->resetBeginResources();
1331                             queueWhichWaitsForIOVsToFinish_.resume();
1332                             exceptionRunStatus_ = status;
1333                           }
1334                           handleNextItemAfterMergingRunEntries(status, holder);
1335                         }) | runLast(postSourceTask);
1336                       } catch (...) {
1337                         status->resetBeginResources();
1338                         queueWhichWaitsForIOVsToFinish_.resume();
1339                         status->resumeGlobalRunQueue();
1340                         postSourceTask.doneWaiting(std::current_exception());
1341                       }
1342                     });  // task in sourceResourcesAcquirer
1343               } catch (...) {
1344                 status->resetBeginResources();
1345                 queueWhichWaitsForIOVsToFinish_.resume();
1346                 status->resumeGlobalRunQueue();
1347                 postRunQueueTask.doneWaiting(std::current_exception());
1348               }
1349             });  // task in runQueue
1350       } catch (...) {
1351         status->resetBeginResources();
1352         queueWhichWaitsForIOVsToFinish_.resume();
1353         nextTask.doneWaiting(std::current_exception());
1354       }
1355     }) | chain::runLast(std::move(iHolder));
1356   }
1357 
1358   void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1359                                            std::shared_ptr<RunProcessingStatus> status,
1360                                            WaitingTaskHolder iHolder) noexcept {
1361     // These shouldn't throw
1362     streamQueues_[iStream].pause();
1363     ++streamRunActive_;
1364     streamRunStatus_[iStream] = std::move(status);
1365 
1366     CMS_SA_ALLOW try {
1367       using namespace edm::waiting_task::chain;
1368       chain::first([this, iStream](auto nextTask) {
1369         RunProcessingStatus& rs = *streamRunStatus_[iStream];
1370         if (rs.didGlobalBeginSucceed()) {
1371           RunTransitionInfo transitionInfo(
1372               *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1373           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1374           schedule_->processOneStreamAsync<Traits>(std::move(nextTask), iStream, transitionInfo, serviceToken_);
1375         }
1376       }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1377         if (exceptionFromBeginStreamRun) {
1378           nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1379         }
1380         releaseBeginRunResources(iStream);
1381       }) | runLast(iHolder);
1382     } catch (...) {
1383       releaseBeginRunResources(iStream);
1384       iHolder.doneWaiting(std::current_exception());
1385     }
1386   }
1387 
1388   void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1389     auto& status = streamRunStatus_[iStream];
1390     if (status->streamFinishedBeginRun()) {
1391       status->resetBeginResources();
1392       queueWhichWaitsForIOVsToFinish_.resume();
1393     }
1394     streamQueues_[iStream].resume();
1395   }
1396 
1397   void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1398     RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1399     iRunStatus->setEndTime();
1400     IOVSyncValue ts(
1401         EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1402         runPrincipal.endTime());
1403     CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1404       WaitingTaskHolder copyHolder(iHolder);
1405       copyHolder.doneWaiting(std::current_exception());
1406     }
1407 
1408     chain::first([this, &iRunStatus, &ts](auto nextTask) {
1409       espController_->runOrQueueEventSetupForInstanceAsync(ts,
1410                                                            nextTask,
1411                                                            iRunStatus->endIOVWaitingTasksEndRun(),
1412                                                            iRunStatus->eventSetupImplsEndRun(),
1413                                                            queueWhichWaitsForIOVsToFinish_,
1414                                                            actReg_.get(),
1415                                                            serviceToken_);
1416     }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1417       if (iException) {
1418         iRunStatus->setEndingEventSetupSucceeded(false);
1419         handleEndRunExceptions(*iException, nextTask);
1420       }
1421       ServiceRegistry::Operate operate(serviceToken_);
1422       streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1423         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1424           CMS_SA_ALLOW try {
1425             streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1426               streamQueues_[i].pause();
1427               streamEndRunAsync(std::move(nextTask), i);
1428             });
1429           } catch (...) {
1430             WaitingTaskHolder copyHolder(nextTask);
1431             copyHolder.doneWaiting(std::current_exception());
1432           }
1433         }
1434       });
1435 
1436       if (lastTransitionType() == InputSource::ItemType::IsRun) {
1437         CMS_SA_ALLOW try {
1438           beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1439         } catch (...) {
1440           WaitingTaskHolder copyHolder(nextTask);
1441           copyHolder.doneWaiting(std::current_exception());
1442         }
1443       }
1444     }) | chain::runLast(std::move(iHolder));
1445   }
1446 
1447   void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1448     if (holder.taskHasFailed()) {
1449       setExceptionMessageRuns();
1450     } else {
1451       WaitingTaskHolder tmp(holder);
1452       tmp.doneWaiting(iException);
1453     }
1454   }
1455 
1456   void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1457     auto& runPrincipal = *(iRunStatus->runPrincipal());
1458     bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1459     bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1460     EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1461     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1462     bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1463 
1464     MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1465     using namespace edm::waiting_task::chain;
1466     chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1467                      auto nextTask) {
1468       if (endingEventSetupSucceeded) {
1469         RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1470         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1471         schedule_->processOneGlobalAsync<Traits>(
1472             std::move(nextTask), transitionInfo, serviceToken_, cleaningUpAfterException);
1473       }
1474     }) |
1475         ifThen(looper_ && endingEventSetupSucceeded,
1476                [this, &runPrincipal, &es](auto nextTask) {
1477                  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1478                }) |
1479         ifThen(looper_ && endingEventSetupSucceeded,
1480                [this, &runPrincipal, &es](auto nextTask) {
1481                  ServiceRegistry::Operate operate(serviceToken_);
1482                  looper_->doEndRun(runPrincipal, es, &processContext_);
1483                }) |
1484         ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1485                [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1486                  mergeableRunProductMetadata->preWriteRun();
1487                  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1488                }) |
1489         then([status = std::move(iRunStatus),
1490               this,
1491               didGlobalBeginSucceed,
1492               mergeableRunProductMetadata,
1493               endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1494           if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1495             mergeableRunProductMetadata->postWriteRun();
1496           }
1497           if (iException) {
1498             handleEndRunExceptions(*iException, nextTask);
1499           }
1500           ServiceRegistry::Operate operate(serviceToken_);
1501 
1502           std::exception_ptr ptr;
1503 
1504           // Try hard to clean up resources so the
1505           // process can terminate in a controlled
1506           // fashion even after exceptions have occurred.
1507           CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1508             if (not ptr) {
1509               ptr = std::current_exception();
1510             }
1511           }
1512           CMS_SA_ALLOW try {
1513             status->resumeGlobalRunQueue();
1514             queueWhichWaitsForIOVsToFinish_.resume();
1515           } catch (...) {
1516             if (not ptr) {
1517               ptr = std::current_exception();
1518             }
1519           }
1520           CMS_SA_ALLOW try {
1521             status->resetEndResources();
1522             status.reset();
1523           } catch (...) {
1524             if (not ptr) {
1525               ptr = std::current_exception();
1526             }
1527           }
1528 
1529           if (ptr && !iException) {
1530             handleEndRunExceptions(ptr, nextTask);
1531           }
1532         }) |
1533         runLast(std::move(iTask));
1534   }
1535 
1536   void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1537     CMS_SA_ALLOW try {
1538       if (!streamRunStatus_[iStreamIndex]) {
1539         if (exceptionRunStatus_->streamFinishedRun()) {
1540           exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1541           exceptionRunStatus_.reset();
1542         }
1543         return;
1544       }
1545 
1546       auto runDoneTask =
1547           edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1548             if (iException) {
1549               handleEndRunExceptions(*iException, iTask);
1550             }
1551 
1552             auto runStatus = streamRunStatus_[iStreamIndex];
1553 
1554             //reset status before releasing queue else get race condition
1555             if (runStatus->streamFinishedRun()) {
1556               runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1557             }
1558             streamRunStatus_[iStreamIndex].reset();
1559             --streamRunActive_;
1560             streamQueues_[iStreamIndex].resume();
1561           });
1562 
1563       WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1564 
1565       auto runStatus = streamRunStatus_[iStreamIndex].get();
1566 
1567       if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1568         EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1569         auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1570         bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1571 
1572         auto& runPrincipal = *runStatus->runPrincipal();
1573         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1574         RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1575         schedule_->processOneStreamAsync<Traits>(
1576             std::move(runDoneTaskHolder), iStreamIndex, transitionInfo, serviceToken_, cleaningUpAfterException);
1577       }
1578     } catch (...) {
1579       handleEndRunExceptions(std::current_exception(), iTask);
1580     }
1581   }
1582 
1583   void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1584     if (streamRunActive_ > 0) {
1585       FinalWaitingTask waitTask{taskGroup_};
1586 
1587       auto runStatus = streamRunStatus_[0].get();
1588       runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1589       WaitingTaskHolder holder{taskGroup_, &waitTask};
1590       runStatus->setHolderOfTaskInProcessRuns(holder);
1591       lastSourceTransition_ = InputSource::ItemType::IsStop;
1592       endRunAsync(streamRunStatus_[0], std::move(holder));
1593       waitTask.wait();
1594     }
1595   }
1596 
1597   void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1598                                       std::shared_ptr<RunProcessingStatus> iRunStatus,
1599                                       edm::WaitingTaskHolder iHolder) {
1600     actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1601 
1602     auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1603     chain::first([this, &iSync, &status](auto nextTask) {
1604       espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1605                                                            nextTask,
1606                                                            status->endIOVWaitingTasks(),
1607                                                            status->eventSetupImpls(),
1608                                                            queueWhichWaitsForIOVsToFinish_,
1609                                                            actReg_.get(),
1610                                                            serviceToken_);
1611     }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1612       CMS_SA_ALLOW try {
1613         //the call to doneWaiting will cause the count to decrement
1614         if (iException) {
1615           WaitingTaskHolder copyHolder(nextTask);
1616           copyHolder.doneWaiting(*iException);
1617         }
1618 
1619         lumiQueue_->pushAndPause(
1620             *nextTask.group(),
1621             [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1622               CMS_SA_ALLOW try {
1623                 if (postLumiQueueTask.taskHasFailed()) {
1624                   status->resetResources();
1625                   queueWhichWaitsForIOVsToFinish_.resume();
1626                   endRunAsync(iRunStatus, postLumiQueueTask);
1627                   return;
1628                 }
1629 
1630                 status->setResumer(std::move(iResumer));
1631 
1632                 sourceResourcesAcquirer_.serialQueueChain().push(
1633                     *postLumiQueueTask.group(),
1634                     [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1635                       CMS_SA_ALLOW try {
1636                         ServiceRegistry::Operate operate(serviceToken_);
1637 
1638                         if (postSourceTask.taskHasFailed()) {
1639                           status->resetResources();
1640                           queueWhichWaitsForIOVsToFinish_.resume();
1641                           endRunAsync(iRunStatus, postSourceTask);
1642                           return;
1643                         }
1644 
1645                         {
1646                           std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1647                           status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1648 
1649                           LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1650                           {
1651                             SendSourceTerminationSignalIfException sentry(actReg_.get());
1652                             input_->doBeginLumi(lumiPrincipal, &processContext_);
1653                             sentry.completedSuccessfully();
1654                           }
1655                         }
1656                         LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1657                         Service<RandomNumberGenerator> rng;
1658                         if (rng.isAvailable()) {
1659                           LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1660                           rng->preBeginLumi(lb);
1661                         }
1662 
1663                         EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1664 
1665                         using namespace edm::waiting_task::chain;
1666                         chain::first([this, status](auto nextTask) mutable {
1667                           if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1668                             readAndMergeLumiEntriesAsync(std::move(status), std::move(nextTask));
1669                           } else {
1670                             setNeedToCallNext(true);
1671                           }
1672                         }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1673                           LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1674                           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1675                           schedule_->processOneGlobalAsync<Traits>(nextTask, transitionInfo, serviceToken_);
1676                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1677                           looper_->prefetchAsync(
1678                               nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1679                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1680                           ServiceRegistry::Operate operateLooper(serviceToken_);
1681                           looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1682                         }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1683                           status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1684 
1685                           if (iException) {
1686                             WaitingTaskHolder copyHolder(holder);
1687                             copyHolder.doneWaiting(*iException);
1688                             globalEndLumiAsync(holder, status);
1689                             endRunAsync(iRunStatus, holder);
1690                           } else {
1691                             status->globalBeginDidSucceed();
1692 
1693                             EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1694                             using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1695 
1696                             streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1697                               for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1698                                 streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1699                                   if (!status->shouldStreamStartLumi()) {
1700                                     return;
1701                                   }
1702                                   streamQueues_[i].pause();
1703 
1704                                   auto& event = principalCache_.eventPrincipal(i);
1705                                   auto eventSetupImpls = &status->eventSetupImpls();
1706                                   auto lp = status->lumiPrincipal().get();
1707                                   streamLumiStatus_[i] = std::move(status);
1708                                   ++streamLumiActive_;
1709                                   event.setLuminosityBlockPrincipal(lp);
1710                                   LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1711                                   using namespace edm::waiting_task::chain;
1712                                   chain::first([this, i, &transitionInfo](auto nextTask) {
1713                                     schedule_->processOneStreamAsync<Traits>(
1714                                         std::move(nextTask), i, transitionInfo, serviceToken_);
1715                                   }) |
1716                                       then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1717                                                      auto nextTask) {
1718                                         if (exceptionFromBeginStreamLumi) {
1719                                           WaitingTaskHolder copyHolder(nextTask);
1720                                           copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1721                                         }
1722                                         handleNextEventForStreamAsync(std::move(nextTask), i);
1723                                       }) |
1724                                       runLast(std::move(holder));
1725                                 });
1726                               }  // end for loop over streams
1727                             });
1728                           }
1729                         }) | runLast(postSourceTask);
1730                       } catch (...) {
1731                         status->resetResources();
1732                         queueWhichWaitsForIOVsToFinish_.resume();
1733                         WaitingTaskHolder copyHolder(postSourceTask);
1734                         copyHolder.doneWaiting(std::current_exception());
1735                         endRunAsync(iRunStatus, postSourceTask);
1736                       }
1737                     });  // task in sourceResourcesAcquirer
1738               } catch (...) {
1739                 status->resetResources();
1740                 queueWhichWaitsForIOVsToFinish_.resume();
1741                 WaitingTaskHolder copyHolder(postLumiQueueTask);
1742                 copyHolder.doneWaiting(std::current_exception());
1743                 endRunAsync(iRunStatus, postLumiQueueTask);
1744               }
1745             });  // task in lumiQueue
1746       } catch (...) {
1747         status->resetResources();
1748         queueWhichWaitsForIOVsToFinish_.resume();
1749         WaitingTaskHolder copyHolder(nextTask);
1750         copyHolder.doneWaiting(std::current_exception());
1751         endRunAsync(iRunStatus, nextTask);
1752       }
1753     }) | chain::runLast(std::move(iHolder));
1754   }
1755 
1756   void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1757     chain::first([this](auto nextTask) {
1758       //all streams are sharing the same status at the moment
1759       auto status = streamLumiStatus_[0];  //read from streamLumiActive_ happened in calling routine
1760       status->setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kProcessing);
1761 
1762       while (lastTransitionType() == InputSource::ItemType::IsLumi and
1763              status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1764         readAndMergeLumi(*status);
1765         nextTransitionType();
1766       }
1767     }) | chain::then([this](auto nextTask) mutable {
1768       unsigned int streamIndex = 0;
1769       oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1770       for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1771         arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1772       }
1773       nextTask.group()->run(
1774           [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1775     }) | chain::runLast(std::move(iHolder));
1776   }
1777 
1778   void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1779     if (holder.taskHasFailed()) {
1780       setExceptionMessageLumis();
1781     } else {
1782       WaitingTaskHolder tmp(holder);
1783       tmp.doneWaiting(iException);
1784     }
1785   }
1786 
1787   void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1788                                           std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1789     // Get some needed info out of the status object before moving
1790     // it into finalTaskForThisLumi.
1791     auto& lp = *(iLumiStatus->lumiPrincipal());
1792     bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1793     bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1794     EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1795     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1796 
1797     using namespace edm::waiting_task::chain;
1798     chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1799       IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1800 
1801       LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1802       using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1803       schedule_->processOneGlobalAsync<Traits>(
1804           std::move(nextTask), transitionInfo, serviceToken_, cleaningUpAfterException);
1805     }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1806       //Only call writeLumi if beginLumi succeeded
1807       if (didGlobalBeginSucceed) {
1808         writeLumiAsync(std::move(nextTask), lumiPrincipal);
1809       }
1810     }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1811       looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1812     }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1813       //any thrown exception auto propagates to nextTask via the chain
1814       ServiceRegistry::Operate operate(serviceToken_);
1815       looper_->doEndLuminosityBlock(lp, es, &processContext_);
1816     }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1817       if (iException) {
1818         handleEndLumiExceptions(*iException, nextTask);
1819       }
1820       ServiceRegistry::Operate operate(serviceToken_);
1821 
1822       std::exception_ptr ptr;
1823 
1824       // Try hard to clean up resources so the
1825       // process can terminate in a controlled
1826       // fashion even after exceptions have occurred.
1827       // Caught exception is passed to handleEndLumiExceptions()
1828       CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1829         if (not ptr) {
1830           ptr = std::current_exception();
1831         }
1832       }
1833       // Caught exception is passed to handleEndLumiExceptions()
1834       CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1835         if (not ptr) {
1836           ptr = std::current_exception();
1837         }
1838       }
1839       // Caught exception is passed to handleEndLumiExceptions()
1840       CMS_SA_ALLOW try {
1841         status->resetResources();
1842         status->globalEndRunHolderDoneWaiting();
1843         status.reset();
1844       } catch (...) {
1845         if (not ptr) {
1846           ptr = std::current_exception();
1847         }
1848       }
1849 
1850       if (ptr && !iException) {
1851         handleEndLumiExceptions(ptr, nextTask);
1852       }
1853     }) | runLast(std::move(iTask));
1854   }
1855 
1856   void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1857     auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1858       auto status = streamLumiStatus_[iStreamIndex];
1859       if (iException) {
1860         handleEndLumiExceptions(*iException, iTask);
1861       }
1862 
1863       // reset status before releasing queue else get race condition
1864       streamLumiStatus_[iStreamIndex].reset();
1865       --streamLumiActive_;
1866       streamQueues_[iStreamIndex].resume();
1867 
1868       //are we the last one?
1869       if (status->streamFinishedLumi()) {
1870         globalEndLumiAsync(iTask, std::move(status));
1871       }
1872     });
1873 
1874     edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1875 
1876     // Need to be sure the lumi status is released before lumiDoneTask can every be called.
1877     // therefore we do not want to hold the shared_ptr
1878     auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1879     lumiStatus->setEndTime();
1880 
1881     EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1882     auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1883     bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1884 
1885     auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1886     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1887     LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1888     schedule_->processOneStreamAsync<Traits>(
1889         std::move(lumiDoneTask), iStreamIndex, transitionInfo, serviceToken_, cleaningUpAfterException);
1890   }
1891 
1892   void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1893     if (streamRunActive_ == 0) {
1894       assert(streamLumiActive_ == 0);
1895     } else {
1896       assert(streamRunActive_ == preallocations_.numberOfStreams());
1897       if (streamLumiActive_ > 0) {
1898         FinalWaitingTask globalWaitTask{taskGroup_};
1899         assert(streamLumiActive_ == preallocations_.numberOfStreams());
1900         streamLumiStatus_[0]->noMoreEventsInLumi();
1901         streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1902         {
1903           WaitingTaskHolder holder{taskGroup_, &globalWaitTask};
1904           for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1905             streamEndLumiAsync(holder, i);
1906           }
1907         }
1908         globalWaitTask.wait();
1909       }
1910     }
1911   }
1912 
1913   void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1914     SendSourceTerminationSignalIfException sentry(actReg_.get());
1915     input_->readProcessBlock(processBlockPrincipal);
1916     sentry.completedSuccessfully();
1917   }
1918 
1919   std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1920     auto rp = principalCache_.getAvailableRunPrincipalPtr();
1921     //a new file may have been opened since the last use of this Run
1922     rp->possiblyUpdateAfterAddition(preg());
1923     assert(rp);
1924     rp->setAux(*input_->runAuxiliary());
1925     {
1926       SendSourceTerminationSignalIfException sentry(actReg_.get());
1927       input_->readRun(*rp, *historyAppender_);
1928       sentry.completedSuccessfully();
1929     }
1930     assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1931     return rp;
1932   }
1933 
1934   void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
1935     RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1936     //If a file open happened and we are continuing the Run we may need
1937     // to do the update
1938     runPrincipal.possiblyUpdateAfterAddition(preg());
1939 
1940     runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1941     {
1942       SendSourceTerminationSignalIfException sentry(actReg_.get());
1943       input_->readAndMergeRun(runPrincipal);
1944       sentry.completedSuccessfully();
1945     }
1946   }
1947 
1948   std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
1949     auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
1950     //A new file may have been opened since the last use of the LuminosityBlock
1951     lbp->possiblyUpdateAfterAddition(preg());
1952     assert(lbp);
1953     lbp->setAux(*input_->luminosityBlockAuxiliary());
1954     {
1955       SendSourceTerminationSignalIfException sentry(actReg_.get());
1956       input_->readLuminosityBlock(*lbp, *historyAppender_);
1957       sentry.completedSuccessfully();
1958     }
1959     lbp->setRunPrincipal(std::move(rp));
1960     return lbp;
1961   }
1962 
1963   void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
1964     auto& lumiPrincipal = *iStatus.lumiPrincipal();
1965     assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1966            input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1967                input_->processHistoryRegistry().reducedProcessHistoryID(
1968                    input_->luminosityBlockAuxiliary()->processHistoryID()));
1969     //If a file was opened and the LuminosityBlock is continuing
1970     // we may need to do the update
1971     lumiPrincipal.possiblyUpdateAfterAddition(preg());
1972     lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1973     {
1974       SendSourceTerminationSignalIfException sentry(actReg_.get());
1975       input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1976       sentry.completedSuccessfully();
1977     }
1978   }
1979 
1980   void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
1981     ServiceRegistry::Operate op(serviceToken_);
1982     // Don't move task because the lifetime of the task should be greater than the lifetime of the Operate object
1983     schedule_->writeProcessBlockAsync(
1984         task, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1985   }
1986 
1987   void EventProcessor::writeRunAsync(WaitingTaskHolder task,
1988                                      RunPrincipal const& runPrincipal,
1989                                      MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1990     if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
1991       ServiceRegistry::Operate op(serviceToken_);
1992       // Don't move task because the lifetime of the task should be greater than the lifetime of the Operate object
1993       schedule_->writeRunAsync(task, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
1994     }
1995   }
1996 
1997   void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
1998     iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
1999     iStatus.runPrincipal()->clearPrincipal();
2000   }
2001 
2002   void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
2003     using namespace edm::waiting_task;
2004     if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2005       chain::first([&](auto nextTask) {
2006         ServiceRegistry::Operate op(serviceToken_);
2007 
2008         lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2009         schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2010       }) | chain::lastTask(std::move(task));
2011     }
2012   }
2013 
2014   void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
2015     iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2016     iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2017     iStatus.lumiPrincipal()->clearPrincipal();
2018   }
2019 
2020   void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2021                                                    WaitingTaskHolder iHolder) {
2022     auto group = iHolder.group();
2023     sourceResourcesAcquirer_.serialQueueChain().push(
2024         *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2025           CMS_SA_ALLOW try {
2026             ServiceRegistry::Operate operate(serviceToken_);
2027 
2028             std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2029 
2030             nextTransitionType();
2031             setNeedToCallNext(false);
2032 
2033             while (lastTransitionType() == InputSource::ItemType::IsRun and
2034                    status->runPrincipal()->run() == input_->run() and
2035                    status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2036               if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2037                 status->setStopBeforeProcessingRun(true);
2038                 return;
2039               }
2040               readAndMergeRun(*status);
2041               if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2042                 setNeedToCallNext(true);
2043                 return;
2044               }
2045               nextTransitionType();
2046             }
2047           } catch (...) {
2048             status->setStopBeforeProcessingRun(true);
2049             holder.doneWaiting(std::current_exception());
2050           }
2051         });
2052   }
2053 
2054   void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2055                                                     WaitingTaskHolder iHolder) {
2056     auto group = iHolder.group();
2057     sourceResourcesAcquirer_.serialQueueChain().push(
2058         *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2059           CMS_SA_ALLOW try {
2060             ServiceRegistry::Operate operate(serviceToken_);
2061 
2062             std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2063 
2064             nextTransitionType();
2065             setNeedToCallNext(false);
2066 
2067             while (lastTransitionType() == InputSource::ItemType::IsLumi and
2068                    iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2069               readAndMergeLumi(*iLumiStatus);
2070               if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2071                 setNeedToCallNext(true);
2072                 return;
2073               }
2074               nextTransitionType();
2075             }
2076           } catch (...) {
2077             holder.doneWaiting(std::current_exception());
2078           }
2079         });
2080   }
2081 
2082   void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2083                                                             WaitingTaskHolder iHolder) {
2084     chain::first([this, iRunStatus](auto nextTask) mutable {
2085       if (needToCallNext()) {
2086         nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2087       }
2088     }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2089       ServiceRegistry::Operate operate(serviceToken_);
2090       if (iException) {
2091         WaitingTaskHolder copyHolder(nextTask);
2092         copyHolder.doneWaiting(*iException);
2093       }
2094       if (lastTransitionType() == InputSource::ItemType::IsFile) {
2095         iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2096         return;
2097       }
2098       if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2099         CMS_SA_ALLOW try {
2100           beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2101                                       input_->luminosityBlockAuxiliary()->beginTime()),
2102                          iRunStatus,
2103                          nextTask);
2104           return;
2105         } catch (...) {
2106           WaitingTaskHolder copyHolder(nextTask);
2107           copyHolder.doneWaiting(std::current_exception());
2108         }
2109       }
2110       // Note that endRunAsync will call beginRunAsync for the following run
2111       // if appropriate.
2112       endRunAsync(iRunStatus, std::move(nextTask));
2113     }) | chain::runLast(std::move(iHolder));
2114   }
2115 
2116   bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2117                                               unsigned int iStreamIndex,
2118                                               LuminosityBlockProcessingStatus& iStatus) {
2119     // This function returns true if it successfully reads an event for the stream and that
2120     // requires both that an event is next and there are no problems or requests to stop.
2121 
2122     if (iTask.taskHasFailed()) {
2123       // We want all streams to stop or all streams to pause. If we are already in the
2124       // middle of pausing streams, then finish pausing all of them and the lumi will be
2125       // ended later. Otherwise, just end it now.
2126       if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2127         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2128       }
2129       return false;
2130     }
2131 
2132     // Did another stream already stop or pause this lumi?
2133     if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2134       return false;
2135     }
2136 
2137     // Are output modules or the looper requesting we stop?
2138     if (shouldWeStop()) {
2139       lastSourceTransition_ = InputSource::ItemType::IsStop;
2140       iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2141       return false;
2142     }
2143 
2144     ServiceRegistry::Operate operate(serviceToken_);
2145 
2146     // need to use lock in addition to the serial task queue because
2147     // of delayed provenance reading and reading data in response to
2148     // edm::Refs etc
2149     std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2150 
2151     // If we didn't already call nextTransitionType while merging lumis, call it here.
2152     // This asks the input source what is next and also checks for signals.
2153 
2154     InputSource::ItemType itemType = needToCallNext() ? nextTransitionType() : lastTransitionType();
2155     setNeedToCallNext(true);
2156 
2157     if (InputSource::ItemType::IsEvent != itemType) {
2158       // IsFile may continue processing the lumi and
2159       // looper_ can cause the input source to declare a new IsRun which is actually
2160       // just a continuation of the previous run
2161       if (InputSource::ItemType::IsStop == itemType or InputSource::ItemType::IsLumi == itemType or
2162           (InputSource::ItemType::IsRun == itemType and
2163            (iStatus.lumiPrincipal()->run() != input_->run() or
2164             iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2165         if (itemType == InputSource::ItemType::IsLumi &&
2166             iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2167           throw Exception(errors::LogicError)
2168               << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2169               << "but the next lumi entry has the same lumi number.\n"
2170               << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2171         }
2172         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2173       } else {
2174         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2175       }
2176       return false;
2177     }
2178     readEvent(iStreamIndex);
2179     return true;
2180   }
2181 
2182   void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2183     if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2184       streamEndLumiAsync(iTask, iStreamIndex);
2185       return;
2186     }
2187     auto group = iTask.group();
2188     sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2189       CMS_SA_ALLOW try {
2190         auto status = streamLumiStatus_[iStreamIndex].get();
2191         ServiceRegistry::Operate operate(serviceToken_);
2192 
2193         if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2194           auto recursionTask =
2195               make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2196                 if (iEventException) {
2197                   WaitingTaskHolder copyHolder(iTask);
2198                   copyHolder.doneWaiting(*iEventException);
2199                   // Intentionally, we don't return here. The recursive call to
2200                   // handleNextEvent takes care of immediately ending the run properly
2201                   // using the same code it uses to end the run in other situations.
2202                 }
2203                 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2204               });
2205 
2206           processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2207         } else {
2208           // the stream will stop processing this lumi now
2209           if (status->eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi) {
2210             if (not status->haveStartedNextLumiOrEndedRun()) {
2211               status->noMoreEventsInLumi();
2212               status->startNextLumiOrEndRun();
2213               if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
2214                 CMS_SA_ALLOW try {
2215                   beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2216                                               input_->luminosityBlockAuxiliary()->beginTime()),
2217                                  streamRunStatus_[iStreamIndex],
2218                                  iTask);
2219                 } catch (...) {
2220                   WaitingTaskHolder copyHolder(iTask);
2221                   copyHolder.doneWaiting(std::current_exception());
2222                   endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2223                 }
2224               } else {
2225                 // If appropriate, this will also start the next run.
2226                 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2227               }
2228             }
2229             streamEndLumiAsync(iTask, iStreamIndex);
2230           } else {
2231             assert(status->eventProcessingState() ==
2232                    LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2233             auto runStatus = streamRunStatus_[iStreamIndex].get();
2234 
2235             if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2236               runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2237             }
2238           }
2239         }
2240       } catch (...) {
2241         WaitingTaskHolder copyHolder(iTask);
2242         copyHolder.doneWaiting(std::current_exception());
2243         handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2244       }
2245     });
2246   }
2247 
2248   void EventProcessor::readEvent(unsigned int iStreamIndex) {
2249     //TODO this will have to become per stream
2250     auto& event = principalCache_.eventPrincipal(iStreamIndex);
2251     StreamContext streamContext(event.streamID(), &processContext_);
2252     // a new file may have been read since the last time this event was used
2253     event.possiblyUpdateAfterAddition(preg());
2254 
2255     SendSourceTerminationSignalIfException sentry(actReg_.get());
2256     input_->readEvent(event, streamContext);
2257 
2258     streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2259     streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2260     sentry.completedSuccessfully();
2261 
2262     FDEBUG(1) << "\treadEvent\n";
2263   }
2264 
2265   void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2266     iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2267   }
2268 
2269   namespace {
2270     struct ClearEventGuard {
2271       ClearEventGuard(edm::ActivityRegistry& iReg, edm::StreamContext const& iContext)
2272           : act_(iReg), context_(iContext) {
2273         iReg.preClearEventSignal_.emit(iContext);
2274       }
2275       ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2276       edm::ActivityRegistry& act_;
2277       edm::StreamContext const& context_;
2278     };
2279   }  // namespace
2280 
2281   void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2282     auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2283 
2284     ServiceRegistry::Operate operate(serviceToken_);
2285     Service<RandomNumberGenerator> rng;
2286     if (rng.isAvailable()) {
2287       Event ev(*pep, ModuleDescription(), nullptr);
2288       rng->postEventRead(ev);
2289     }
2290 
2291     EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2292     using namespace edm::waiting_task::chain;
2293     chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2294       EventTransitionInfo info(*pep, es);
2295       schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2296     }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2297       //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2298       ServiceRegistry::Operate operateLooper(serviceToken_);
2299       processEventWithLooper(*pep, iStreamIndex);
2300     }) | then([this, pep](auto nextTask) {
2301       FDEBUG(1) << "\tprocessEvent\n";
2302       StreamContext streamContext(pep->streamID(),
2303                                   StreamContext::Transition::kEvent,
2304                                   pep->id(),
2305                                   pep->runPrincipal().index(),
2306                                   pep->luminosityBlockPrincipal().index(),
2307                                   pep->time(),
2308                                   &processContext_);
2309       ClearEventGuard guard(*this->actReg_.get(), streamContext);
2310       pep->clearEventPrincipal();
2311     }) | runLast(iHolder);
2312   }
2313 
2314   void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2315     bool randomAccess = input_->randomAccess();
2316     ProcessingController::ForwardState forwardState = input_->forwardState();
2317     ProcessingController::ReverseState reverseState = input_->reverseState();
2318     ProcessingController pc(forwardState, reverseState, randomAccess);
2319 
2320     EDLooperBase::Status status = EDLooperBase::kContinue;
2321     do {
2322       StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2323       EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2324       status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2325 
2326       bool succeeded = true;
2327       if (randomAccess) {
2328         if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2329           input_->skipEvents(-2);
2330         } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2331           succeeded = input_->goToEvent(pc.specifiedEventTransition());
2332         }
2333       }
2334       pc.setLastOperationSucceeded(succeeded);
2335     } while (!pc.lastOperationSucceeded());
2336     if (status != EDLooperBase::kContinue) {
2337       shouldWeStop_ = true;
2338     }
2339   }
2340 
2341   bool EventProcessor::shouldWeStop() const {
2342     FDEBUG(1) << "\tshouldWeStop\n";
2343     if (shouldWeStop_)
2344       return true;
2345     return schedule_->terminate();
2346   }
2347 
2348   void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2349 
2350   void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2351 
2352   void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2353 
2354   bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2355     bool expected = false;
2356     if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2357       deferredExceptionPtr_ = iException;
2358       return true;
2359     }
2360     return false;
2361   }
2362 
2363   void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2364     cms::Exception ex("ModulesSynchingOnLumis");
2365     ex << "The framework is configured to use at least two streams, but the following modules\n"
2366        << "require synchronizing on LuminosityBlock boundaries:";
2367     bool found = false;
2368     for (auto worker : schedule_->allWorkers()) {
2369       if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2370         found = true;
2371         ex << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2372       }
2373     }
2374     if (found) {
2375       ex << "\n\nThe situation can be fixed by either\n"
2376          << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2377          << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2378       throw ex;
2379     }
2380   }
2381 
2382   void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
2383     std::unique_ptr<LogSystem> s;
2384     for (auto worker : schedule_->allWorkers()) {
2385       if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2386         if (not s) {
2387           s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2388           (*s) << "The following modules require synchronizing on Run boundaries:";
2389         }
2390         (*s) << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2391       }
2392     }
2393   }
2394 }  // namespace edm