Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-04 04:34:54

0001 #include "FWCore/Framework/interface/EventProcessor.h"
0002 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0003 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0004 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0005 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0008 
0009 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0010 #include "FWCore/Framework/src/CommonParams.h"
0011 #include "FWCore/Framework/interface/EDLooperBase.h"
0012 #include "FWCore/Framework/interface/EventPrincipal.h"
0013 #include "FWCore/Framework/interface/EventSetupProvider.h"
0014 #include "FWCore/Framework/interface/EventSetupRecord.h"
0015 #include "FWCore/Framework/interface/FileBlock.h"
0016 #include "FWCore/Framework/interface/HistoryAppender.h"
0017 #include "FWCore/Framework/interface/InputSourceDescription.h"
0018 #include "FWCore/Framework/interface/IOVSyncValue.h"
0019 #include "FWCore/Framework/interface/LooperFactory.h"
0020 #include "FWCore/Framework/interface/LuminosityBlock.h"
0021 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0022 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0023 #include "FWCore/Framework/interface/ModuleChanger.h"
0024 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0025 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ProcessingController.h"
0028 #include "FWCore/Framework/interface/RunPrincipal.h"
0029 #include "FWCore/Framework/interface/Schedule.h"
0030 #include "FWCore/Framework/interface/ScheduleInfo.h"
0031 #include "FWCore/Framework/interface/ScheduleItems.h"
0032 #include "FWCore/Framework/interface/SubProcess.h"
0033 #include "FWCore/Framework/interface/Event.h"
0034 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0035 #include "FWCore/Framework/src/Breakpoints.h"
0036 #include "FWCore/Framework/interface/EventSetupsController.h"
0037 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0038 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0039 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0040 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0041 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0042 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0043 #include "FWCore/Framework/interface/TriggerNamesService.h"
0044 #include "FWCore/Framework/src/SendSourceTerminationSignalIfException.h"
0045 
0046 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0047 
0048 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0049 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0050 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0051 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0052 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0053 #include "FWCore/ParameterSet/interface/Registry.h"
0054 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0055 
0056 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0057 #include "FWCore/ServiceRegistry/interface/Service.h"
0058 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0059 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0060 
0061 #include "FWCore/Concurrency/interface/WaitingTask.h"
0062 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0063 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0064 #include "FWCore/Concurrency/interface/chain_first.h"
0065 
0066 #include "FWCore/Utilities/interface/Algorithms.h"
0067 #include "FWCore/Utilities/interface/DebugMacros.h"
0068 #include "FWCore/Utilities/interface/EDMException.h"
0069 #include "FWCore/Utilities/interface/Exception.h"
0070 #include "FWCore/Utilities/interface/ConvertException.h"
0071 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0072 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0073 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0074 #include "FWCore/Utilities/interface/StreamID.h"
0075 #include "FWCore/Utilities/interface/RootHandlers.h"
0076 #include "FWCore/Utilities/interface/propagate_const.h"
0077 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0078 
0079 #include "MessageForSource.h"
0080 #include "MessageForParent.h"
0081 #include "LuminosityBlockProcessingStatus.h"
0082 #include "RunProcessingStatus.h"
0083 
0084 #include "boost/range/adaptor/reversed.hpp"
0085 
0086 #include <cassert>
0087 #include <exception>
0088 #include <iomanip>
0089 #include <iostream>
0090 #include <utility>
0091 #include <sstream>
0092 
0093 #include <sys/ipc.h>
0094 #include <sys/msg.h>
0095 
0096 #include "oneapi/tbb/task.h"
0097 
0098 //Used for CPU affinity
0099 #ifndef __APPLE__
0100 #include <sched.h>
0101 #endif
0102 
0103 namespace {
0104   class PauseQueueSentry {
0105   public:
0106     PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
0107     ~PauseQueueSentry() { queue_.resume(); }
0108 
0109   private:
0110     edm::SerialTaskQueue& queue_;
0111   };
0112 }  // namespace
0113 
0114 namespace edm {
0115 
0116   namespace chain = waiting_task::chain;
0117 
0118   // ---------------------------------------------------------------
0119   std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0120                                          ParameterSet& params,
0121                                          CommonParams const& common,
0122                                          std::shared_ptr<ProductRegistry> preg,
0123                                          std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0124                                          std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0125                                          std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0126                                          std::shared_ptr<ActivityRegistry> areg,
0127                                          std::shared_ptr<ProcessConfiguration const> processConfiguration,
0128                                          PreallocationConfiguration const& allocations) {
0129     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0130     if (main_input == nullptr) {
0131       throw Exception(errors::Configuration)
0132           << "There must be exactly one source in the configuration.\n"
0133           << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0134     }
0135 
0136     std::string modtype(main_input->getParameter<std::string>("@module_type"));
0137 
0138     std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0139         ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0140     ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0141     filler->fill(descriptions);
0142 
0143     try {
0144       convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
0145     } catch (cms::Exception& iException) {
0146       std::ostringstream ost;
0147       ost << "Validating configuration of input source of type " << modtype;
0148       iException.addContext(ost.str());
0149       throw;
0150     }
0151 
0152     main_input->registerIt();
0153 
0154     // Fill in "ModuleDescription", in case the input source produces
0155     // any EDProducts, which would be registered in the ProductRegistry.
0156     // Also fill in the process history item for this process.
0157     // There is no module label for the unnamed input source, so
0158     // just use "source".
0159     // Only the tracked parameters belong in the process configuration.
0160     ModuleDescription md(main_input->id(),
0161                          main_input->getParameter<std::string>("@module_type"),
0162                          "source",
0163                          processConfiguration.get(),
0164                          moduleIndex);
0165 
0166     InputSourceDescription isdesc(md,
0167                                   preg,
0168                                   branchIDListHelper,
0169                                   processBlockHelper,
0170                                   thinnedAssociationsHelper,
0171                                   areg,
0172                                   common.maxEventsInput_,
0173                                   common.maxLumisInput_,
0174                                   common.maxSecondsUntilRampdown_,
0175                                   allocations);
0176 
0177     areg->preSourceConstructionSignal_(md);
0178     std::unique_ptr<InputSource> input;
0179     try {
0180       //even if we have an exception, send the signal
0181       std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
0182       convertException::wrap([&]() {
0183         input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0184         input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
0185         input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
0186       });
0187     } catch (cms::Exception& iException) {
0188       std::ostringstream ost;
0189       ost << "Constructing input source of type " << modtype;
0190       iException.addContext(ost.str());
0191       throw;
0192     }
0193     return input;
0194   }
0195 
0196   // ---------------------------------------------------------------
0197   std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
0198                                            eventsetup::EventSetupProvider& cp,
0199                                            ParameterSet& params,
0200                                            std::vector<std::string> const& loopers) {
0201     std::shared_ptr<EDLooperBase> vLooper;
0202 
0203     assert(1 == loopers.size());
0204 
0205     for (auto const& looperName : loopers) {
0206       ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
0207       // Unlikely we would ever need the ModuleTypeResolver in Looper
0208       vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
0209     }
0210     return vLooper;
0211   }
0212 
0213   // ---------------------------------------------------------------
0214   EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,  //std::string const& config,
0215                                  ServiceToken const& iToken,
0216                                  serviceregistry::ServiceLegacy iLegacy,
0217                                  std::vector<std::string> const& defaultServices,
0218                                  std::vector<std::string> const& forcedServices)
0219       : actReg_(),
0220         preg_(),
0221         branchIDListHelper_(),
0222         serviceToken_(),
0223         input_(),
0224         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0225         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0226         esp_(),
0227         act_table_(),
0228         processConfiguration_(),
0229         schedule_(),
0230         subProcesses_(),
0231         historyAppender_(new HistoryAppender),
0232         fb_(),
0233         looper_(),
0234         deferredExceptionPtrIsSet_(false),
0235         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0236         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0237         principalCache_(),
0238         beginJobCalled_(false),
0239         shouldWeStop_(false),
0240         fileModeNoMerge_(false),
0241         exceptionMessageFiles_(),
0242         exceptionMessageRuns_(false),
0243         exceptionMessageLumis_(false),
0244         forceLooperToEnd_(false),
0245         looperBeginJobRun_(false),
0246         forceESCacheClearOnNewRun_(false),
0247         eventSetupDataToExcludeFromPrefetching_() {
0248     auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0249     processDesc->addServices(defaultServices, forcedServices);
0250     init(processDesc, iToken, iLegacy);
0251   }
0252 
0253   EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,  //std::string const& config,
0254                                  std::vector<std::string> const& defaultServices,
0255                                  std::vector<std::string> const& forcedServices)
0256       : actReg_(),
0257         preg_(),
0258         branchIDListHelper_(),
0259         serviceToken_(),
0260         input_(),
0261         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0262         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0263         esp_(),
0264         act_table_(),
0265         processConfiguration_(),
0266         schedule_(),
0267         subProcesses_(),
0268         historyAppender_(new HistoryAppender),
0269         fb_(),
0270         looper_(),
0271         deferredExceptionPtrIsSet_(false),
0272         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0273         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0274         principalCache_(),
0275         beginJobCalled_(false),
0276         shouldWeStop_(false),
0277         fileModeNoMerge_(false),
0278         exceptionMessageFiles_(),
0279         exceptionMessageRuns_(false),
0280         exceptionMessageLumis_(false),
0281         forceLooperToEnd_(false),
0282         looperBeginJobRun_(false),
0283         forceESCacheClearOnNewRun_(false),
0284         eventSetupDataToExcludeFromPrefetching_() {
0285     auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0286     processDesc->addServices(defaultServices, forcedServices);
0287     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
0288   }
0289 
0290   EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0291                                  ServiceToken const& token,
0292                                  serviceregistry::ServiceLegacy legacy)
0293       : actReg_(),
0294         preg_(),
0295         branchIDListHelper_(),
0296         serviceToken_(),
0297         input_(),
0298         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
0299         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0300         esp_(),
0301         act_table_(),
0302         processConfiguration_(),
0303         schedule_(),
0304         subProcesses_(),
0305         historyAppender_(new HistoryAppender),
0306         fb_(),
0307         looper_(),
0308         deferredExceptionPtrIsSet_(false),
0309         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0310         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0311         principalCache_(),
0312         beginJobCalled_(false),
0313         shouldWeStop_(false),
0314         fileModeNoMerge_(false),
0315         exceptionMessageFiles_(),
0316         exceptionMessageRuns_(false),
0317         exceptionMessageLumis_(false),
0318         forceLooperToEnd_(false),
0319         looperBeginJobRun_(false),
0320         forceESCacheClearOnNewRun_(false),
0321         eventSetupDataToExcludeFromPrefetching_() {
0322     init(processDesc, token, legacy);
0323   }
0324 
// Shared initialization called by every EventProcessor constructor in this file.
//
// In order, it: registers the empty Parentage and ParameterSet; pops the SubProcess
// parameter sets out of the process parameter set; validates the top-level parameter
// sets and reads the 'options' (file mode, threads, streams, concurrent lumis/runs,
// early-delete settings); initializes the services; then, with the services made
// available, creates the EventSetup provider, the optional looper, the task queues,
// the input source and the modules (in two concurrent tasks), the schedule, the
// principals and finally the SubProcesses.
//
// processDesc: supplies the process parameter set and the service parameter sets.
// iToken, iLegacy: forwarded to ScheduleItems::initServices.
//
// Throws a Configuration exception for an illegal 'fileMode'. Any exception thrown
// after the services have been made available is rethrown after the members listed
// in the catch block have been released.
0325   void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
0326                             ServiceToken const& iToken,
0327                             serviceregistry::ServiceLegacy iLegacy) {
0328     //std::cerr << processDesc->dump() << std::endl;
0329 
0330     // register the empty parentage vector , once and for all
0331     ParentageRegistry::instance()->insertMapped(Parentage());
0332 
0333     // register the empty parameter set, once and for all.
0334     ParameterSet().registerIt();
0335 
0336     std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
0337 
0338     // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
0339     auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
0340     bool const hasSubProcesses = !subProcessVParameterSet.empty();
0341 
0342     // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
0343     // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
0344     // set in here if the parameters were not explicitly set.
0345     validateTopLevelParameterSets(parameterSet.get());
0346 
0347     // Now set some parameters specific to the main process.
0348     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
0349     auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
0350     if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
0351       throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
0352           << fileMode << ".\n"
0353           << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
0354     } else {
0355       fileModeNoMerge_ = (fileMode == "NOMERGE");
0356     }
0357     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
0358     ensureAvailableAccelerators(*parameterSet);
0359 
0360     //threading
0361     unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
0362 
0363     // Even if numberOfThreads was set to zero in the Python configuration, the code
0364     // in cmsRun.cpp should have reset it to something else.
0365     assert(nThreads != 0);
0366 
0367     unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
0368     if (nStreams == 0) {
0369       nStreams = nThreads;  // 0 streams means: one stream per thread
0370     }
0371     unsigned int nConcurrentLumis =
0372         optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
0373     if (nConcurrentLumis == 0) {
0374       nConcurrentLumis = 2;  // 0 selects the default of 2
0375     }
0376     if (nConcurrentLumis > nStreams) {
0377       nConcurrentLumis = nStreams;  // never more concurrent lumis than streams
0378     }
0379     unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
         // 0, or a value above nConcurrentLumis, falls back to nConcurrentLumis
0380     if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
0381       nConcurrentRuns = nConcurrentLumis;
0382     }
0383     std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
0384     if (!loopers.empty()) {
0385       //For now loopers make us run only 1 transition at a time
0386       if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
0387         edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
0388                                                 "of concurrent runs, and the number of concurrent lumis "
0389                                                 "are all being reset to 1. Loopers cannot currently support "
0390                                                 "values greater than 1.";
0391         nStreams = 1;
0392         nConcurrentLumis = 1;
0393         nConcurrentRuns = 1;
0394       }
0395     }
0396     bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
0397     if (dumpOptions) {
0398       dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
0399     } else {
0400       if (nThreads > 1 or nStreams > 1) {
0401         edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
0402       }
0403     }
0404 
0405     // The number of concurrent IOVs is configured individually for each record in
0406     // the class NumberOfConcurrentIOVs to values less than or equal to this.
0407     // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
0408     // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
0409     // concurrent run past the first in use cases where IOVs change within a run.
0410     unsigned int maxConcurrentIOVs =
0411         3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
0412 
0413     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
0414 
0415     printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
0416     deleteNonConsumedUnscheduledModules_ =
0417         optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
0418     //for now, if have a subProcess, don't allow early delete
0419     // In the future we should use the SubProcess's 'keep list' to decide what can be kept
0420     if (not hasSubProcesses) {
0421       branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
0422     }
         // The remaining early-delete options are only read when there is something to delete early.
0423     if (not branchesToDeleteEarly_.empty()) {
0424       auto referencePSets =
0425           optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
0426       for (auto const& pset : referencePSets) {
0427         auto product = pset.getParameter<std::string>("product");
0428         auto references = pset.getParameter<std::vector<std::string>>("references");
0429         for (auto const& ref : references) {
0430           referencesToBranches_.emplace(product, ref);
0431         }
0432       }
0433       modulesToIgnoreForDeleteEarly_ =
0434           optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
0435     }
0436 
0437     // Now do general initialization
0438     ScheduleItems items;
0439 
0440     //initialize the services
0441     auto& serviceSets = processDesc->getServicesPSets();
0442     ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
0443     serviceToken_ = items.addCPRandTNS(*parameterSet, token);
0444 
0445     //make the services available
0446     ServiceRegistry::Operate operate(serviceToken_);
0447 
0448     CMS_SA_ALLOW try {
0449       if (nThreads > 1) {
0450         edm::Service<RootHandlers> handler;
0451         handler->willBeUsingThreads();
0452       }
0453 
0454       // initialize miscellaneous items
0455       std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
0456 
0457       // initialize the event setup provider
0458       ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
0459       esp_ = espController_->makeProvider(
0460           *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
0461 
0462       // initialize the looper, if any
0463       if (!loopers.empty()) {
0464         looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
0465         looper_->setActionTable(items.act_table_.get());
0466         looper_->attachTo(*items.actReg_);
0467 
0468         // in presence of looper do not delete modules
0469         deleteNonConsumedUnscheduledModules_ = false;
0470       }
0471 
0472       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0473 
0474       runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
0475       lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
0476       streamQueues_.resize(nStreams);
0477       streamRunStatus_.resize(nStreams);
0478       streamLumiStatus_.resize(nStreams);
0479 
0480       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0481 
0482       {
             // Assigned by the module-construction task below and read only after group.wait().
0483         std::optional<ScheduleItems::MadeModules> madeModules;
0484 
0485         //setup input and modules concurrently
0486         tbb::task_group group;
0487 
0488         // initialize the input source
             // The source is given this temporary registry rather than items.preg(); its
             // contents are added to items.preg() once both tasks have finished.
0489         auto tempReg = std::make_shared<ProductRegistry>();
0490         auto sourceID = ModuleDescription::getUniqueID();
0491 
0492         group.run([&, this]() {
0493           // initialize the Schedule
               // NOTE(review): the token is installed again inside each task, presumably
               // because the task may run on a different thread - confirm.
0494           ServiceRegistry::Operate operate(serviceToken_);
0495           auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0496           madeModules =
0497               items.initModules(*parameterSet, tns, preallocations_, &processContext_, moduleTypeResolverMaker_.get());
0498         });
0499 
0500         group.run([&, this, tempReg]() {
0501           ServiceRegistry::Operate operate(serviceToken_);
0502           input_ = makeInput(sourceID,
0503                              *parameterSet,
0504                              *common,
0505                              /*items.preg(),*/ tempReg,
0506                              items.branchIDListHelper(),
0507                              get_underlying_safe(processBlockHelper_),
0508                              items.thinnedAssociationsHelper(),
0509                              items.actReg_,
0510                              items.processConfiguration(),
0511                              preallocations_);
0512         });
0513 
0514         group.wait();
             // Both tasks are done: merge the source's products and point the source at the real registry.
0515         items.preg()->addFromInput(*tempReg);
0516         input_->switchTo(items.preg());
0517 
0518         {
0519           auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0520           schedule_ = items.finishSchedule(std::move(*madeModules),
0521                                            *parameterSet,
0522                                            tns,
0523                                            hasSubProcesses,
0524                                            preallocations_,
0525                                            &processContext_,
0526                                            *processBlockHelper_);
0527         }
0528       }
0529 
0530       // set the data members
0531       act_table_ = std::move(items.act_table_);
0532       actReg_ = items.actReg_;
0533       preg_ = items.preg();
0534       mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0535       branchIDListHelper_ = items.branchIDListHelper();
0536       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0537       processConfiguration_ = items.processConfiguration();
0538       processContext_.setProcessConfiguration(processConfiguration_.get());
0539 
0540       FDEBUG(2) << parameterSet << std::endl;
0541 
           // One EventPrincipal per stream, one RunPrincipal per concurrent run and one
           // LuminosityBlockPrincipal per concurrent lumi are created and cached below.
0542       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0543       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0544         // Reusable event principal
0545         auto ep = std::make_shared<EventPrincipal>(preg(),
0546                                                    branchIDListHelper(),
0547                                                    thinnedAssociationsHelper(),
0548                                                    *processConfiguration_,
0549                                                    historyAppender_.get(),
0550                                                    index,
0551                                                    true /*primary process*/,
0552                                                    &*processBlockHelper_);
0553         principalCache_.insert(std::move(ep));
0554       }
0555 
0556       for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0557         auto rp = std::make_unique<RunPrincipal>(
0558             preg(), *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0559         principalCache_.insert(std::move(rp));
0560       }
0561 
0562       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0563         auto lp =
0564             std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
0565         principalCache_.insert(std::move(lp));
0566       }
0567 
0568       {
             // Two ProcessBlockPrincipals: one inserted normally, one inserted "for input".
0569         auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0570         principalCache_.insert(std::move(pb));
0571 
0572         auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0573         principalCache_.insertForInput(std::move(pbForInput));
0574       }
0575 
0576       // fill the subprocesses, if there are any
           // Note that each SubProcess receives 'token' (the result of initServices), not
           // serviceToken_ (the result of addCPRandTNS).
0577       subProcesses_.reserve(subProcessVParameterSet.size());
0578       for (auto& subProcessPSet : subProcessVParameterSet) {
0579         subProcesses_.emplace_back(subProcessPSet,
0580                                    *parameterSet,
0581                                    preg(),
0582                                    branchIDListHelper(),
0583                                    *processBlockHelper_,
0584                                    *thinnedAssociationsHelper_,
0585                                    SubProcessParentageHelper(),
0586                                    *espController_,
0587                                    *actReg_,
0588                                    token,
0589                                    serviceregistry::kConfigurationOverrides,
0590                                    preallocations_,
0591                                    &processContext_,
0592                                    moduleTypeResolverMaker_);
0593       }
0594     } catch (...) {
0595       //in case of an exception, make sure Services are available
0596       // during the following destructors
           // (the 'operate' object declared before the try block is still in scope here)
0597       espController_ = nullptr;
0598       esp_ = nullptr;
0599       schedule_ = nullptr;
0600       input_ = nullptr;
0601       looper_ = nullptr;
0602       actReg_ = nullptr;
0603       throw;
0604     }
0605   }
0606 
0607   EventProcessor::~EventProcessor() {
0608     // Make the services available while everything is being deleted.
0609     ServiceToken token = getToken();
0610     ServiceRegistry::Operate op(token);
0611 
0612     // manually destroy all these thing that may need the services around
0613     // propagate_const<T> has no reset() function
0614     espController_ = nullptr;
0615     esp_ = nullptr;
0616     schedule_ = nullptr;
0617     input_ = nullptr;
0618     looper_ = nullptr;
0619     actReg_ = nullptr;
0620 
0621     pset::Registry::instance()->clear();
0622     ParentageRegistry::instance()->clear();
0623   }
0624 
0625   void EventProcessor::taskCleanup() {
0626     edm::FinalWaitingTask task{taskGroup_};
0627     espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
0628     task.waitNoThrow();
0629     assert(task.done());
0630   }
0631 
0632   void EventProcessor::beginJob() {
         // One-time job start-up. Guarded by beginJobCalled_, so a second call returns
         // immediately. In order, it: announces the preallocation bounds, prunes
         // non-consumed unscheduled modules (when enabled), configures early deletion,
         // finishes the EventSetup configuration, calls beginJob on the source, the
         // schedule and the SubProcesses, and finally runs beginStream for every stream.
0633     if (beginJobCalled_)
0634       return;
         // The flag is set before any work is done, so a failure below is not retried.
0635     beginJobCalled_ = true;
0636     bk::beginJob();
0637 
0638     // StateSentry toerror(this); // should we add this ?
0639     //make the services available
0640     ServiceRegistry::Operate operate(serviceToken_);
0641 
         // Publish the preallocated numbers of streams, lumis, runs and threads.
0642     service::SystemBounds bounds(preallocations_.numberOfStreams(),
0643                                  preallocations_.numberOfLuminosityBlocks(),
0644                                  preallocations_.numberOfRuns(),
0645                                  preallocations_.numberOfThreads());
0646     actReg_->preallocateSignal_(bounds);
0647     schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0648     pathsAndConsumesOfModules_.initialize(schedule_.get(), preg());
0649 
         // Gather the modules consumed by the SubProcesses into one vector. Results from
         // successive SubProcesses are combined with std::merge, which requires sorted
         // inputs. NOTE(review): assumes keepOnlyConsumedUnscheduledModules returns a
         // sorted vector -- confirm.
0650     std::vector<ModuleProcessName> consumedBySubProcesses;
0651     for_all(subProcesses_,
0652             [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
0653               auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0654               if (consumedBySubProcesses.empty()) {
0655                 consumedBySubProcesses = std::move(c);
0656               } else if (not c.empty()) {
0657                 std::vector<ModuleProcessName> tmp;
0658                 tmp.reserve(consumedBySubProcesses.size() + c.size());
0659                 std::merge(consumedBySubProcesses.begin(),
0660                            consumedBySubProcesses.end(),
0661                            c.begin(),
0662                            c.end(),
0663                            std::back_inserter(tmp));
0664                 std::swap(consumedBySubProcesses, tmp);
0665               }
0666             });
0667 
0668     // Note: all these may throw
0669     checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, printDependencies_);
0670     if (deleteNonConsumedUnscheduledModules_) {
0671       if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
0672           not unusedModules.empty()) {
             // Drop the unused modules from the bookkeeping first, log their labels,
             // then delete them from the schedule.
0673         pathsAndConsumesOfModules_.removeModules(unusedModules);
0674 
0675         edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
0676           l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0677                "and "
0678                "therefore they are deleted before beginJob transition.";
0679           for (auto const& description : unusedModules) {
0680             l << "\n " << description->moduleLabel();
0681           }
0682         });
0683         for (auto const& description : unusedModules) {
0684           schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0685         }
0686       }
0687     }
0688     // Initialize after the deletion of non-consumed unscheduled
0689     // modules to avoid non-consumed non-run modules to keep the
0690     // products unnecessarily alive
0691     if (not branchesToDeleteEarly_.empty()) {
           // The configuration is moved out of the data members into locals that are
           // destroyed at the end of this scope.
0692       auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
0693       auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
0694       auto referencesToBranches = std::move(referencesToBranches_);
0695       schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
0696     }
0697 
         // Extra module checks apply only when more than one concurrent lumi / run is
         // preallocated; going by the helper names, the lumi check throws and the run
         // check only warns.
0698     if (preallocations_.numberOfLuminosityBlocks() > 1) {
0699       throwAboutModulesRequiringLuminosityBlockSynchronization();
0700     }
0701     if (preallocations_.numberOfRuns() > 1) {
0702       warnAboutModulesRequiringRunSynchronization();
0703     }
0704 
0705     //NOTE:  This implementation assumes 'Job' means one call
0706     // the EventProcessor::run
0707     // If it really means once per 'application' then this code will
0708     // have to be changed.
0709     // Also have to deal with case where have 'run' then new Module
0710     // added and do 'run'
0711     // again.  In that case the newly added Module needs its 'beginJob'
0712     // to be called.
0713 
0714     //NOTE: in future we should have a beginOfJob for looper that takes no arguments
0715     //  For now we delay calling beginOfJob until first beginOfRun
0716     //if(looper_) {
0717     //   looper_->beginOfJob(es);
0718     //}
0719     espController_->finishConfiguration();
0720     actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0721     actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
         // A cms::Exception coming out of the source's beginJob gets extra context
         // attached before it is rethrown.
0722     try {
0723       convertException::wrap([&]() { input_->doBeginJob(); });
0724     } catch (cms::Exception& ex) {
0725       ex.addContext("Calling beginJob for the source");
0726       throw;
0727     }
0728 
0729     schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
0730     if (looper_) {
           // Give the looper the product lookups for each branch type (process block,
           // run, lumi, event) as well as the EventSetup record indices.
0731       constexpr bool mustPrefetchMayGet = true;
0732       auto const processBlockLookup = preg_->productLookup(InProcess);
0733       auto const runLookup = preg_->productLookup(InRun);
0734       auto const lumiLookup = preg_->productLookup(InLumi);
0735       auto const eventLookup = preg_->productLookup(InEvent);
0736       looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
0737       looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
0738       looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
0739       looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
0740       looper_->updateLookup(esp_->recordsToResolverIndices());
0741     }
0742     // toerror.succeeded(); // should we add this?
0743     for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0744     actReg_->postBeginJobSignal_();
0745 
         // Begin-stream phase: a local task group runs, for every preallocated stream,
         // beginStream on the schedule followed (only when SubProcesses exist) by
         // doBeginStream on each SubProcess. Services are installed inside each task.
0746     oneapi::tbb::task_group group;
0747     FinalWaitingTask last{group};
0748     using namespace edm::waiting_task::chain;
0749     first([this](auto nextTask) {
0750       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0751         first([i, this](auto nextTask) {
0752           ServiceRegistry::Operate operate(serviceToken_);
0753           schedule_->beginStream(i);
0754         }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
0755           ServiceRegistry::Operate operate(serviceToken_);
0756           for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
0757         }) | lastTask(nextTask);
0758       }
0759     }) | runLast(WaitingTaskHolder(group, &last));
         // Block until the begin-stream work for all streams has finished.
0760     last.wait();
0761   }
0762 
0763   void EventProcessor::endJob() {
         // End-of-job sequence: runs endStream for every stream (schedule first, then
         // each SubProcess), then the endJob signals and endJob calls of the schedule,
         // SubProcesses, input source and looper. Failures are accumulated in an
         // ExceptionCollector and rethrown together once every step has been attempted.
0764     // Collects exceptions, so we don't throw before all operations are performed.
0765     ExceptionCollector c(
0766         "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0767 
0768     //make the services available
0769     ServiceRegistry::Operate operate(serviceToken_);
0770 
0771     using namespace edm::waiting_task::chain;
0772 
0773     oneapi::tbb::task_group group;
0774     edm::FinalWaitingTask waitTask{group};
0775 
0776     {
0777       //handle endStream transitions
           // NOTE(review): taskHolder is scoped to this block; presumably releasing it
           // at the closing brace is what allows waitTask to complete once all chains
           // are done -- confirm against WaitingTaskHolder.
0778       edm::WaitingTaskHolder taskHolder(group, &waitTask);
           // Serializes the tasks' access to the shared collector `c`.
0779       std::mutex collectorMutex;
0780       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0781         first([this, i, &c, &collectorMutex](auto nextTask) {
               // Capture any exception locally so that it never escapes the task; it
               // is handed to the collector under the mutex instead.
0782           std::exception_ptr ep;
0783           try {
0784             ServiceRegistry::Operate operate(serviceToken_);
0785             this->schedule_->endStream(i);
0786           } catch (...) {
0787             ep = std::current_exception();
0788           }
0789           if (ep) {
0790             std::lock_guard<std::mutex> l(collectorMutex);
0791             c.call([&ep]() { std::rethrow_exception(ep); });
0792           }
0793         }) | then([this, i, &c, &collectorMutex](auto nextTask) {
               // Same pattern for stream i of every SubProcess, one chain per SubProcess.
0794           for (auto& subProcess : subProcesses_) {
0795             first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
0796               std::exception_ptr ep;
0797               try {
0798                 ServiceRegistry::Operate operate(serviceToken_);
0799                 subProcess.doEndStream(i);
0800               } catch (...) {
0801                 ep = std::current_exception();
0802               }
0803               if (ep) {
0804                 std::lock_guard<std::mutex> l(collectorMutex);
0805                 c.call([&ep]() { std::rethrow_exception(ep); });
0806               }
0807             }) | lastTask(nextTask);
0808           }
0809         }) | lastTask(taskHolder);
0810       }
0811     }
0812     waitTask.waitNoThrow();
0813 
         // The remaining steps run sequentially on this thread. Each one goes through
         // the collector so that a failure does not prevent the steps that follow.
0814     auto actReg = actReg_.get();
0815     c.call([actReg]() { actReg->preEndJobSignal_(); });
0816     schedule_->endJob(c);
0817     for (auto& subProcess : subProcesses_) {
0818       c.call(std::bind(&SubProcess::doEndJob, &subProcess));
0819     }
0820     c.call(std::bind(&InputSource::doEndJob, input_.get()));
0821     if (looper_) {
0822       c.call(std::bind(&EDLooperBase::endOfJob, looper()));
0823     }
0824     c.call([actReg]() { actReg->postEndJobSignal_(); });
         // Surface everything that was collected above.
0825     if (c.hasThrown()) {
0826       c.rethrow();
0827     }
0828   }
0829 
0830   ServiceToken EventProcessor::getToken() { return serviceToken_; }  // Returns (by value) the ServiceToken held by this processor.
0831 
0832   std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
         // Forwards to the Schedule, which supplies the list of module descriptions.
0833     return schedule_->getAllModuleDescriptions();
0834   }
0835 
0836   int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }  // Event count as reported by the Schedule.
0837 
0838   int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }  // Passed-event count as reported by the Schedule.
0839 
0840   int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }  // Failed-event count as reported by the Schedule.
0841 
0842   void EventProcessor::clearCounters() { schedule_->clearCounters(); }  // Delegates to the Schedule's clearCounters().
0843 
0844   namespace {  // Unnamed namespace: whatever TransitionProcessors.icc defines stays local to this translation unit.
0845 #include "TransitionProcessors.icc"
0846   }
0847 
0848   bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
         // Reports whether an external shutdown signal has been raised. When it has,
         // the event is logged, recorded in the JobReport, and returnCode is set to
         // epSignal; otherwise returnCode is left untouched.
0849     // Look for a shutdown signal
0850     bool const stopRequested = shutdown_flag.load(std::memory_order_acquire);
0851     if (not stopRequested) {
0852       return false;
0853     }
0854 
0855     edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
0856     edm::Service<edm::JobReport> jobReport;
0857     jobReport->reportShutdownSignal();
0858     returnCode = epSignal;
0859     return true;
0860   }
0861 
0862   namespace {
         // Scope guard used around a request for the source's next item: the
         // constructor emits preSourceNextTransitionSignal_ and the destructor emits
         // postSourceNextTransitionSignal_ on the same ActivityRegistry.
0863     struct SourceNextGuard {
0864       SourceNextGuard(edm::ActivityRegistry& iReg) : act_(iReg) { iReg.preSourceNextTransitionSignal_.emit(); }
0865       ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
           // Registry whose "post" signal is emitted on destruction; held by reference.
0866       edm::ActivityRegistry& act_;
0867     };
0868   }  // namespace
0869 
0870   InputSource::ItemTypeInfo EventProcessor::nextTransitionType() {
         // Asks the input source for its next item, skipping IsSynchronize items, and
         // stores the answer in lastSourceTransition_. If an external shutdown was
         // requested, the stored value is replaced by IsStop. Returns the stored value.
0871     SendSourceTerminationSignalIfException sentry(actReg_.get());
0872     InputSource::ItemTypeInfo itemTypeInfo;
0873     {
           // The guard brackets the source query with the pre/post
           // "source next transition" signals.
0874       SourceNextGuard guard(*actReg_.get());
0875       //For now, do nothing with InputSource::IsSynchronize
0876       do {
0877         itemTypeInfo = input_->nextItemType();
0878       } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
0879     }
0880     lastSourceTransition_ = itemTypeInfo;
0881     sentry.completedSuccessfully();
0882 
         // returnCode is filled in by checkForAsyncStopRequest but not used afterwards.
0883     StatusCode returnCode = epSuccess;
0884 
0885     if (checkForAsyncStopRequest(returnCode)) {
0886       actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
0887       lastSourceTransition_ = InputSource::ItemType::IsStop;
0888     }
0889 
0890     return lastSourceTransition_;
0891   }
0892 
0893   void EventProcessor::nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
0894                                                WaitingTaskHolder nextTask) {
         // Queues a call to nextTransitionType() on the source's serial queue. The
         // queued work runs with services installed and the source mutex held. Any
         // exception, including the LogicError thrown below, is forwarded to the
         // waiting task through nextHolder.doneWaiting().
         // NOTE(review): nothing is signalled explicitly on success; presumably the
         // holder releases the waiting task when the lambda is destroyed -- confirm.
0895     auto group = nextTask.group();
0896     sourceResourcesAcquirer_.serialQueueChain().push(
0897         *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
0898           CMS_SA_ALLOW try {
0899             ServiceRegistry::Operate operate(serviceToken_);
0900             std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
0901             nextTransitionType();
                 // Consistency check: the next item must not be another entry of the
                 // run held by runStatus (same run number and reduced ProcessHistoryID).
0902             if (lastTransitionType() == InputSource::ItemType::IsRun &&
0903                 runStatus->runPrincipal()->run() == input_->run() &&
0904                 runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
0905               throw Exception(errors::LogicError)
0906                   << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
0907                   << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
0908                   << "This is probably a bug in the InputSource. Please report to the Core group.\n";
0909             }
0910           } catch (...) {
0911             nextHolder.doneWaiting(std::current_exception());
0912           }
0913         });
0914   }
0915 
0916   EventProcessor::StatusCode EventProcessor::runToCompletion() {
         // Drives the whole job: ensures beginJob() has run, then processes the input
         // files, repeating the pass (after rewinding the input) until endOfLoop()
         // returns true. Returns epSuccess on normal completion. A cms::Exception
         // leaving the loop gets any recorded clean-up messages appended before it is
         // rethrown.
0917     beginJob();  //make sure this was called
0918 
0919     // make the services available
0920     ServiceRegistry::Operate operate(serviceToken_);
0921     actReg_->beginProcessingSignal_();
         // unique_ptr used purely as a scope guard: its deleter emits
         // endProcessingSignal_ when this function exits, normally or by exception.
0922     auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
0923     std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
0924     try {
0925       FilesProcessor fp(fileModeNoMerge_);
0926 
0927       convertException::wrap([&]() {
0928         bool firstTime = true;
0929         do {
               // Every pass after the first prepares the looper and rewinds the input.
0930           if (not firstTime) {
0931             prepareForNextLoop();
0932             rewindInput();
0933           } else {
0934             firstTime = false;
0935           }
0936           startingNewLoop();
0937 
0938           auto trans = fp.processFiles(*this);
0939 
0940           fp.normalEnd();
0941 
               // An exception deferred during processing is rethrown here.
0942           if (deferredExceptionPtrIsSet_.load()) {
0943             std::rethrow_exception(deferredExceptionPtr_);
0944           }
               // A pass is expected to end with IsStop; anything else is an error.
0945           if (trans != InputSource::ItemType::IsStop) {
0946             //problem with the source
0947             doErrorStuff();
0948 
0949             throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
0950           }
0951         } while (not endOfLoop());
0952       });  // convertException::wrap
0953 
0954     }  // Try block
0955     catch (cms::Exception& e) {
           // Attach notes about secondary failures recorded in the exceptionMessage*
           // members. If the exception was already printed, the note is logged
           // directly as well.
0956       if (exceptionMessageLumis_) {
0957         std::string message(
0958             "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
0959         e.addAdditionalInfo(message);
0960         if (e.alreadyPrinted()) {
0961           LogAbsolute("Additional Exceptions") << message;
0962         }
0963       }
0964       if (exceptionMessageRuns_) {
0965         std::string message(
0966             "Another exception was caught while trying to clean up runs after the primary fatal exception.");
0967         e.addAdditionalInfo(message);
0968         if (e.alreadyPrinted()) {
0969           LogAbsolute("Additional Exceptions") << message;
0970         }
0971       }
0972       if (!exceptionMessageFiles_.empty()) {
0973         e.addAdditionalInfo(exceptionMessageFiles_);
0974         if (e.alreadyPrinted()) {
0975           LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
0976         }
0977       }
0978       throw;
0979     }
0980     return epSuccess;
0981   }
0982 
0983   void EventProcessor::readFile() {
         // Has the input source open/read the next file, stores the resulting
         // FileBlock in fb_, and brings the cached principals in line with the
         // product registry afterwards.
0984     FDEBUG(1) << " \treadFile\n";
         // Registry size before the read, used below to detect newly added products.
0985     size_t size = preg_->size();
0986     SendSourceTerminationSignalIfException sentry(actReg_.get());
0987 
         // A run and/or lumi that is still active on the streams is updated through
         // stream 0's status object before the file is read.
0988     if (streamRunActive_ > 0) {
0989       streamRunStatus_[0]->runPrincipal()->preReadFile();
0990       streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
0991     }
0992 
0993     if (streamLumiActive_ > 0) {
0994       streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
0995     }
0996 
0997     fb_ = input_->readFile();
0998     if (size < preg_->size()) {
0999       principalCache_.adjustIndexesAfterProductRegistryAddition();
1000     }
1001     principalCache_.adjustEventsToNewProductRegistry(preg());
         // With more than one stream and more than one thread, the file block is
         // flagged as not fast-clonable (reason: ParallelProcesses).
1002     if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
1003       fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1004     }
1005     sentry.completedSuccessfully();
1006   }
1007 
1008   void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
         // Asks the input source to close the current file, but only while the file
         // block is valid. cleaningUpAfterException is passed through to the source.
1009     if (fileBlockValid()) {
1010       SendSourceTerminationSignalIfException sentry(actReg_.get());
1011       input_->closeFile(fb_.get(), cleaningUpAfterException);
1012       sentry.completedSuccessfully();
1013     }
1014     FDEBUG(1) << "\tcloseInputFile\n";
1015   }
1016 
1017   void EventProcessor::openOutputFiles() {
         // Opens output files on the schedule and then on every SubProcess, passing
         // the current FileBlock; does nothing while the file block is not valid.
1018     if (fileBlockValid()) {
1019       schedule_->openOutputFiles(*fb_);
1020       for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
1021     }
1022     FDEBUG(1) << "\topenOutputFiles\n";
1023   }
1024 
1025   void EventProcessor::closeOutputFiles() {
         // Closes output files on the schedule and on every SubProcess, then lets the
         // ProcessBlockHelper clear its post-close state.
1026     schedule_->closeOutputFiles();
1027     for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
1028     processBlockHelper_->clearAfterOutputFilesClose();
1029     FDEBUG(1) << "\tcloseOutputFiles\n";
1030   }
1031 
1032   void EventProcessor::respondToOpenInputFile() {
         // Notifies the schedule and the SubProcesses that an input file was opened
         // (only while the file block is valid). The SubProcesses receive the current
         // BranchID lists first, before anyone is notified about the file.
1033     if (fileBlockValid()) {
1034       for_all(subProcesses_,
1035               [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1036       schedule_->respondToOpenInputFile(*fb_);
1037       for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1038     }
1039     FDEBUG(1) << "\trespondToOpenInputFile\n";
1040   }
1041 
1042   void EventProcessor::respondToCloseInputFile() {
         // Notifies the schedule and then every SubProcess that the input file is
         // being closed; does nothing while the file block is not valid.
1043     if (fileBlockValid()) {
1044       schedule_->respondToCloseInputFile(*fb_);
1045       for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1046     }
1047     FDEBUG(1) << "\trespondToCloseInputFile\n";
1048   }
1049 
1050   void EventProcessor::startingNewLoop() {
         // Resets the stop flag at the start of a pass and, when a looper exists and
         // its beginOfJob has already run, tells the looper a new loop is starting.
1051     shouldWeStop_ = false;
1052     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1053     // until after we've called beginOfJob
1054     if (looper_ && looperBeginJobRun_) {
1055       looper_->doStartingNewLoop();
1056     }
1057     FDEBUG(1) << "\tstartingNewLoop\n";
1058   }
1059 
1060   bool EventProcessor::endOfLoop() {
         // Decides whether processing is finished after a pass over the input.
         // Without a looper a single pass is all there is, so the answer is true.
         // With a looper, it is given a temporary ModuleChanger for the duration of
         // doEndOfLoop(); the result is true when the looper does not ask to continue
         // or when forceLooperToEnd_ is set.
1061     if (looper_) {
1062       ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1063       looper_->setModuleChanger(&changer);
           // `changer` lives on this stack frame. The guard detaches it from the
           // looper on every exit path, including an exception from doEndOfLoop(),
           // so the looper is never left holding a dangling pointer.
1064       auto detachChanger = [this](ModuleChanger*) { looper_->setModuleChanger(nullptr); };
1065       std::unique_ptr<ModuleChanger, decltype(detachChanger)> changerGuard(&changer, detachChanger);
1066 
1067       EDLooperBase::Status const status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1068       bool const stopLooping = status != EDLooperBase::kContinue || forceLooperToEnd_;
1069       return stopLooping;
1070     }
1071     FDEBUG(1) << "\tendOfLoop\n";
1072     return true;
1073   }
1074 
1075   void EventProcessor::rewindInput() {
         // Prepares the input source for another pass by calling repeat() and then
         // rewind() on it, in that order.
1076     input_->repeat();
1077     input_->rewind();
1078     FDEBUG(1) << "\trewind\n";
1079   }
1080 
1081   void EventProcessor::prepareForNextLoop() {
         // Hands the EventSetup provider to the looper so it can prepare the next pass.
         // NOTE(review): looper_ is dereferenced without a null check; callers must
         // only invoke this when a looper is configured.
1082     looper_->prepareForNextLoop(esp_.get());
1083     FDEBUG(1) << "\tprepareForNextLoop\n";
1084   }
1085 
1086   bool EventProcessor::shouldWeCloseOutput() const {
         // Reports whether output files should be closed. With SubProcesses present
         // the answer comes from them alone (true as soon as one asks for it);
         // otherwise the schedule decides.
1087     FDEBUG(1) << "\tshouldWeCloseOutput\n";
1088     if (subProcesses_.empty()) {
1089       return schedule_->shouldWeCloseOutput();
1090     }
1091 
1092     auto const wantsClose = [](auto const& subProcess) {
1093       return subProcess.shouldWeCloseOutput();
1094     };
1095 
1096     return std::any_of(subProcesses_.begin(), subProcesses_.end(), wantsClose);
1097   }
1098 
1099   void EventProcessor::doErrorStuff() {
         // Emits an error-level log message describing an unexpected state-machine
         // event; it changes no state itself.
1100     FDEBUG(1) << "\tdoErrorStuff\n";
1101     LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1102                              << "and went to the error state\n"
1103                              << "Will attempt to terminate processing normally\n"
1104                              << "(IF using the looper the next loop will be attempted)\n"
1105                              << "This likely indicates a bug in an input module or corrupted input or both\n";
1106   }
1107 
1108   void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
         // Fills the ProcessBlock principal for the current process and runs the
         // global begin transition on it, blocking until that transition finishes.
         // beginProcessBlockSucceeded is set to true only after the wait returns.
1109     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1110     processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1111 
1112     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1113     FinalWaitingTask globalWaitTask{taskGroup_};
1114 
1115     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1116     beginGlobalTransitionAsync<Traits>(
1117         WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1118 
1119     globalWaitTask.wait();
1120     beginProcessBlockSucceeded = true;
1121   }
1122 
1123   void EventProcessor::inputProcessBlocks() {
         // Processes every ProcessBlock offered by the input source. Each one is
         // read, run through the ProcessBlock-input global transition, written with
         // ProcessBlockType::Input, and then cleared here and in every SubProcess
         // before the next one is requested.
1124     input_->fillProcessBlockHelper();
1125     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1126     while (input_->nextProcessBlock(processBlockPrincipal)) {
1127       readProcessBlock(processBlockPrincipal);
1128 
1129       using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1130       FinalWaitingTask globalWaitTask{taskGroup_};
1131 
1132       ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1133       beginGlobalTransitionAsync<Traits>(
1134           WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1135 
           // Block until the transition has finished before starting the write.
1136       globalWaitTask.wait();
1137 
1138       FinalWaitingTask writeWaitTask{taskGroup_};
1139       writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input);
1140       writeWaitTask.wait();
1141 
           // The same principal object is reused on the next iteration, so clear it.
1142       processBlockPrincipal.clearPrincipal();
1143       for (auto& s : subProcesses_) {
1144         s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1145       }
1146     }
1147   }
1148 
1149   void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
         // Runs the global end transition for the current process's ProcessBlock and
         // waits for it. The block is written (ProcessBlockType::New) only when
         // beginProcessBlockSucceeded is true. The principal is then cleared here and
         // in every SubProcess. cleaningUpAfterException is forwarded to the
         // transition.
1150     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1151 
1152     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1153     FinalWaitingTask globalWaitTask{taskGroup_};
1154 
1155     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1156     endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1157                                      *schedule_,
1158                                      transitionInfo,
1159                                      serviceToken_,
1160                                      subProcesses_,
1161                                      cleaningUpAfterException);
1162     globalWaitTask.wait();
1163 
         // Skip the write when the matching begin transition did not succeed.
1164     if (beginProcessBlockSucceeded) {
1165       FinalWaitingTask writeWaitTask{taskGroup_};
1166       writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New);
1167       writeWaitTask.wait();
1168     }
1169 
1170     processBlockPrincipal.clearPrincipal();
1171     for (auto& s : subProcesses_) {
1172       s.clearProcessBlockPrincipal(ProcessBlockType::New);
1173     }
1174   }
1175 
1176   InputSource::ItemType EventProcessor::processRuns() {
         // Handles a Run item from the source; the last transition must be IsRun.
         // If no run is active on the streams, a new run is started with
         // beginRunAsync. Otherwise the further entries of the already active run are
         // merged in and processing continues from there. Blocks until the started
         // work completes and returns the last transition type seen.
1177     FinalWaitingTask waitTask{taskGroup_};
1178     assert(lastTransitionType() == InputSource::ItemType::IsRun);
1179     if (streamRunActive_ == 0) {
1180       assert(streamLumiActive_ == 0);
1181 
           // The sync value is built from the run number (lumi and event set to 0)
           // and the begin time from the run auxiliary.
1182       beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1183                     WaitingTaskHolder{taskGroup_, &waitTask});
1184     } else {
           // A run is already active, and it must be active on every stream.
1185       assert(streamRunActive_ == preallocations_.numberOfStreams());
1186 
1187       auto runStatus = streamRunStatus_[0];
1188 
           // Merge successive run entries while they match the active run's run
           // number and reduced ProcessHistoryID.
1189       while (lastTransitionType() == InputSource::ItemType::IsRun and
1190              runStatus->runPrincipal()->run() == input_->run() and
1191              runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1192         readAndMergeRun(*runStatus);
1193         nextTransitionType();
1194       }
1195 
           // The loop above has already fetched the next transition.
1196       setNeedToCallNext(false);
1197 
1198       WaitingTaskHolder holder{taskGroup_, &waitTask};
1199       runStatus->setHolderOfTaskInProcessRuns(holder);
           // Continue with the active lumi if there is one; otherwise handle the item
           // that followed the merged run entries.
1200       if (streamLumiActive_ > 0) {
1201         assert(streamLumiActive_ == preallocations_.numberOfStreams());
1202         continueLumiAsync(std::move(holder));
1203       } else {
1204         handleNextItemAfterMergingRunEntries(std::move(runStatus), std::move(holder));
1205       }
1206     }
1207     waitTask.wait();
1208     return lastTransitionType();
1209   }
1210 
1211   void EventProcessor::beginRunAsync(IOVSyncValue const& iSync, WaitingTaskHolder iHolder) {
1212     if (iHolder.taskHasFailed()) {
1213       return;
1214     }
1215 
1216     actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1217 
1218     auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1219 
1220     chain::first([this, &status, iSync](auto nextTask) {
1221       espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1222                                                            nextTask,
1223                                                            status->endIOVWaitingTasks(),
1224                                                            status->eventSetupImpls(),
1225                                                            queueWhichWaitsForIOVsToFinish_,
1226                                                            actReg_.get(),
1227                                                            serviceToken_,
1228                                                            forceESCacheClearOnNewRun_);
1229     }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1230       CMS_SA_ALLOW try {
1231         if (iException) {
1232           WaitingTaskHolder copyHolder(nextTask);
1233           copyHolder.doneWaiting(*iException);
1234           // Finish handling the exception in the task pushed to runQueue_
1235         }
1236         ServiceRegistry::Operate operate(serviceToken_);
1237 
1238         runQueue_->pushAndPause(
1239             *nextTask.group(),
1240             [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1241               CMS_SA_ALLOW try {
1242                 if (postRunQueueTask.taskHasFailed()) {
1243                   status->resetBeginResources();
1244                   queueWhichWaitsForIOVsToFinish_.resume();
1245                   return;
1246                 }
1247 
1248                 status->setResumer(std::move(iResumer));
1249 
1250                 sourceResourcesAcquirer_.serialQueueChain().push(
1251                     *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1252                       CMS_SA_ALLOW try {
1253                         ServiceRegistry::Operate operate(serviceToken_);
1254 
1255                         if (postSourceTask.taskHasFailed()) {
1256                           status->resetBeginResources();
1257                           queueWhichWaitsForIOVsToFinish_.resume();
1258                           status->resumeGlobalRunQueue();
1259                           return;
1260                         }
1261 
1262                         status->setRunPrincipal(readRun());
1263 
1264                         RunPrincipal& runPrincipal = *status->runPrincipal();
1265                         {
1266                           SendSourceTerminationSignalIfException sentry(actReg_.get());
1267                           input_->doBeginRun(runPrincipal, &processContext_);
1268                           sentry.completedSuccessfully();
1269                         }
1270 
1271                         EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1272                         if (looper_ && looperBeginJobRun_ == false) {
1273                           looper_->copyInfo(ScheduleInfo(schedule_.get()));
1274 
1275                           oneapi::tbb::task_group group;
1276                           FinalWaitingTask waitTask{group};
1277                           using namespace edm::waiting_task::chain;
1278                           chain::first([this, &es](auto nextTask) {
1279                             looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1280                           }) | then([this, &es](auto nextTask) mutable {
1281                             looper_->beginOfJob(es);
1282                             looperBeginJobRun_ = true;
1283                             looper_->doStartingNewLoop();
1284                           }) | runLast(WaitingTaskHolder(group, &waitTask));
1285                           waitTask.wait();
1286                         }
1287 
1288                         using namespace edm::waiting_task::chain;
1289                         chain::first([this, status](auto nextTask) mutable {
1290                           CMS_SA_ALLOW try {
1291                             if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1292                               readAndMergeRunEntriesAsync(status, nextTask);
1293                             } else {
1294                               setNeedToCallNext(true);
1295                             }
1296                           } catch (...) {
1297                             status->setStopBeforeProcessingRun(true);
1298                             nextTask.doneWaiting(std::current_exception());
1299                           }
1300                         }) | then([this, status, &es](auto nextTask) {
1301                           if (status->stopBeforeProcessingRun()) {
1302                             return;
1303                           }
1304                           RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1305                           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1306                           beginGlobalTransitionAsync<Traits>(
1307                               nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1308                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1309                           if (status->stopBeforeProcessingRun()) {
1310                             return;
1311                           }
1312                           looper_->prefetchAsync(
1313                               nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1314                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1315                           if (status->stopBeforeProcessingRun()) {
1316                             return;
1317                           }
1318                           ServiceRegistry::Operate operateLooper(serviceToken_);
1319                           looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1320                         }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1321                           if (iException) {
1322                             WaitingTaskHolder copyHolder(holder);
1323                             copyHolder.doneWaiting(*iException);
1324                           } else {
1325                             status->globalBeginDidSucceed();
1326                           }
1327 
1328                           if (status->stopBeforeProcessingRun()) {
1329                             // We just quit now if there was a failure when merging runs
1330                             status->resetBeginResources();
1331                             queueWhichWaitsForIOVsToFinish_.resume();
1332                             status->resumeGlobalRunQueue();
1333                             return;
1334                           }
1335                           CMS_SA_ALLOW try {
1336                             // Under normal circumstances, this task runs after endRun has completed for all streams
1337                             // and global endLumi has completed for all lumis contained in this run
1338                             auto globalEndRunTask =
1339                                 edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1340                                   WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1341                                   status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1342                                   globalEndRunAsync(std::move(taskHolder), std::move(status));
1343                                 });
1344                             status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1345                           } catch (...) {
1346                             status->resetBeginResources();
1347                             queueWhichWaitsForIOVsToFinish_.resume();
1348                             status->resumeGlobalRunQueue();
1349                             holder.doneWaiting(std::current_exception());
1350                             return;
1351                           }
1352 
1353                           // After this point we are committed to end the run via endRunAsync
1354 
1355                           ServiceRegistry::Operate operate(serviceToken_);
1356 
1357                           // The only purpose of the pause is to cause stream begin run to execute before
1358                           // global begin lumi in the single threaded case (maintains consistency with
1359                           // the order that existed before concurrent runs were implemented).
1360                           PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1361 
1362                           CMS_SA_ALLOW try {
1363                             streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
1364                               for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1365                                 CMS_SA_ALLOW try {
1366                                   streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
1367                                     streamBeginRunAsync(i, std::move(status), std::move(holder));
1368                                   });
1369                                 } catch (...) {
1370                                   if (status->streamFinishedBeginRun()) {
1371                                     WaitingTaskHolder copyHolder(holder);
1372                                     copyHolder.doneWaiting(std::current_exception());
1373                                     status->resetBeginResources();
1374                                     queueWhichWaitsForIOVsToFinish_.resume();
1375                                     exceptionRunStatus_ = status;
1376                                   }
1377                                 }
1378                               }
1379                             });
1380                           } catch (...) {
1381                             WaitingTaskHolder copyHolder(holder);
1382                             copyHolder.doneWaiting(std::current_exception());
1383                             status->resetBeginResources();
1384                             queueWhichWaitsForIOVsToFinish_.resume();
1385                             exceptionRunStatus_ = status;
1386                           }
1387                           handleNextItemAfterMergingRunEntries(status, holder);
1388                         }) | runLast(postSourceTask);
1389                       } catch (...) {
1390                         status->resetBeginResources();
1391                         queueWhichWaitsForIOVsToFinish_.resume();
1392                         status->resumeGlobalRunQueue();
1393                         postSourceTask.doneWaiting(std::current_exception());
1394                       }
1395                     });  // task in sourceResourcesAcquirer
1396               } catch (...) {
1397                 status->resetBeginResources();
1398                 queueWhichWaitsForIOVsToFinish_.resume();
1399                 status->resumeGlobalRunQueue();
1400                 postRunQueueTask.doneWaiting(std::current_exception());
1401               }
1402             });  // task in runQueue
1403       } catch (...) {
1404         status->resetBeginResources();
1405         queueWhichWaitsForIOVsToFinish_.resume();
1406         nextTask.doneWaiting(std::current_exception());
1407       }
1408     }) | chain::runLast(std::move(iHolder));
1409   }
1410 
1411   void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1412                                            std::shared_ptr<RunProcessingStatus> status,
1413                                            WaitingTaskHolder iHolder) noexcept {
         // Performs the stream-level begin run for stream `iStream`.
         //
         // The stream's queue is paused, the stream is counted in streamRunActive_ and
         // `status` becomes this stream's entry in streamRunStatus_. The stream begin-run
         // transition is then started (only when the global begin run succeeded) and,
         // once it has finished, releaseBeginRunResources() is called for the stream.
         // The task chain ends in `iHolder`.
         //
         // iStream  index into streamQueues_ / streamRunStatus_
         // status   run status to attach to the stream (moved into streamRunStatus_)
         // iHolder  last task of the chain; also receives an exception caught below
1414     // These shouldn't throw
1415     streamQueues_[iStream].pause();
1416     ++streamRunActive_;
1417     streamRunStatus_[iStream] = std::move(status);
1418 
         // The function is declared noexcept, so anything thrown while the chain is
         // being built or started is caught at the bottom and handed to iHolder.
1419     CMS_SA_ALLOW try {
1420       using namespace edm::waiting_task::chain;
1421       chain::first([this, iStream](auto nextTask) {
             // `status` was moved from above, so the status is read back from the
             // per-stream slot.
1422         RunProcessingStatus& rs = *streamRunStatus_[iStream];
             // The stream transition is skipped when the global begin run did not succeed.
1423         if (rs.didGlobalBeginSucceed()) {
1424           RunTransitionInfo transitionInfo(
1425               *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1426           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1427           beginStreamTransitionAsync<Traits>(
1428               std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1429         }
1430       }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
             // Pass a failure on to the next task, then release the begin-run
             // resources whether or not the transition succeeded.
1431         if (exceptionFromBeginStreamRun) {
1432           nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1433         }
1434         releaseBeginRunResources(iStream);
1435       }) | runLast(iHolder);
1436     } catch (...) {
           // Same release as on the normal path, followed by reporting the exception.
1437       releaseBeginRunResources(iStream);
1438       iHolder.doneWaiting(std::current_exception());
1439     }
1440   }
1441 
1442   void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
         // Called when stream `iStream` is done with its begin-run work. When
         // streamFinishedBeginRun() returns true for the stream's run status, the
         // begin-run resources of that status are reset and
         // queueWhichWaitsForIOVsToFinish_ is resumed. The stream's own queue is
         // resumed in every case.
1443     auto& runStatusOfStream = streamRunStatus_[iStream];
1444     if (runStatusOfStream->streamFinishedBeginRun()) {
1445       runStatusOfStream->resetBeginResources();
1446       queueWhichWaitsForIOVsToFinish_.resume();
1447     }
1448     streamQueues_[iStream].resume();
1449   }
1450 
1451   void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
         // Starts the end-of-run sequence for the run described by `iRunStatus`.
         //
         // Steps visible here:
         //   1. record the end time on the status and build the IOVSyncValue for the
         //      end of the run;
         //   2. emit esSyncIOVQueuingSignal_ and ask espController_ to run or queue the
         //      EventSetup work for that sync value;
         //   3. queue, through streamQueuesInserter_, one task per stream that pauses
         //      the stream queue and calls streamEndRunAsync();
         //   4. if the last source transition is a run, start that run with
         //      beginRunAsync().
         // The task chain ends in `iHolder`.
1452     RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1453     iRunStatus->setEndTime();
         // Sync value for the end of the run: this run number combined with the maximum
         // luminosity block and event numbers, at the run principal's end time.
1454     IOVSyncValue ts(
1455         EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1456         runPrincipal.endTime());
         // An exception from the signal is handed to a copy of iHolder; the rest of
         // this function still executes.
1457     CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1458       WaitingTaskHolder copyHolder(iHolder);
1459       copyHolder.doneWaiting(std::current_exception());
1460     }
1461 
         // NOTE(review): iRunStatus and ts are captured by reference in the first step
         // only; this presumably relies on that step running before endRunAsync
         // returns - confirm against the chain implementation.
1462     chain::first([this, &iRunStatus, &ts](auto nextTask) {
1463       espController_->runOrQueueEventSetupForInstanceAsync(ts,
1464                                                            nextTask,
1465                                                            iRunStatus->endIOVWaitingTasksEndRun(),
1466                                                            iRunStatus->eventSetupImplsEndRun(),
1467                                                            queueWhichWaitsForIOVsToFinish_,
1468                                                            actReg_.get(),
1469                                                            serviceToken_);
1470     }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
           // A failure of the previous step is remembered on the status and reported.
           // The per-stream tasks below are queued in either case.
1471       if (iException) {
1472         iRunStatus->setEndingEventSetupSucceeded(false);
1473         handleEndRunExceptions(*iException, nextTask);
1474       }
1475       ServiceRegistry::Operate operate(serviceToken_);
1476       streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1477         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
               // A failure to queue the task for one stream is reported on a copy of
               // nextTask and the loop moves on to the next stream.
1478           CMS_SA_ALLOW try {
1479             streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
                   // The stream queue is paused before the stream end-run work starts.
1480               streamQueues_[i].pause();
1481               streamEndRunAsync(std::move(nextTask), i);
1482             });
1483           } catch (...) {
1484             WaitingTaskHolder copyHolder(nextTask);
1485             copyHolder.doneWaiting(std::current_exception());
1486           }
1487         }
1488       });
1489 
           // When the most recent source transition is a run, begin that run now,
           // using the run number and begin time currently reported by the input source.
1490       if (lastTransitionType() == InputSource::ItemType::IsRun) {
1491         CMS_SA_ALLOW try {
1492           beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1493         } catch (...) {
1494           WaitingTaskHolder copyHolder(nextTask);
1495           copyHolder.doneWaiting(std::current_exception());
1496         }
1497       }
1498     }) | chain::runLast(std::move(iHolder));
1499   }
1500 
1501   void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
         // Reports an exception raised while a run is being ended. If `holder` has not
         // failed yet, the exception is handed to a copy of it through doneWaiting();
         // if it has already failed, setExceptionMessageRuns() is called instead.
1502     if (not holder.taskHasFailed()) {
1503       WaitingTaskHolder holderCopy(holder);
1504       holderCopy.doneWaiting(iException);
1505     } else {
1506       setExceptionMessageRuns();
1507     }
1508   }
1509 
1510   void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
         // Runs the global end-of-run sequence for `iRunStatus` as a task chain ending
         // in `iTask`: global end-run transition, looper prefetch and doEndRun, writing
         // of the run, and finally the release of the resources held for the run.
         //
         // iRunStatus is moved into the last lambda of the chain, so everything the
         // earlier steps need from it is read into locals here first.
1511     auto& runPrincipal = *(iRunStatus->runPrincipal());
1512     bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1513     bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1514     EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1515     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1516     bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1517 
1518     MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1519     using namespace edm::waiting_task::chain;
         // NOTE(review): eventSetupImpls (a local pointer) is captured by reference in
         // the first step; presumably that step runs before this function returns -
         // confirm against the chain implementation.
1520     chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1521                      auto nextTask) {
           // The global end-run transition is skipped when the EventSetup for the end
           // of the run could not be obtained.
1522       if (endingEventSetupSucceeded) {
1523         RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1524         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1525         endGlobalTransitionAsync<Traits>(
1526             std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1527       }
1528     }) |
             // Looper steps: only when a looper exists and the ending EventSetup succeeded.
1529         ifThen(looper_ && endingEventSetupSucceeded,
1530                [this, &runPrincipal, &es](auto nextTask) {
1531                  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1532                }) |
1533         ifThen(looper_ && endingEventSetupSucceeded,
1534                [this, &runPrincipal, &es](auto nextTask) {
1535                  ServiceRegistry::Operate operate(serviceToken_);
1536                  looper_->doEndRun(runPrincipal, es, &processContext_);
1537                }) |
             // The run is written only when both the global begin run and the ending
             // EventSetup succeeded; preWriteRun() is called right before the write.
1538         ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1539                [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1540                  mergeableRunProductMetadata->preWriteRun();
1541                  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1542                }) |
1543         then([status = std::move(iRunStatus),
1544               this,
1545               didGlobalBeginSucceed,
1546               mergeableRunProductMetadata,
1547               endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
               // postWriteRun() is guarded by the same condition as preWriteRun() above.
1548           if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1549             mergeableRunProductMetadata->postWriteRun();
1550           }
1551           if (iException) {
1552             handleEndRunExceptions(*iException, nextTask);
1553           }
1554           ServiceRegistry::Operate operate(serviceToken_);
1555 
               // Holds the first exception thrown by the cleanup steps below.
1556           std::exception_ptr ptr;
1557 
1558           // Try hard to clean up resources so the
1559           // process can terminate in a controlled
1560           // fashion even after exceptions have occurred.
               // Each step has its own try block, so a failure in one step does not
               // prevent the following steps from being attempted.
1561           CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1562             if (not ptr) {
1563               ptr = std::current_exception();
1564             }
1565           }
1566           CMS_SA_ALLOW try {
1567             status->resumeGlobalRunQueue();
1568             queueWhichWaitsForIOVsToFinish_.resume();
1569           } catch (...) {
1570             if (not ptr) {
1571               ptr = std::current_exception();
1572             }
1573           }
1574           CMS_SA_ALLOW try {
1575             status->resetEndResources();
1576             status.reset();
1577           } catch (...) {
1578             if (not ptr) {
1579               ptr = std::current_exception();
1580             }
1581           }
1582 
               // A cleanup exception is reported only when the chain did not already
               // deliver an exception of its own (that one was handled above).
1583           if (ptr && !iException) {
1584             handleEndRunExceptions(ptr, nextTask);
1585           }
1586         }) |
1587         runLast(std::move(iTask));
1588   }
1589 
1590   void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
         // Ends the run on stream `iStreamIndex`. When the stream has a run status and
         // both the global begin run and the ending EventSetup succeeded, the stream
         // end-run transition is started; afterwards runDoneTask detaches the run
         // from the stream and resumes the stream's queue. Exceptions are reported
         // through handleEndRunExceptions() using `iTask`.
1591     CMS_SA_ALLOW try {
           // No run status is attached to this stream: the bookkeeping is done on
           // exceptionRunStatus_ instead. When streamFinishedRun() returns true the
           // global end-run holder is signalled and exceptionRunStatus_ is dropped.
           // NOTE(review): exceptionRunStatus_ is dereferenced without a null check -
           // presumably it is always set when a stream has no run status; confirm.
1592       if (!streamRunStatus_[iStreamIndex]) {
1593         if (exceptionRunStatus_->streamFinishedRun()) {
1594           exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1595           exceptionRunStatus_.reset();
1596         }
1597         return;
1598       }
1599 
           // Task run once the stream end-run work is over (with or without an exception).
1600       auto runDoneTask =
1601           edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1602             if (iException) {
1603               handleEndRunExceptions(*iException, iTask);
1604             }
1605 
                 // Copy of the shared_ptr, so the status stays alive after the
                 // per-stream slot is reset below.
1606             auto runStatus = streamRunStatus_[iStreamIndex];
1607 
1608             //reset status before releasing queue else get race condition
1609             if (runStatus->streamFinishedRun()) {
1610               runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1611             }
1612             streamRunStatus_[iStreamIndex].reset();
1613             --streamRunActive_;
1614             streamQueues_[iStreamIndex].resume();
1615           });
1616 
1617       WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1618 
           // Raw pointer only: no additional shared ownership is taken here.
1619       auto runStatus = streamRunStatus_[iStreamIndex].get();
1620 
           // NOTE(review): when this condition is false nothing is started and
           // runDoneTaskHolder simply goes out of scope; presumably that is what lets
           // runDoneTask run - confirm against WaitingTaskHolder.
1621       if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1622         EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1623         auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1624         bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1625 
1626         auto& runPrincipal = *runStatus->runPrincipal();
1627         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1628         RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1629         endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1630                                          *schedule_,
1631                                          iStreamIndex,
1632                                          transitionInfo,
1633                                          serviceToken_,
1634                                          subProcesses_,
1635                                          cleaningUpAfterException);
1636       }
1637     } catch (...) {
1638       handleEndRunExceptions(std::current_exception(), iTask);
1639     }
1640   }
1641 
1642   void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
         // Ends a run that is still active on the streams and blocks until that has
         // finished. Does nothing unless streamRunActive_ is greater than zero. The
         // run status is taken from stream 0, the last source transition is set to
         // IsStop, and endRunAsync() is driven by a FinalWaitingTask that is waited on
         // before returning.
1643     if (streamRunActive_ > 0) {
1644       FinalWaitingTask finalTask{taskGroup_};
1645 
1646       auto& activeRunStatus = streamRunStatus_[0];
1647       activeRunStatus->setCleaningUpAfterException(cleaningUpAfterException);
1648       WaitingTaskHolder finalHolder{taskGroup_, &finalTask};
1649       activeRunStatus->setHolderOfTaskInProcessRuns(finalHolder);
1650       lastSourceTransition_ = InputSource::ItemType::IsStop;
1651       endRunAsync(activeRunStatus, std::move(finalHolder));
1652       finalTask.wait();
1653     }
1654   }
1655 
1656   void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1657                                       std::shared_ptr<RunProcessingStatus> iRunStatus,
1658                                       edm::WaitingTaskHolder iHolder) {
         // Begins processing of a luminosity block that belongs to the run described
         // by `iRunStatus`. The steps visible here are, in order:
         //   1. emit esSyncIOVQueuingSignal_ and ask espController_ to run or queue the
         //      EventSetup work for `iSync`;
         //   2. queue a task on lumiQueue_ (pushAndPause), which in turn queues a task
         //      on the serial queue chain of sourceResourcesAcquirer_;
         //   3. in that task, read the lumi principal, call the input source's
         //      doBeginLumi and, if a RandomNumberGenerator service is available, its
         //      preBeginLumi;
         //   4. run the global begin-lumi transition and the looper's begin-lumi steps;
         //   5. on success, queue one task per stream that runs the stream begin-lumi
         //      transition and then calls handleNextEventForStreamAsync().
         // The catch blocks and the taskHasFailed() checks reset the lumi status
         // resources, resume queueWhichWaitsForIOVsToFinish_ and call endRunAsync()
         // for the run.
         //
         // This emit is not inside a try block: an exception thrown by it propagates
         // to the caller.
1659     actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1660 
1661     auto status = std::make_shared<LuminosityBlockProcessingStatus>();
         // NOTE(review): iSync and status are captured by reference in the first step
         // only; presumably that step runs before beginLumiAsync returns - confirm
         // against the chain implementation. Later steps capture status by value.
1662     chain::first([this, &iSync, &status](auto nextTask) {
1663       espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1664                                                            nextTask,
1665                                                            status->endIOVWaitingTasks(),
1666                                                            status->eventSetupImpls(),
1667                                                            queueWhichWaitsForIOVsToFinish_,
1668                                                            actReg_.get(),
1669                                                            serviceToken_);
1670     }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1671       CMS_SA_ALLOW try {
1672         //the call to doneWaiting will cause the count to decrement
1673         if (iException) {
1674           WaitingTaskHolder copyHolder(nextTask);
1675           copyHolder.doneWaiting(*iException);
1676         }
1677 
             // The queued functor receives a Resumer, which is stored on the lumi
             // status through setResumer() further down.
1678         lumiQueue_->pushAndPause(
1679             *nextTask.group(),
1680             [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1681               CMS_SA_ALLOW try {
                     // An earlier failure was already recorded on the task: release
                     // what the lumi holds and go straight to ending the run.
1682                 if (postLumiQueueTask.taskHasFailed()) {
1683                   status->resetResources();
1684                   queueWhichWaitsForIOVsToFinish_.resume();
1685                   endRunAsync(iRunStatus, postLumiQueueTask);
1686                   return;
1687                 }
1688 
1689                 status->setResumer(std::move(iResumer));
1690 
1691                 sourceResourcesAcquirer_.serialQueueChain().push(
1692                     *postLumiQueueTask.group(),
1693                     [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1694                       CMS_SA_ALLOW try {
1695                         ServiceRegistry::Operate operate(serviceToken_);
1696 
                             // Same early exit as above, checked again now that this
                             // task is actually running.
1697                         if (postSourceTask.taskHasFailed()) {
1698                           status->resetResources();
1699                           queueWhichWaitsForIOVsToFinish_.resume();
1700                           endRunAsync(iRunStatus, postSourceTask);
1701                           return;
1702                         }
1703 
1704                         status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1705 
1706                         LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1707                         {
                               // NOTE(review): the sentry presumably emits a source
                               // termination signal if doBeginLumi throws before
                               // completedSuccessfully() is reached - see its definition.
1708                           SendSourceTerminationSignalIfException sentry(actReg_.get());
1709                           input_->doBeginLumi(lumiPrincipal, &processContext_);
1710                           sentry.completedSuccessfully();
1711                         }
1712 
1713                         Service<RandomNumberGenerator> rng;
1714                         if (rng.isAvailable()) {
1715                           LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1716                           rng->preBeginLumi(lb);
1717                         }
1718 
1719                         EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1720 
1721                         using namespace edm::waiting_task::chain;
1722                         chain::first([this, status](auto nextTask) mutable {
                               // Unless the last transition read is marked LastItemToBeMerged,
                               // further lumi entries are read and merged; otherwise
                               // setNeedToCallNext(true) is called.
1723                           if (lastTransitionType().itemPosition() != InputSource::ItemPosition::LastItemToBeMerged) {
1724                             readAndMergeLumiEntriesAsync(std::move(status), std::move(nextTask));
1725                           } else {
1726                             setNeedToCallNext(true);
1727                           }
1728                         }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
                               // Global begin-lumi transition.
1729                           LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1730                           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1731                           beginGlobalTransitionAsync<Traits>(
1732                               nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1733                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
                               // Looper steps, present in the chain only when looper_ is set.
1734                           looper_->prefetchAsync(
1735                               nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1736                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1737                           ServiceRegistry::Operate operateLooper(serviceToken_);
1738                           looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1739                         }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
                               // The lumi status receives the run's global end-run holder
                               // on both the failure and the success path.
1740                           status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1741 
1742                           if (iException) {
                                 // A previous step failed: report the exception, then
                                 // still run the global end lumi and end the run.
1743                             WaitingTaskHolder copyHolder(holder);
1744                             copyHolder.doneWaiting(*iException);
1745                             globalEndLumiAsync(holder, status);
1746                             endRunAsync(iRunStatus, holder);
1747                           } else {
1748                             status->globalBeginDidSucceed();
1749 
1750                             EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1751                             using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1752 
                                 // One task per stream is queued from within a task on
                                 // streamQueuesInserter_.
1753                             streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1754                               for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1755                                 streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
                                       // The stream does nothing for this lumi when
                                       // shouldStreamStartLumi() returns false.
1756                                   if (!status->shouldStreamStartLumi()) {
1757                                     return;
1758                                   }
1759                                   streamQueues_[i].pause();
1760 
                                       // eventSetupImpls and lp are taken from status
                                       // before status is moved into streamLumiStatus_[i].
1761                                   auto& event = principalCache_.eventPrincipal(i);
1762                                   auto eventSetupImpls = &status->eventSetupImpls();
1763                                   auto lp = status->lumiPrincipal().get();
1764                                   streamLumiStatus_[i] = std::move(status);
1765                                   ++streamLumiActive_;
1766                                   event.setLuminosityBlockPrincipal(lp);
1767                                   LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1768                                   using namespace edm::waiting_task::chain;
                                       // NOTE(review): transitionInfo is a local captured by
                                       // reference; presumably the first step runs before
                                       // this lambda returns - confirm.
1769                                   chain::first([this, i, &transitionInfo](auto nextTask) {
1770                                     beginStreamTransitionAsync<Traits>(std::move(nextTask),
1771                                                                        *schedule_,
1772                                                                        i,
1773                                                                        transitionInfo,
1774                                                                        serviceToken_,
1775                                                                        subProcesses_);
1776                                   }) |
1777                                       then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1778                                                      auto nextTask) {
                                             // A stream begin-lumi failure is reported on a copy
                                             // of the task; handleNextEventForStreamAsync() is
                                             // called in either case.
1779                                         if (exceptionFromBeginStreamLumi) {
1780                                           WaitingTaskHolder copyHolder(nextTask);
1781                                           copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1782                                         }
1783                                         handleNextEventForStreamAsync(std::move(nextTask), i);
1784                                       }) |
1785                                       runLast(std::move(holder));
1786                                 });
1787                               }  // end for loop over streams
1788                             });
1789                           }
1790                         }) | runLast(postSourceTask);
1791                       } catch (...) {
                             // Failure inside the source task: release the lumi's
                             // resources, report the exception, then end the run.
1792                         status->resetResources();
1793                         queueWhichWaitsForIOVsToFinish_.resume();
1794                         WaitingTaskHolder copyHolder(postSourceTask);
1795                         copyHolder.doneWaiting(std::current_exception());
1796                         endRunAsync(iRunStatus, postSourceTask);
1797                       }
1798                     });  // task in sourceResourcesAcquirer
1799               } catch (...) {
                     // Failure inside the lumi queue task: same handling as above.
1800                 status->resetResources();
1801                 queueWhichWaitsForIOVsToFinish_.resume();
1802                 WaitingTaskHolder copyHolder(postLumiQueueTask);
1803                 copyHolder.doneWaiting(std::current_exception());
1804                 endRunAsync(iRunStatus, postLumiQueueTask);
1805               }
1806             });  // task in lumiQueue
1807       } catch (...) {
             // Failure before the lumi queue task could be queued: same handling as above.
1808         status->resetResources();
1809         queueWhichWaitsForIOVsToFinish_.resume();
1810         WaitingTaskHolder copyHolder(nextTask);
1811         copyHolder.doneWaiting(std::current_exception());
1812         endRunAsync(iRunStatus, nextTask);
1813       }
1814     }) | chain::runLast(std::move(iHolder));
1815   }
1816 
1817   void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
         // Continues event processing in the luminosity block already attached to the
         // streams. The first step marks the lumi status as kProcessing and merges any
         // directly following lumi entries with the same lumi number; the second step
         // starts handleNextEventForStreamAsync() for every stream. The chain ends in
         // `iHolder`.
1818     chain::first([this](auto nextTask) {
1819       //all streams are sharing the same status at the moment
1820       auto status = streamLumiStatus_[0];  //read from streamLumiActive_ happened in calling routine
1821       status->setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kProcessing);
1822 
           // While the last transition read is a lumi whose number equals that of the
           // current lumi principal, merge it and read the next transition type.
1823       while (lastTransitionType() == InputSource::ItemType::IsLumi and
1824              status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1825         readAndMergeLumi(*status);
1826         nextTransitionType();
1827       }
1828     }) | chain::then([this](auto nextTask) mutable {
1829       unsigned int streamIndex = 0;
1830       oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
           // All streams except the last one are enqueued on the attached arena ...
1831       for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1832         arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1833       }
           // ... and the last stream (streamIndex is numberOfStreams() - 1 after the
           // loop) is run through the task group of nextTask.
1834       nextTask.group()->run(
1835           [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1836     }) | chain::runLast(std::move(iHolder));
1837   }
1838 
1839   void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
         // Reports an exception raised while a luminosity block is being ended. If
         // `holder` has not failed yet, the exception is handed to a copy of it
         // through doneWaiting(); if it has already failed,
         // setExceptionMessageLumis() is called instead.
1840     if (not holder.taskHasFailed()) {
1841       WaitingTaskHolder holderCopy(holder);
1842       holderCopy.doneWaiting(iException);
1843     } else {
1844       setExceptionMessageLumis();
1845     }
1846   }
1847 
1848   void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1849                                           std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
         // Runs the global end-of-lumi sequence for `iLumiStatus` as a task chain
         // ending in `iTask`: global end-lumi transition, writing of the lumi (only if
         // the global begin lumi succeeded), the looper's end-lumi steps, and finally
         // the release of the resources held for the lumi.
1850     // Get some needed info out of the status object before moving
1851     // it into finalTaskForThisLumi.
1852     auto& lp = *(iLumiStatus->lumiPrincipal());
1853     bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1854     bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1855     EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1856     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1857 
1858     using namespace edm::waiting_task::chain;
         // NOTE(review): eventSetupImpls (a local pointer) is captured by reference in
         // the first step; presumably that step runs before this function returns -
         // confirm against the chain implementation.
1859     chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
           // NOTE(review): `ts` does not appear to be used anywhere in this lambda -
           // confirm whether it can be removed.
1860       IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1861 
1862       LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1863       using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1864       endGlobalTransitionAsync<Traits>(
1865           std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1866     }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1867       //Only call writeLumi if beginLumi succeeded
1868       if (didGlobalBeginSucceed) {
1869         writeLumiAsync(std::move(nextTask), lumiPrincipal);
1870       }
1871     }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
           // Looper steps, present in the chain only when looper_ is set.
1872       looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1873     }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1874       //any thrown exception auto propagates to nextTask via the chain
1875       ServiceRegistry::Operate operate(serviceToken_);
1876       looper_->doEndLuminosityBlock(lp, es, &processContext_);
1877     }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1878       if (iException) {
1879         handleEndLumiExceptions(*iException, nextTask);
1880       }
1881       ServiceRegistry::Operate operate(serviceToken_);
1882 
           // Holds the first exception thrown by the cleanup steps below.
1883       std::exception_ptr ptr;
1884 
1885       // Try hard to clean up resources so the
1886       // process can terminate in a controlled
1887       // fashion even after exceptions have occurred.
1888       // Caught exception is passed to handleEndLumiExceptions()
1889       CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1890         if (not ptr) {
1891           ptr = std::current_exception();
1892         }
1893       }
1894       // Caught exception is passed to handleEndLumiExceptions()
1895       CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1896         if (not ptr) {
1897           ptr = std::current_exception();
1898         }
1899       }
1900       // Caught exception is passed to handleEndLumiExceptions()
1901       CMS_SA_ALLOW try {
1902         status->resetResources();
1903         status->globalEndRunHolderDoneWaiting();
1904         status.reset();
1905       } catch (...) {
1906         if (not ptr) {
1907           ptr = std::current_exception();
1908         }
1909       }
1910 
           // A cleanup exception is reported only when the chain did not already
           // deliver an exception of its own (that one was handled above).
1911       if (ptr && !iException) {
1912         handleEndLumiExceptions(ptr, nextTask);
1913       }
1914     }) | runLast(std::move(iTask));
1915   }
1916 
1917   void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
         // Starts the stream-end LuminosityBlock transition for stream iStreamIndex through
         // endStreamTransitionAsync. The waiting task `t` built first is the continuation handed
         // to that transition (wrapped in lumiDoneTask); iTask is the holder it passes on to
         // handleEndLumiExceptions and globalEndLumiAsync.
1918     auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
           // Copy this stream's status entry first: the slot is reset a few lines below, but
           // `status` is still dereferenced after that.
1919       auto status = streamLumiStatus_[iStreamIndex];
1920       if (iException) {
             // No early return: the bookkeeping below still runs after an exception is reported.
1921         handleEndLumiExceptions(*iException, iTask);
1922       }
1923
1924       // reset status before releasing queue else get race condition
1925       streamLumiStatus_[iStreamIndex].reset();
1926       --streamLumiActive_;
1927       streamQueues_[iStreamIndex].resume();
1928
1929       //are we the last one?
1930       if (status->streamFinishedLumi()) {
1931         globalEndLumiAsync(iTask, std::move(status));
1932       }
1933     });
1934
         // Bind the continuation to the same task group as iTask.
1935     edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1936
1937     // Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1938     // therefore we do not want to hold the shared_ptr
1939     auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1940     lumiStatus->setEndTime();
1941
1942     EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1943     auto eventSetupImpls = &lumiStatus->eventSetupImpls();
         // Cleanup mode is requested if either the lumi status or iTask already reports a failure.
1944     bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1945
1946     auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1947     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1948     LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1949     endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1950                                      *schedule_,
1951                                      iStreamIndex,
1952                                      transitionInfo,
1953                                      serviceToken_,
1954                                      subProcesses_,
1955                                      cleaningUpAfterException);
1956   }
1957 
1958   void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
         // Ends a LuminosityBlock that is still active on the streams and blocks until that has
         // finished. Does nothing (beyond the asserts) when no stream has an active lumi.
         // cleaningUpAfterException is recorded on the lumi status before the streams are ended.
1959     if (streamRunActive_ == 0) {
           // No active run on any stream, so no lumi may be active either.
1960       assert(streamLumiActive_ == 0);
1961     } else {
1962       assert(streamRunActive_ == preallocations_.numberOfStreams());
1963       if (streamLumiActive_ > 0) {
1964         FinalWaitingTask globalWaitTask{taskGroup_};
1965         assert(streamLumiActive_ == preallocations_.numberOfStreams());
             // NOTE(review): only the status reached through stream 0 is updated; presumably all
             // streams refer to the same status object at this point - confirm.
1966         streamLumiStatus_[0]->noMoreEventsInLumi();
1967         streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
             // Every stream gets its own holder on the same final task.
1968         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1969           streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1970         }
1971         globalWaitTask.wait();
1972       }
1973     }
1974   }
1975 
1976   void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
         // Asks the input source to fill processBlockPrincipal. The guard is built from the
         // activity registry before the read and is told about success only after the read
         // returns (presumably it signals source termination otherwise - its code is not
         // visible here).
1977     SendSourceTerminationSignalIfException terminationGuard(actReg_.get());
1978     input_->readProcessBlock(processBlockPrincipal);
1979     terminationGuard.completedSuccessfully();
1980   }
1981 
1982   std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
         // Takes an available RunPrincipal from the principal cache, sets its auxiliary from the
         // input source, lets the source read the run into it and returns it.
1983     auto runPrincipal = principalCache_.getAvailableRunPrincipalPtr();
1984     assert(runPrincipal);
1985     runPrincipal->setAux(*input_->runAuxiliary());
1986     {
           // The guard only covers the actual source read.
1987       SendSourceTerminationSignalIfException terminationGuard(actReg_.get());
1988       input_->readRun(*runPrincipal, *historyAppender_);
1989       terminationGuard.completedSuccessfully();
1990     }
         // Source and principal must agree on the reduced process history after the read.
1991     assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1992     return runPrincipal;
1993   }
1994 
1995   void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
         // Merges the run entry the input source is currently positioned on into the
         // RunPrincipal held by iStatus.
1996     auto& principal = *iStatus.runPrincipal();
1997
1998     bool registryAdjusted = principal.adjustToNewProductRegistry(*preg_);
1999     assert(registryAdjusted);
2000     principal.mergeAuxiliary(*input_->runAuxiliary());
2001     {
           // The guard only covers the actual source read.
2002       SendSourceTerminationSignalIfException terminationGuard(actReg_.get());
2003       input_->readAndMergeRun(principal);
2004       terminationGuard.completedSuccessfully();
2005     }
2006   }
2007 
2008   std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
         // Takes an available LuminosityBlockPrincipal from the principal cache, sets its
         // auxiliary from the input source, lets the source read the lumi into it, attaches the
         // run principal rp and returns it.
2009     auto lumiPrincipal = principalCache_.getAvailableLumiPrincipalPtr();
2010     assert(lumiPrincipal);
2011     lumiPrincipal->setAux(*input_->luminosityBlockAuxiliary());
2012     {
           // The guard only covers the actual source read.
2013       SendSourceTerminationSignalIfException terminationGuard(actReg_.get());
2014       input_->readLuminosityBlock(*lumiPrincipal, *historyAppender_);
2015       terminationGuard.completedSuccessfully();
2016     }
         // The run principal is attached only after the read has completed.
2017     lumiPrincipal->setRunPrincipal(std::move(rp));
2018     return lumiPrincipal;
2019   }
2020 
2021   void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
         // Merges the lumi entry the input source is currently positioned on into the
         // LuminosityBlockPrincipal held by iStatus.
2022     auto& principal = *iStatus.lumiPrincipal();
         // The entry must either have the same identity as the principal's auxiliary or reduce
         // to the same process history ID.
2023     assert(principal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
2024            input_->processHistoryRegistry().reducedProcessHistoryID(principal.aux().processHistoryID()) ==
2025                input_->processHistoryRegistry().reducedProcessHistoryID(
2026                    input_->luminosityBlockAuxiliary()->processHistoryID()));
2027     bool registryAdjusted = principal.adjustToNewProductRegistry(*preg());
2028     assert(registryAdjusted);
2029     principal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2030     {
           // The guard only covers the actual source read.
2031       SendSourceTerminationSignalIfException terminationGuard(actReg_.get());
2032       input_->readAndMergeLumi(principal);
2033       terminationGuard.completedSuccessfully();
2034     }
2035   }
2036 
2037   void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
         // Builds a task chain that (1) asks schedule_ to write the process block principal
         // selected by processBlockType, (2) only when subProcesses_ is non-empty, forwards the
         // write to every SubProcess, and (3) finishes with `task`.
2038     using namespace edm::waiting_task;
         // NOTE(review): this step captures by reference ([&]), including the by-value parameter
         // processBlockType, so it relies on running before this function returns - confirm
         // against the chain::runLast semantics.
2039     chain::first([&](auto nextTask) {
2040       ServiceRegistry::Operate op(serviceToken_);
2041       schedule_->writeProcessBlockAsync(
2042           nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2043     }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2044       ServiceRegistry::Operate op(serviceToken_);
           // All subprocesses are handed the same nextTask.
2045       for (auto& s : subProcesses_) {
2046         s.writeProcessBlockAsync(nextTask, processBlockType);
2047       }
2048     }) | chain::runLast(std::move(task));
2049   }
2050 
2051   void EventProcessor::writeRunAsync(WaitingTaskHolder task,
2052                                      RunPrincipal const& runPrincipal,
2053                                      MergeableRunProductMetadata const* mergeableRunProductMetadata) {
         // Writes runPrincipal through schedule_ and then through every SubProcess, finishing
         // with `task`. When shouldWriteRun() is RunPrincipal::kNo no chain is built at all and
         // `task` simply goes out of scope at the end of this function.
2054     using namespace edm::waiting_task;
2055     if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
           // NOTE(review): this step captures by reference ([&]), including the pointer
           // parameter, so it relies on running before this function returns - confirm against
           // the chain::runLast semantics.
2056       chain::first([&](auto nextTask) {
2057         ServiceRegistry::Operate op(serviceToken_);
2058         schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2059       }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2060         ServiceRegistry::Operate op(serviceToken_);
             // All subprocesses are handed the same nextTask.
2061         for (auto& s : subProcesses_) {
2062           s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2063         }
2064       }) | chain::runLast(std::move(task));
2065     }
2066   }
2067 
2068   void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
         // Lets every SubProcess clear its view of the run first, then resets the write flag
         // and clears the RunPrincipal held by iStatus.
2069     RunPrincipal& principal = *iStatus.runPrincipal();
2070     for (auto& subProcess : subProcesses_)
2071       subProcess.clearRunPrincipal(principal);
2072     principal.setShouldWriteRun(RunPrincipal::kUninitialized);
2073     principal.clearPrincipal();
2074   }
2075 
2076   void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
         // Writes lumiPrincipal through schedule_ and then through every SubProcess, finishing
         // with `task`. When shouldWriteLumi() is LuminosityBlockPrincipal::kNo no chain is built
         // at all and `task` simply goes out of scope at the end of this function.
2077     using namespace edm::waiting_task;
2078     if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
           // NOTE(review): this step captures by reference ([&]); it relies on running before
           // this function returns - confirm against the chain semantics.
2079       chain::first([&](auto nextTask) {
2080         ServiceRegistry::Operate op(serviceToken_);
2081
             // Tell the run's mergeable-product metadata about this lumi before the schedule
             // writes it.
2082         lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2083         schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2084       }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2085         ServiceRegistry::Operate op(serviceToken_);
             // All subprocesses are handed the same nextTask.
2086         for (auto& s : subProcesses_) {
2087           s.writeLumiAsync(nextTask, lumiPrincipal);
2088         }
             // NOTE(review): writeProcessBlockAsync and writeRunAsync end their chain with
             // chain::runLast, this one uses chain::lastTask - confirm the difference is intended.
2089       }) | chain::lastTask(std::move(task));
2090     }
2091   }
2092 
2093   void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
         // Lets every SubProcess clear its view of the lumi first, then detaches the run
         // principal, resets the write flag and clears the LuminosityBlockPrincipal of iStatus.
2094     auto& principal = *iStatus.lumiPrincipal();
2095     for (auto& subProcess : subProcesses_)
2096       subProcess.clearLumiPrincipal(principal);
2097     principal.setRunPrincipal(std::shared_ptr<RunPrincipal>());
2098     principal.setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2099     principal.clearPrincipal();
2100   }
2101 
2102   void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2103                                                    WaitingTaskHolder iHolder) {
         // Queues work on the source's serial queue chain that merges consecutive run entries
         // from the input source into the RunPrincipal of iRunStatus. iHolder is moved into the
         // queued lambda and is handed an exception if the merging throws.
2104     auto group = iHolder.group();
2105     sourceResourcesAcquirer_.serialQueueChain().push(
2106         *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2107           CMS_SA_ALLOW try {
2108             ServiceRegistry::Operate operate(serviceToken_);
2109
2110             std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2111
                 // Look at the next item; the flag is cleared because that call has just been made.
2112             nextTransitionType();
2113             setNeedToCallNext(false);
2114
                 // Keep merging while the source offers another entry of the same run: same run
                 // number and same reduced process history ID.
2115             while (lastTransitionType() == InputSource::ItemType::IsRun and
2116                    status->runPrincipal()->run() == input_->run() and
2117                    status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
                   // A failure already reported on the in-process-runs holder stops the merging.
2118               if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2119                 status->setStopBeforeProcessingRun(true);
2120                 return;
2121               }
2122               readAndMergeRun(*status);
                   // After the entry marked as last to be merged, do not advance the source here;
                   // just record that the next transition still has to be requested.
2123               if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2124                 setNeedToCallNext(true);
2125                 return;
2126               }
2127               nextTransitionType();
2128             }
2129           } catch (...) {
                 // Mark the run as not to be processed and hand the exception to the holder.
2130             status->setStopBeforeProcessingRun(true);
2131             holder.doneWaiting(std::current_exception());
2132           }
2133         });
2134   }
2135 
2136   void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2137                                                     WaitingTaskHolder iHolder) {
         // Queues work on the source's serial queue chain that merges consecutive lumi entries
         // from the input source into the LuminosityBlockPrincipal of iLumiStatus. iHolder is
         // moved into the queued lambda and is handed an exception if the merging throws.
2138     auto group = iHolder.group();
2139     sourceResourcesAcquirer_.serialQueueChain().push(
2140         *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2141           CMS_SA_ALLOW try {
2142             ServiceRegistry::Operate operate(serviceToken_);
2143
2144             std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2145
                 // Look at the next item; the flag is cleared because that call has just been made.
2146             nextTransitionType();
2147             setNeedToCallNext(false);
2148
                 // Keep merging while the source offers another entry with the same lumi number.
2149             while (lastTransitionType() == InputSource::ItemType::IsLumi and
2150                    iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2151               readAndMergeLumi(*iLumiStatus);
                   // After the entry marked as last to be merged, do not advance the source here;
                   // just record that the next transition still has to be requested.
2152               if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2153                 setNeedToCallNext(true);
2154                 return;
2155               }
2156               nextTransitionType();
2157             }
2158           } catch (...) {
                 // Hand the exception to the holder.
2159             holder.doneWaiting(std::current_exception());
2160           }
2161         });
2162   }
2163 
2164   void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2165                                                             WaitingTaskHolder iHolder) {
         // Decides what follows the merging of run entries: request the next transition if that
         // is still pending, then begin the next lumi, release the in-process-runs holder for a
         // file transition, or end the run. iHolder is the last task of the chain.
2166     chain::first([this, iRunStatus](auto nextTask) mutable {
           // Only ask the source for the next item when the merge step left that to us.
2167       if (needToCallNext()) {
2168         nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2169       }
2170     }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2171       ServiceRegistry::Operate operate(serviceToken_);
2172       if (iException) {
             // Report the exception through a copy of the holder; processing continues below
             // so the run is still ended.
2173         WaitingTaskHolder copyHolder(nextTask);
2174         copyHolder.doneWaiting(*iException);
2175       }
2176       if (lastTransitionType() == InputSource::ItemType::IsFile) {
             // File transition next: signal the in-process-runs holder without an exception
             // and stop here.
2177         iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2178         return;
2179       }
2180       if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2181         CMS_SA_ALLOW try {
2182           beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2183                                       input_->luminosityBlockAuxiliary()->beginTime()),
2184                          iRunStatus,
2185                          nextTask);
2186           return;
2187         } catch (...) {
               // beginLumiAsync threw: report it and fall through to endRunAsync.
2188           WaitingTaskHolder copyHolder(nextTask);
2189           copyHolder.doneWaiting(std::current_exception());
2190         }
2191       }
2192       // Note that endRunAsync will call beginRunAsync for the following run
2193       // if appropriate.
2194       endRunAsync(iRunStatus, std::move(nextTask));
2195     }) | chain::runLast(std::move(iHolder));
2196   }
2197 
2198   bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2199                                               unsigned int iStreamIndex,
2200                                               LuminosityBlockProcessingStatus& iStatus) {
2201     // This function returns true if it successfully reads an event for the stream and that
2202     // requires both that an event is next and there are no problems or requests to stop.
         // On every false return except the one right after the state check below, iStatus has
         // been moved (or was already moved) out of kProcessing into kStopLumi or
         // kPauseForFileTransition.
2203
2204     if (iTask.taskHasFailed()) {
2205       // We want all streams to stop or all streams to pause. If we are already in the
2206       // middle of pausing streams, then finish pausing all of them and the lumi will be
2207       // ended later. Otherwise, just end it now.
2208       if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2209         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2210       }
2211       return false;
2212     }
2213
2214     // Did another stream already stop or pause this lumi?
2215     if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2216       return false;
2217     }
2218
2219     // Are output modules or the looper requesting we stop?
2220     if (shouldWeStop()) {
           // Record a stop as the last source transition so later code sees IsStop.
2221       lastSourceTransition_ = InputSource::ItemType::IsStop;
2222       iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2223       return false;
2224     }
2225
2226     ServiceRegistry::Operate operate(serviceToken_);
2227
2228     // need to use lock in addition to the serial task queue because
2229     // of delayed provenance reading and reading data in response to
2230     // edm::Refs etc
2231     std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2232
2233     // If we didn't already call nextTransitionType while merging lumis, call it here.
2234     // This asks the input source what is next and also checks for signals.
2235
2236     InputSource::ItemType itemType = needToCallNext() ? nextTransitionType() : lastTransitionType();
         // From here on the next caller has to ask the source again.
2237     setNeedToCallNext(true);
2238
2239     if (InputSource::ItemType::IsEvent != itemType) {
2240       // IsFile may continue processing the lumi and
2241       // looper_ can cause the input source to declare a new IsRun which is actually
2242       // just a continuation of the previous run
           // Stop the lumi for IsStop, IsLumi, or an IsRun that differs from the current run
           // (run number or reduced process history ID); anything else pauses for a file
           // transition.
2243       if (InputSource::ItemType::IsStop == itemType or InputSource::ItemType::IsLumi == itemType or
2244           (InputSource::ItemType::IsRun == itemType and
2245            (iStatus.lumiPrincipal()->run() != input_->run() or
2246             iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
             // A following lumi entry with the same lumi number should have been merged already.
2247         if (itemType == InputSource::ItemType::IsLumi &&
2248             iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2249           throw Exception(errors::LogicError)
2250               << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2251               << "but the next lumi entry has the same lumi number.\n"
2252               << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2253         }
2254         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2255       } else {
2256         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2257       }
2258       return false;
2259     }
         // An event is next: read it into this stream's event principal.
2260     readEvent(iStreamIndex);
2261     return true;
2262   }
2263 
2264   void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
         // Per-stream event loop step. If the lumi has already been handed over (next lumi
         // started or run ended) the stream just ends its lumi. Otherwise work is queued on the
         // source's serial queue chain that reads the next event and processes it, or reacts to
         // the stop/pause state set by readNextEventForStream.
2265     if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2266       streamEndLumiAsync(iTask, iStreamIndex);
2267       return;
2268     }
2269     auto group = iTask.group();
2270     sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2271       CMS_SA_ALLOW try {
2272         auto status = streamLumiStatus_[iStreamIndex].get();
2273         ServiceRegistry::Operate operate(serviceToken_);
2274
2275         if (readNextEventForStream(iTask, iStreamIndex, *status)) {
               // An event was read: process it and come back to this function afterwards.
2276           auto recursionTask =
2277               make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2278                 if (iEventException) {
2279                   WaitingTaskHolder copyHolder(iTask);
2280                   copyHolder.doneWaiting(*iEventException);
2281                   // Intentionally, we don't return here. The recursive call to
2282                   // handleNextEvent takes care of immediately ending the run properly
2283                   // using the same code it uses to end the run in other situations.
2284                 }
2285                 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2286               });
2287
2288           processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2289         } else {
2290           // the stream will stop processing this lumi now
2291           if (status->eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi) {
                 // Only the first stream to get here starts the next lumi or ends the run; the
                 // status flag checked here is set by startNextLumiOrEndRun() below.
2292             if (not status->haveStartedNextLumiOrEndedRun()) {
2293               status->noMoreEventsInLumi();
2294               status->startNextLumiOrEndRun();
2295               if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
2296                 CMS_SA_ALLOW try {
2297                   beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2298                                               input_->luminosityBlockAuxiliary()->beginTime()),
2299                                  streamRunStatus_[iStreamIndex],
2300                                  iTask);
2301                 } catch (...) {
                       // beginLumiAsync threw: report it through a copy of the holder and end
                       // the run instead.
2302                   WaitingTaskHolder copyHolder(iTask);
2303                   copyHolder.doneWaiting(std::current_exception());
2304                   endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2305                 }
2306               } else {
2307                 // If appropriate, this will also start the next run.
2308                 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2309               }
2310             }
                 // Every stream, first or not, ends its own part of the lumi.
2311             streamEndLumiAsync(iTask, iStreamIndex);
2312           } else {
2313             assert(status->eventProcessingState() ==
2314                    LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2315             auto runStatus = streamRunStatus_[iStreamIndex].get();
2316
                 // Pausing: signal the in-process-runs holder (without an exception) if it
                 // still holds a task.
2317             if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2318               runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2319             }
2320           }
2321         }
2322       } catch (...) {
             // Report the exception through a copy of the holder, then re-enter this function
             // with the original holder.
2323         WaitingTaskHolder copyHolder(iTask);
2324         copyHolder.doneWaiting(std::current_exception());
2325         handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2326       }
2327     });
2328   }
2329 
2330   void EventProcessor::readEvent(unsigned int iStreamIndex) {
         // Reads the next event from the input source into the event principal of stream
         // iStreamIndex and records the source's timestamp on that stream's run and lumi status.
2331     //TODO this will have to become per stream
2332     auto& event = principalCache_.eventPrincipal(iStreamIndex);
2333     StreamContext streamContext(event.streamID(), &processContext_);
2334
2335     SendSourceTerminationSignalIfException sentry(actReg_.get());
2336     input_->readEvent(event, streamContext);
2337
         // The timestamp updates happen while the sentry is still armed; success is reported
         // to it only afterwards.
2338     streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2339     streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2340     sentry.completedSuccessfully();
2341
2342     FDEBUG(1) << "\treadEvent\n";
2343   }
2344 
2345   void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
         // Hands processEventAsyncImpl to the holder's task group; the lambda owns a copy of
         // iHolder.
2346     iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2347   }
2348 
2349   namespace {
         // Scope guard around clearing an event: the constructor emits preClearEventSignal_ and
         // the destructor emits postClearEventSignal_, both with the same StreamContext.
         // Both members are references, so the registry and the context passed in must outlive
         // the guard.
2350     struct ClearEventGuard {
2351       ClearEventGuard(edm::ActivityRegistry& iReg, edm::StreamContext const& iContext)
2352           : act_(iReg), context_(iContext) {
2353         iReg.preClearEventSignal_.emit(iContext);
2354       }
2355       ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2356       edm::ActivityRegistry& act_;
2357       edm::StreamContext const& context_;
2358     };
2359   }  // namespace
2360 
2361   void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
         // Processes the event currently held by stream iStreamIndex's event principal as a
         // task chain: schedule, then subprocesses (if any), then the looper (if any), then
         // clearing of the event principal; iHolder is the last task of the chain.
2362     auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2363
2364     ServiceRegistry::Operate operate(serviceToken_);
         // Let the random number service, when available, see the event that was just read.
2365     Service<RandomNumberGenerator> rng;
2366     if (rng.isAvailable()) {
2367       Event ev(*pep, ModuleDescription(), nullptr);
2368       rng->postEventRead(ev);
2369     }
2370
2371     EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2372     using namespace edm::waiting_task::chain;
2373     chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2374       EventTransitionInfo info(*pep, es);
2375       schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2376     }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
           // Subprocesses are visited in reverse order; all are handed the same nextTask.
2377       for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2378         subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2379       }
2380     }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2381       //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2382       ServiceRegistry::Operate operateLooper(serviceToken_);
2383       processEventWithLooper(*pep, iStreamIndex);
2384     }) | then([this, pep](auto nextTask) {
2385       FDEBUG(1) << "\tprocessEvent\n";
           // Build the context for the clear-event signals, then clear the principal while the
           // guard is alive so the pre/post signals bracket the clearing.
2386       StreamContext streamContext(pep->streamID(),
2387                                   StreamContext::Transition::kEvent,
2388                                   pep->id(),
2389                                   pep->runPrincipal().index(),
2390                                   pep->luminosityBlockPrincipal().index(),
2391                                   pep->time(),
2392                                   &processContext_);
2393       ClearEventGuard guard(*this->actReg_.get(), streamContext);
2394       pep->clearEventPrincipal();
2395     }) | runLast(iHolder);
2396   }
2397 
2398   void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
         // Runs the looper's doDuringLoop on iPrincipal and carries out the event navigation it
         // requests through the ProcessingController. Sets shouldWeStop_ when the looper returns
         // anything other than EDLooperBase::kContinue.
2399     bool randomAccess = input_->randomAccess();
2400     ProcessingController::ForwardState forwardState = input_->forwardState();
2401     ProcessingController::ReverseState reverseState = input_->reverseState();
2402     ProcessingController pc(forwardState, reverseState, randomAccess);
2403
2404     EDLooperBase::Status status = EDLooperBase::kContinue;
         // The looper is called again whenever the requested repositioning did not succeed
         // (presumably so it can make a different request - confirm with ProcessingController).
2405     do {
2406       StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2407       EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2408       status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2409
2410       bool succeeded = true;
           // Navigation requests are only honoured when the source supports random access.
2411       if (randomAccess) {
2412         if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
               // NOTE(review): presumably -2 because the source already sits past the current
               // event - confirm against InputSource::skipEvents.
2413           input_->skipEvents(-2);
2414         } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2415           succeeded = input_->goToEvent(pc.specifiedEventTransition());
2416         }
2417       }
2418       pc.setLastOperationSucceeded(succeeded);
2419     } while (!pc.lastOperationSucceeded());
2420     if (status != EDLooperBase::kContinue) {
2421       shouldWeStop_ = true;
2422     }
2423   }
2424 
2425   bool EventProcessor::shouldWeStop() const {
         // True when a stop was already requested via shouldWeStop_. Otherwise, with
         // subprocesses present, true as soon as any of them reports terminate(); without
         // subprocesses the answer comes from schedule_->terminate().
2426     FDEBUG(1) << "\tshouldWeStop\n";
2427     if (shouldWeStop_) {
2428       return true;
2429     }
2430     if (subProcesses_.empty()) {
2431       return schedule_->terminate();
2432     }
2433     for (auto const& child : subProcesses_) {
2434       if (child.terminate())
2435         return true;
2436     }
2437     return false;
2438   }
2439 
2440   void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }  // assigns message to exceptionMessageFiles_; message is not modified here
2441 
2442   void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }  // sets the flag; nothing in this chunk clears it again
2443 
2444   void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }  // sets the flag; nothing in this chunk clears it again
2445 
2446   bool EventProcessor::setDeferredException(std::exception_ptr iException) {
         // Stores iException as the deferred exception only if none has been stored yet.
         // The compare-exchange flips deferredExceptionPtrIsSet_ from false to true for exactly
         // one caller; that caller stores the pointer and gets true, all others get false.
2447     bool notYetSet = false;
2448     bool const claimed = deferredExceptionPtrIsSet_.compare_exchange_strong(notYetSet, true);
2449     if (claimed) {
2450       deferredExceptionPtr_ = iException;
2451     }
2452     return claimed;
2453   }
2454 
2455   void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
         // Throws a cms::Exception listing every worker that wants global LuminosityBlocks and
         // has a global LuminosityBlocks queue. Returns normally when there is no such worker.
2456     cms::Exception ex("ModulesSynchingOnLumis");
2457     ex << "The framework is configured to use at least two streams, but the following modules\n"
2458        << "require synchronizing on LuminosityBlock boundaries:";
2459     bool anyOffender = false;
2460     for (auto worker : schedule_->allWorkers()) {
2461       if (not(worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()))
2462         continue;
2463       anyOffender = true;
2464       ex << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2465     }
2466     if (not anyOffender)
2467       return;
2468     ex << "\n\nThe situation can be fixed by either\n"
2469        << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2470        << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2471     throw ex;
2472   }
2473 
2474   void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
         // Logs, under the "ModulesSynchingOnRuns" category, every worker that wants global Runs
         // and has a global Runs queue. The LogSystem object is created lazily, so nothing is
         // created when there is no such worker.
2475     std::unique_ptr<LogSystem> log;
2476     for (auto worker : schedule_->allWorkers()) {
2477       if (not(worker->wantsGlobalRuns() and worker->globalRunsQueue()))
2478         continue;
2479       if (log == nullptr) {
2480         log = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2481         *log << "The following modules require synchronizing on Run boundaries:";
2482       }
2483       *log << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2484     }
2485   }
2486 }  // namespace edm