Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-01-11 16:27:14

0001 #include "FWCore/Framework/interface/EventProcessor.h"
0002 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0003 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0004 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0005 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0008 
0009 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0010 #include "FWCore/Framework/src/CommonParams.h"
0011 #include "FWCore/Framework/interface/EDLooperBase.h"
0012 #include "FWCore/Framework/interface/EventPrincipal.h"
0013 #include "FWCore/Framework/interface/EventSetupProvider.h"
0014 #include "FWCore/Framework/interface/EventSetupRecord.h"
0015 #include "FWCore/Framework/interface/FileBlock.h"
0016 #include "FWCore/Framework/interface/HistoryAppender.h"
0017 #include "FWCore/Framework/interface/InputSourceDescription.h"
0018 #include "FWCore/Framework/interface/IOVSyncValue.h"
0019 #include "FWCore/Framework/interface/LooperFactory.h"
0020 #include "FWCore/Framework/interface/LuminosityBlock.h"
0021 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0022 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0023 #include "FWCore/Framework/interface/ModuleChanger.h"
0024 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0025 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ProcessingController.h"
0028 #include "FWCore/Framework/interface/RunPrincipal.h"
0029 #include "FWCore/Framework/interface/Schedule.h"
0030 #include "FWCore/Framework/interface/ScheduleInfo.h"
0031 #include "FWCore/Framework/interface/ScheduleItems.h"
0032 #include "FWCore/Framework/interface/SubProcess.h"
0033 #include "FWCore/Framework/interface/Event.h"
0034 #include "FWCore/Framework/interface/ESRecordsToProxyIndices.h"
0035 #include "FWCore/Framework/src/Breakpoints.h"
0036 #include "FWCore/Framework/interface/EventSetupsController.h"
0037 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0038 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0039 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0040 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0041 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0042 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0043 #include "FWCore/Framework/interface/TriggerNamesService.h"
0044 #include "FWCore/Framework/src/SendSourceTerminationSignalIfException.h"
0045 
0046 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0047 
0048 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0049 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0050 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0051 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0052 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0053 #include "FWCore/ParameterSet/interface/Registry.h"
0054 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0055 
0056 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0057 #include "FWCore/ServiceRegistry/interface/Service.h"
0058 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0059 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0060 
0061 #include "FWCore/Concurrency/interface/WaitingTask.h"
0062 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0063 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0064 #include "FWCore/Concurrency/interface/chain_first.h"
0065 
0066 #include "FWCore/Utilities/interface/Algorithms.h"
0067 #include "FWCore/Utilities/interface/DebugMacros.h"
0068 #include "FWCore/Utilities/interface/EDMException.h"
0069 #include "FWCore/Utilities/interface/Exception.h"
0070 #include "FWCore/Utilities/interface/ConvertException.h"
0071 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0072 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0073 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0074 #include "FWCore/Utilities/interface/StreamID.h"
0075 #include "FWCore/Utilities/interface/RootHandlers.h"
0076 #include "FWCore/Utilities/interface/propagate_const.h"
0077 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0078 
0079 #include "MessageForSource.h"
0080 #include "MessageForParent.h"
0081 #include "LuminosityBlockProcessingStatus.h"
0082 #include "RunProcessingStatus.h"
0083 
0084 #include "boost/range/adaptor/reversed.hpp"
0085 
0086 #include <cassert>
0087 #include <exception>
0088 #include <iomanip>
0089 #include <iostream>
0090 #include <utility>
0091 #include <sstream>
0092 
0093 #include <sys/ipc.h>
0094 #include <sys/msg.h>
0095 
0096 #include "oneapi/tbb/task.h"
0097 
0098 //Used for CPU affinity
0099 #ifndef __APPLE__
0100 #include <sched.h>
0101 #endif
0102 
0103 namespace {
0104   class PauseQueueSentry {
0105   public:
0106     PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
0107     ~PauseQueueSentry() { queue_.resume(); }
0108 
0109   private:
0110     edm::SerialTaskQueue& queue_;
0111   };
0112 }  // namespace
0113 
0114 namespace edm {
0115 
0116   namespace chain = waiting_task::chain;
0117 
0118   // ---------------------------------------------------------------
0119   std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0120                                          ParameterSet& params,
0121                                          CommonParams const& common,
0122                                          std::shared_ptr<ProductRegistry> preg,
0123                                          std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0124                                          std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0125                                          std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0126                                          std::shared_ptr<ActivityRegistry> areg,
0127                                          std::shared_ptr<ProcessConfiguration const> processConfiguration,
0128                                          PreallocationConfiguration const& allocations) {
0129     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0130     if (main_input == nullptr) {
0131       throw Exception(errors::Configuration)
0132           << "There must be exactly one source in the configuration.\n"
0133           << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0134     }
0135 
0136     std::string modtype(main_input->getParameter<std::string>("@module_type"));
0137 
0138     std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0139         ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0140     ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0141     filler->fill(descriptions);
0142 
0143     try {
0144       convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
0145     } catch (cms::Exception& iException) {
0146       std::ostringstream ost;
0147       ost << "Validating configuration of input source of type " << modtype;
0148       iException.addContext(ost.str());
0149       throw;
0150     }
0151 
0152     main_input->registerIt();
0153 
0154     // Fill in "ModuleDescription", in case the input source produces
0155     // any EDProducts, which would be registered in the ProductRegistry.
0156     // Also fill in the process history item for this process.
0157     // There is no module label for the unnamed input source, so
0158     // just use "source".
0159     // Only the tracked parameters belong in the process configuration.
0160     ModuleDescription md(main_input->id(),
0161                          main_input->getParameter<std::string>("@module_type"),
0162                          "source",
0163                          processConfiguration.get(),
0164                          moduleIndex);
0165 
0166     InputSourceDescription isdesc(md,
0167                                   preg,
0168                                   branchIDListHelper,
0169                                   processBlockHelper,
0170                                   thinnedAssociationsHelper,
0171                                   areg,
0172                                   common.maxEventsInput_,
0173                                   common.maxLumisInput_,
0174                                   common.maxSecondsUntilRampdown_,
0175                                   allocations);
0176 
0177     areg->preSourceConstructionSignal_(md);
0178     std::unique_ptr<InputSource> input;
0179     try {
0180       //even if we have an exception, send the signal
0181       std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
0182       convertException::wrap([&]() {
0183         input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0184         input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
0185         input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
0186       });
0187     } catch (cms::Exception& iException) {
0188       std::ostringstream ost;
0189       ost << "Constructing input source of type " << modtype;
0190       iException.addContext(ost.str());
0191       throw;
0192     }
0193     return input;
0194   }
0195 
0196   // ---------------------------------------------------------------
0197   std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
0198                                            eventsetup::EventSetupProvider& cp,
0199                                            ParameterSet& params,
0200                                            std::vector<std::string> const& loopers) {
0201     std::shared_ptr<EDLooperBase> vLooper;
0202 
0203     assert(1 == loopers.size());
0204 
0205     for (auto const& looperName : loopers) {
0206       ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
0207       // Unlikely we would ever need the ModuleTypeResolver in Looper
0208       vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
0209     }
0210     return vLooper;
0211   }
0212 
0213   // ---------------------------------------------------------------
0214   EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,  //std::string const& config,
0215                                  ServiceToken const& iToken,
0216                                  serviceregistry::ServiceLegacy iLegacy,
0217                                  std::vector<std::string> const& defaultServices,
0218                                  std::vector<std::string> const& forcedServices)
0219       : actReg_(),
0220         preg_(),
0221         branchIDListHelper_(),
0222         serviceToken_(),
0223         input_(),
0224         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0225         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0226         esp_(),
0227         act_table_(),
0228         processConfiguration_(),
0229         schedule_(),
0230         subProcesses_(),
0231         historyAppender_(new HistoryAppender),
0232         fb_(),
0233         looper_(),
0234         deferredExceptionPtrIsSet_(false),
0235         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0236         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0237         principalCache_(),
0238         beginJobCalled_(false),
0239         shouldWeStop_(false),
0240         fileModeNoMerge_(false),
0241         exceptionMessageFiles_(),
0242         exceptionMessageRuns_(false),
0243         exceptionMessageLumis_(false),
0244         forceLooperToEnd_(false),
0245         looperBeginJobRun_(false),
0246         forceESCacheClearOnNewRun_(false),
0247         eventSetupDataToExcludeFromPrefetching_() {
0248     auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0249     processDesc->addServices(defaultServices, forcedServices);
0250     init(processDesc, iToken, iLegacy);
0251   }
0252 
0253   EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,  //std::string const& config,
0254                                  std::vector<std::string> const& defaultServices,
0255                                  std::vector<std::string> const& forcedServices)
0256       : actReg_(),
0257         preg_(),
0258         branchIDListHelper_(),
0259         serviceToken_(),
0260         input_(),
0261         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
0262         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0263         esp_(),
0264         act_table_(),
0265         processConfiguration_(),
0266         schedule_(),
0267         subProcesses_(),
0268         historyAppender_(new HistoryAppender),
0269         fb_(),
0270         looper_(),
0271         deferredExceptionPtrIsSet_(false),
0272         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0273         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0274         principalCache_(),
0275         beginJobCalled_(false),
0276         shouldWeStop_(false),
0277         fileModeNoMerge_(false),
0278         exceptionMessageFiles_(),
0279         exceptionMessageRuns_(false),
0280         exceptionMessageLumis_(false),
0281         forceLooperToEnd_(false),
0282         looperBeginJobRun_(false),
0283         forceESCacheClearOnNewRun_(false),
0284         eventSetupDataToExcludeFromPrefetching_() {
0285     auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0286     processDesc->addServices(defaultServices, forcedServices);
0287     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
0288   }
0289 
0290   EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0291                                  ServiceToken const& token,
0292                                  serviceregistry::ServiceLegacy legacy)
0293       : actReg_(),
0294         preg_(),
0295         branchIDListHelper_(),
0296         serviceToken_(),
0297         input_(),
0298         moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
0299         espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
0300         esp_(),
0301         act_table_(),
0302         processConfiguration_(),
0303         schedule_(),
0304         subProcesses_(),
0305         historyAppender_(new HistoryAppender),
0306         fb_(),
0307         looper_(),
0308         deferredExceptionPtrIsSet_(false),
0309         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0310         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0311         principalCache_(),
0312         beginJobCalled_(false),
0313         shouldWeStop_(false),
0314         fileModeNoMerge_(false),
0315         exceptionMessageFiles_(),
0316         exceptionMessageRuns_(false),
0317         exceptionMessageLumis_(false),
0318         forceLooperToEnd_(false),
0319         looperBeginJobRun_(false),
0320         forceESCacheClearOnNewRun_(false),
0321         eventSetupDataToExcludeFromPrefetching_() {
0322     init(processDesc, token, legacy);
0323   }
0324 
0325   void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
0326                             ServiceToken const& iToken,
0327                             serviceregistry::ServiceLegacy iLegacy) {
0328     //std::cerr << processDesc->dump() << std::endl;
0329 
0330     // register the empty parentage vector , once and for all
0331     ParentageRegistry::instance()->insertMapped(Parentage());
0332 
0333     // register the empty parameter set, once and for all.
0334     ParameterSet().registerIt();
0335 
0336     std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
0337 
0338     // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
0339     auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
0340     bool const hasSubProcesses = !subProcessVParameterSet.empty();
0341 
0342     // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
0343     // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
0344     // set in here if the parameters were not explicitly set.
0345     validateTopLevelParameterSets(parameterSet.get());
0346 
0347     // Now set some parameters specific to the main process.
0348     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
0349     auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
0350     if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
0351       throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
0352           << fileMode << ".\n"
0353           << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
0354     } else {
0355       fileModeNoMerge_ = (fileMode == "NOMERGE");
0356     }
0357     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
0358     ensureAvailableAccelerators(*parameterSet);
0359 
0360     //threading
0361     unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
0362 
0363     // Even if numberOfThreads was set to zero in the Python configuration, the code
0364     // in cmsRun.cpp should have reset it to something else.
0365     assert(nThreads != 0);
0366 
0367     unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
0368     if (nStreams == 0) {
0369       nStreams = nThreads;
0370     }
0371     unsigned int nConcurrentLumis =
0372         optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
0373     if (nConcurrentLumis == 0) {
0374       nConcurrentLumis = 2;
0375     }
0376     if (nConcurrentLumis > nStreams) {
0377       nConcurrentLumis = nStreams;
0378     }
0379     unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
0380     if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
0381       nConcurrentRuns = nConcurrentLumis;
0382     }
0383     std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
0384     if (!loopers.empty()) {
0385       //For now loopers make us run only 1 transition at a time
0386       if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
0387         edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
0388                                                 "of concurrent runs, and the number of concurrent lumis "
0389                                                 "are all being reset to 1. Loopers cannot currently support "
0390                                                 "values greater than 1.";
0391         nStreams = 1;
0392         nConcurrentLumis = 1;
0393         nConcurrentRuns = 1;
0394       }
0395     }
0396     bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
0397     if (dumpOptions) {
0398       dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
0399     } else {
0400       if (nThreads > 1 or nStreams > 1) {
0401         edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
0402       }
0403     }
0404 
0405     // The number of concurrent IOVs is configured individually for each record in
0406     // the class NumberOfConcurrentIOVs to values less than or equal to this.
0407     // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
0408     // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
0409     // concurrent run past the first in use cases where IOVs change within a run.
0410     unsigned int maxConcurrentIOVs =
0411         3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
0412 
0413     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
0414 
0415     printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
0416     deleteNonConsumedUnscheduledModules_ =
0417         optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
0418     //for now, if have a subProcess, don't allow early delete
0419     // In the future we should use the SubProcess's 'keep list' to decide what can be kept
0420     if (not hasSubProcesses) {
0421       branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
0422     }
0423     if (not branchesToDeleteEarly_.empty()) {
0424       auto referencePSets =
0425           optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
0426       for (auto const& pset : referencePSets) {
0427         auto product = pset.getParameter<std::string>("product");
0428         auto references = pset.getParameter<std::vector<std::string>>("references");
0429         for (auto const& ref : references) {
0430           referencesToBranches_.emplace(product, ref);
0431         }
0432       }
0433       modulesToIgnoreForDeleteEarly_ =
0434           optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
0435     }
0436 
0437     // Now do general initialization
0438     ScheduleItems items;
0439 
0440     //initialize the services
0441     auto& serviceSets = processDesc->getServicesPSets();
0442     ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
0443     serviceToken_ = items.addCPRandTNS(*parameterSet, token);
0444 
0445     //make the services available
0446     ServiceRegistry::Operate operate(serviceToken_);
0447 
0448     CMS_SA_ALLOW try {
0449       if (nThreads > 1) {
0450         edm::Service<RootHandlers> handler;
0451         handler->willBeUsingThreads();
0452       }
0453 
0454       // intialize miscellaneous items
0455       std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
0456 
0457       // intialize the event setup provider
0458       ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
0459       esp_ = espController_->makeProvider(
0460           *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
0461 
0462       // initialize the looper, if any
0463       if (!loopers.empty()) {
0464         looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
0465         looper_->setActionTable(items.act_table_.get());
0466         looper_->attachTo(*items.actReg_);
0467 
0468         // in presence of looper do not delete modules
0469         deleteNonConsumedUnscheduledModules_ = false;
0470       }
0471 
0472       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0473 
0474       runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
0475       lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
0476       streamQueues_.resize(nStreams);
0477       streamRunStatus_.resize(nStreams);
0478       streamLumiStatus_.resize(nStreams);
0479 
0480       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0481 
0482       {
0483         std::optional<ScheduleItems::MadeModules> madeModules;
0484 
0485         //setup input and modules concurrently
0486         tbb::task_group group;
0487 
0488         // initialize the input source
0489         auto tempReg = std::make_shared<ProductRegistry>();
0490         auto sourceID = ModuleDescription::getUniqueID();
0491 
0492         group.run([&, this]() {
0493           // initialize the Schedule
0494           ServiceRegistry::Operate operate(serviceToken_);
0495           auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0496           madeModules =
0497               items.initModules(*parameterSet, tns, preallocations_, &processContext_, moduleTypeResolverMaker_.get());
0498         });
0499 
0500         group.run([&, this, tempReg]() {
0501           ServiceRegistry::Operate operate(serviceToken_);
0502           input_ = makeInput(sourceID,
0503                              *parameterSet,
0504                              *common,
0505                              /*items.preg(),*/ tempReg,
0506                              items.branchIDListHelper(),
0507                              get_underlying_safe(processBlockHelper_),
0508                              items.thinnedAssociationsHelper(),
0509                              items.actReg_,
0510                              items.processConfiguration(),
0511                              preallocations_);
0512         });
0513 
0514         group.wait();
0515         items.preg()->addFromInput(*tempReg);
0516         input_->switchTo(items.preg());
0517 
0518         {
0519           auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
0520           schedule_ = items.finishSchedule(std::move(*madeModules),
0521                                            *parameterSet,
0522                                            tns,
0523                                            hasSubProcesses,
0524                                            preallocations_,
0525                                            &processContext_,
0526                                            *processBlockHelper_);
0527         }
0528       }
0529 
0530       // set the data members
0531       act_table_ = std::move(items.act_table_);
0532       actReg_ = items.actReg_;
0533       preg_ = items.preg();
0534       mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0535       branchIDListHelper_ = items.branchIDListHelper();
0536       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0537       processConfiguration_ = items.processConfiguration();
0538       processContext_.setProcessConfiguration(processConfiguration_.get());
0539 
0540       FDEBUG(2) << parameterSet << std::endl;
0541 
0542       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0543       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0544         // Reusable event principal
0545         auto ep = std::make_shared<EventPrincipal>(preg(),
0546                                                    branchIDListHelper(),
0547                                                    thinnedAssociationsHelper(),
0548                                                    *processConfiguration_,
0549                                                    historyAppender_.get(),
0550                                                    index,
0551                                                    true /*primary process*/,
0552                                                    &*processBlockHelper_);
0553         principalCache_.insert(std::move(ep));
0554       }
0555 
0556       for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0557         auto rp = std::make_unique<RunPrincipal>(
0558             preg(), *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0559         principalCache_.insert(std::move(rp));
0560       }
0561 
0562       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0563         auto lp =
0564             std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
0565         principalCache_.insert(std::move(lp));
0566       }
0567 
0568       {
0569         auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0570         principalCache_.insert(std::move(pb));
0571 
0572         auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0573         principalCache_.insertForInput(std::move(pbForInput));
0574       }
0575 
0576       // fill the subprocesses, if there are any
0577       subProcesses_.reserve(subProcessVParameterSet.size());
0578       for (auto& subProcessPSet : subProcessVParameterSet) {
0579         subProcesses_.emplace_back(subProcessPSet,
0580                                    *parameterSet,
0581                                    preg(),
0582                                    branchIDListHelper(),
0583                                    *processBlockHelper_,
0584                                    *thinnedAssociationsHelper_,
0585                                    SubProcessParentageHelper(),
0586                                    *espController_,
0587                                    *actReg_,
0588                                    token,
0589                                    serviceregistry::kConfigurationOverrides,
0590                                    preallocations_,
0591                                    &processContext_,
0592                                    moduleTypeResolverMaker_);
0593       }
0594     } catch (...) {
0595       //in case of an exception, make sure Services are available
0596       // during the following destructors
0597       espController_ = nullptr;
0598       esp_ = nullptr;
0599       schedule_ = nullptr;
0600       input_ = nullptr;
0601       looper_ = nullptr;
0602       actReg_ = nullptr;
0603       throw;
0604     }
0605   }
0606 
0607   EventProcessor::~EventProcessor() {
0608     // Make the services available while everything is being deleted.
0609     ServiceToken token = getToken();
0610     ServiceRegistry::Operate op(token);
0611 
0612     // manually destroy all these thing that may need the services around
0613     // propagate_const<T> has no reset() function
0614     espController_ = nullptr;
0615     esp_ = nullptr;
0616     schedule_ = nullptr;
0617     input_ = nullptr;
0618     looper_ = nullptr;
0619     actReg_ = nullptr;
0620 
0621     pset::Registry::instance()->clear();
0622     ParentageRegistry::instance()->clear();
0623   }
0624 
0625   void EventProcessor::taskCleanup() {
0626     edm::FinalWaitingTask task{taskGroup_};
0627     espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
0628     task.waitNoThrow();
0629     assert(task.done());
0630   }
0631 
0632   void EventProcessor::beginJob() {
0633     if (beginJobCalled_)
0634       return;
0635     beginJobCalled_ = true;
0636     bk::beginJob();
0637 
0638     // StateSentry toerror(this); // should we add this ?
0639     //make the services available
0640     ServiceRegistry::Operate operate(serviceToken_);
0641 
0642     service::SystemBounds bounds(preallocations_.numberOfStreams(),
0643                                  preallocations_.numberOfLuminosityBlocks(),
0644                                  preallocations_.numberOfRuns(),
0645                                  preallocations_.numberOfThreads());
0646     actReg_->preallocateSignal_(bounds);
0647     schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0648     pathsAndConsumesOfModules_.initialize(schedule_.get(), preg());
0649 
0650     std::vector<ModuleProcessName> consumedBySubProcesses;
0651     for_all(subProcesses_,
0652             [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
0653               auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0654               if (consumedBySubProcesses.empty()) {
0655                 consumedBySubProcesses = std::move(c);
0656               } else if (not c.empty()) {
0657                 std::vector<ModuleProcessName> tmp;
0658                 tmp.reserve(consumedBySubProcesses.size() + c.size());
0659                 std::merge(consumedBySubProcesses.begin(),
0660                            consumedBySubProcesses.end(),
0661                            c.begin(),
0662                            c.end(),
0663                            std::back_inserter(tmp));
0664                 std::swap(consumedBySubProcesses, tmp);
0665               }
0666             });
0667 
0668     // Note: all these may throw
0669     checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, printDependencies_);
0670     if (deleteNonConsumedUnscheduledModules_) {
0671       if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
0672           not unusedModules.empty()) {
0673         pathsAndConsumesOfModules_.removeModules(unusedModules);
0674 
0675         edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
0676           l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0677                "and "
0678                "therefore they are deleted before beginJob transition.";
0679           for (auto const& description : unusedModules) {
0680             l << "\n " << description->moduleLabel();
0681           }
0682         });
0683         for (auto const& description : unusedModules) {
0684           schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0685         }
0686       }
0687     }
0688     // Initialize after the deletion of non-consumed unscheduled
0689     // modules to avoid non-consumed non-run modules to keep the
0690     // products unnecessarily alive
0691     if (not branchesToDeleteEarly_.empty()) {
0692       auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
0693       auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
0694       auto referencesToBranches = std::move(referencesToBranches_);
0695       schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
0696     }
0697 
0698     actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
0699 
0700     if (preallocations_.numberOfLuminosityBlocks() > 1) {
0701       throwAboutModulesRequiringLuminosityBlockSynchronization();
0702     }
0703     if (preallocations_.numberOfRuns() > 1) {
0704       warnAboutModulesRequiringRunSynchronization();
0705     }
0706     warnAboutLegacyModules();
0707 
0708     //NOTE:  This implementation assumes 'Job' means one call
0709     // the EventProcessor::run
0710     // If it really means once per 'application' then this code will
0711     // have to be changed.
0712     // Also have to deal with case where have 'run' then new Module
0713     // added and do 'run'
0714     // again.  In that case the newly added Module needs its 'beginJob'
0715     // to be called.
0716 
0717     //NOTE: in future we should have a beginOfJob for looper that takes no arguments
0718     //  For now we delay calling beginOfJob until first beginOfRun
0719     //if(looper_) {
0720     //   looper_->beginOfJob(es);
0721     //}
0722     try {
0723       convertException::wrap([&]() { input_->doBeginJob(); });
0724     } catch (cms::Exception& ex) {
0725       ex.addContext("Calling beginJob for the source");
0726       throw;
0727     }
0728     espController_->finishConfiguration();
0729     schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
0730     if (looper_) {
0731       constexpr bool mustPrefetchMayGet = true;
0732       auto const processBlockLookup = preg_->productLookup(InProcess);
0733       auto const runLookup = preg_->productLookup(InRun);
0734       auto const lumiLookup = preg_->productLookup(InLumi);
0735       auto const eventLookup = preg_->productLookup(InEvent);
0736       looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
0737       looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
0738       looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
0739       looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
0740       looper_->updateLookup(esp_->recordsToProxyIndices());
0741     }
0742     // toerror.succeeded(); // should we add this?
0743     for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0744     actReg_->postBeginJobSignal_();
0745 
0746     oneapi::tbb::task_group group;
0747     FinalWaitingTask last{group};
0748     using namespace edm::waiting_task::chain;
0749     first([this](auto nextTask) {
0750       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0751         first([i, this](auto nextTask) {
0752           ServiceRegistry::Operate operate(serviceToken_);
0753           schedule_->beginStream(i);
0754         }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
0755           ServiceRegistry::Operate operate(serviceToken_);
0756           for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
0757         }) | lastTask(nextTask);
0758       }
0759     }) | runLast(WaitingTaskHolder(group, &last));
0760     last.wait();
0761   }
0762 
0763   void EventProcessor::endJob() {
0764     // Collects exceptions, so we don't throw before all operations are performed.
0765     ExceptionCollector c(
0766         "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0767 
0768     //make the services available
0769     ServiceRegistry::Operate operate(serviceToken_);
0770 
0771     using namespace edm::waiting_task::chain;
0772 
0773     oneapi::tbb::task_group group;
0774     edm::FinalWaitingTask waitTask{group};
0775 
0776     {
0777       //handle endStream transitions
0778       edm::WaitingTaskHolder taskHolder(group, &waitTask);
0779       std::mutex collectorMutex;
0780       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0781         first([this, i, &c, &collectorMutex](auto nextTask) {
0782           std::exception_ptr ep;
0783           try {
0784             ServiceRegistry::Operate operate(serviceToken_);
0785             this->schedule_->endStream(i);
0786           } catch (...) {
0787             ep = std::current_exception();
0788           }
0789           if (ep) {
0790             std::lock_guard<std::mutex> l(collectorMutex);
0791             c.call([&ep]() { std::rethrow_exception(ep); });
0792           }
0793         }) | then([this, i, &c, &collectorMutex](auto nextTask) {
0794           for (auto& subProcess : subProcesses_) {
0795             first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
0796               std::exception_ptr ep;
0797               try {
0798                 ServiceRegistry::Operate operate(serviceToken_);
0799                 subProcess.doEndStream(i);
0800               } catch (...) {
0801                 ep = std::current_exception();
0802               }
0803               if (ep) {
0804                 std::lock_guard<std::mutex> l(collectorMutex);
0805                 c.call([&ep]() { std::rethrow_exception(ep); });
0806               }
0807             }) | lastTask(nextTask);
0808           }
0809         }) | lastTask(taskHolder);
0810       }
0811     }
0812     waitTask.waitNoThrow();
0813 
0814     auto actReg = actReg_.get();
0815     c.call([actReg]() { actReg->preEndJobSignal_(); });
0816     schedule_->endJob(c);
0817     for (auto& subProcess : subProcesses_) {
0818       c.call(std::bind(&SubProcess::doEndJob, &subProcess));
0819     }
0820     c.call(std::bind(&InputSource::doEndJob, input_.get()));
0821     if (looper_) {
0822       c.call(std::bind(&EDLooperBase::endOfJob, looper()));
0823     }
0824     c.call([actReg]() { actReg->postEndJobSignal_(); });
0825     if (c.hasThrown()) {
0826       c.rethrow();
0827     }
0828   }
0829 
0830   ServiceToken EventProcessor::getToken() { return serviceToken_; }
0831 
0832   std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
0833     return schedule_->getAllModuleDescriptions();
0834   }
0835 
0836   int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
0837 
0838   int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
0839 
0840   int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
0841 
0842   void EventProcessor::clearCounters() { schedule_->clearCounters(); }
0843 
0844   namespace {
0845 #include "TransitionProcessors.icc"
0846   }
0847 
0848   bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
0849     bool returnValue = false;
0850 
0851     // Look for a shutdown signal
0852     if (shutdown_flag.load(std::memory_order_acquire)) {
0853       returnValue = true;
0854       returnCode = epSignal;
0855     }
0856     return returnValue;
0857   }
0858 
0859   InputSource::ItemType EventProcessor::nextTransitionType() {
0860     SendSourceTerminationSignalIfException sentry(actReg_.get());
0861     InputSource::ItemType itemType;
0862     //For now, do nothing with InputSource::IsSynchronize
0863     do {
0864       itemType = input_->nextItemType();
0865     } while (itemType == InputSource::IsSynchronize);
0866 
0867     lastSourceTransition_ = itemType;
0868     sentry.completedSuccessfully();
0869 
0870     StatusCode returnCode = epSuccess;
0871 
0872     if (checkForAsyncStopRequest(returnCode)) {
0873       actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
0874       lastSourceTransition_ = InputSource::IsStop;
0875     }
0876 
0877     return lastSourceTransition_;
0878   }
0879 
0880   EventProcessor::StatusCode EventProcessor::runToCompletion() {
0881     beginJob();  //make sure this was called
0882 
0883     // make the services available
0884     ServiceRegistry::Operate operate(serviceToken_);
0885 
0886     try {
0887       FilesProcessor fp(fileModeNoMerge_);
0888 
0889       convertException::wrap([&]() {
0890         bool firstTime = true;
0891         do {
0892           if (not firstTime) {
0893             prepareForNextLoop();
0894             rewindInput();
0895           } else {
0896             firstTime = false;
0897           }
0898           startingNewLoop();
0899 
0900           auto trans = fp.processFiles(*this);
0901 
0902           fp.normalEnd();
0903 
0904           if (deferredExceptionPtrIsSet_.load()) {
0905             std::rethrow_exception(deferredExceptionPtr_);
0906           }
0907           if (trans != InputSource::IsStop) {
0908             //problem with the source
0909             doErrorStuff();
0910 
0911             throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
0912           }
0913         } while (not endOfLoop());
0914       });  // convertException::wrap
0915 
0916     }  // Try block
0917     catch (cms::Exception& e) {
0918       if (exceptionMessageLumis_) {
0919         std::string message(
0920             "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
0921         e.addAdditionalInfo(message);
0922         if (e.alreadyPrinted()) {
0923           LogAbsolute("Additional Exceptions") << message;
0924         }
0925       }
0926       if (exceptionMessageRuns_) {
0927         std::string message(
0928             "Another exception was caught while trying to clean up runs after the primary fatal exception.");
0929         e.addAdditionalInfo(message);
0930         if (e.alreadyPrinted()) {
0931           LogAbsolute("Additional Exceptions") << message;
0932         }
0933       }
0934       if (!exceptionMessageFiles_.empty()) {
0935         e.addAdditionalInfo(exceptionMessageFiles_);
0936         if (e.alreadyPrinted()) {
0937           LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
0938         }
0939       }
0940       throw;
0941     }
0942     return epSuccess;
0943   }
0944 
0945   void EventProcessor::readFile() {
0946     FDEBUG(1) << " \treadFile\n";
0947     size_t size = preg_->size();
0948     SendSourceTerminationSignalIfException sentry(actReg_.get());
0949 
0950     if (streamRunActive_ > 0) {
0951       streamRunStatus_[0]->runPrincipal()->preReadFile();
0952       streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
0953     }
0954 
0955     if (streamLumiActive_ > 0) {
0956       streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
0957     }
0958 
0959     fb_ = input_->readFile();
0960     if (size < preg_->size()) {
0961       principalCache_.adjustIndexesAfterProductRegistryAddition();
0962     }
0963     principalCache_.adjustEventsToNewProductRegistry(preg());
0964     if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
0965       fb_->setNotFastClonable(FileBlock::ParallelProcesses);
0966     }
0967     sentry.completedSuccessfully();
0968   }
0969 
0970   void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
0971     if (fileBlockValid()) {
0972       SendSourceTerminationSignalIfException sentry(actReg_.get());
0973       input_->closeFile(fb_.get(), cleaningUpAfterException);
0974       sentry.completedSuccessfully();
0975     }
0976     FDEBUG(1) << "\tcloseInputFile\n";
0977   }
0978 
0979   void EventProcessor::openOutputFiles() {
0980     if (fileBlockValid()) {
0981       schedule_->openOutputFiles(*fb_);
0982       for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
0983     }
0984     FDEBUG(1) << "\topenOutputFiles\n";
0985   }
0986 
0987   void EventProcessor::closeOutputFiles() {
0988     schedule_->closeOutputFiles();
0989     for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
0990     processBlockHelper_->clearAfterOutputFilesClose();
0991     FDEBUG(1) << "\tcloseOutputFiles\n";
0992   }
0993 
0994   void EventProcessor::respondToOpenInputFile() {
0995     if (fileBlockValid()) {
0996       for_all(subProcesses_,
0997               [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
0998       schedule_->respondToOpenInputFile(*fb_);
0999       for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1000     }
1001     FDEBUG(1) << "\trespondToOpenInputFile\n";
1002   }
1003 
1004   void EventProcessor::respondToCloseInputFile() {
1005     if (fileBlockValid()) {
1006       schedule_->respondToCloseInputFile(*fb_);
1007       for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1008     }
1009     FDEBUG(1) << "\trespondToCloseInputFile\n";
1010   }
1011 
1012   void EventProcessor::startingNewLoop() {
1013     shouldWeStop_ = false;
1014     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1015     // until after we've called beginOfJob
1016     if (looper_ && looperBeginJobRun_) {
1017       looper_->doStartingNewLoop();
1018     }
1019     FDEBUG(1) << "\tstartingNewLoop\n";
1020   }
1021 
1022   bool EventProcessor::endOfLoop() {
1023     if (looper_) {
1024       ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1025       looper_->setModuleChanger(&changer);
1026       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1027       looper_->setModuleChanger(nullptr);
1028       if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1029         return true;
1030       else
1031         return false;
1032     }
1033     FDEBUG(1) << "\tendOfLoop\n";
1034     return true;
1035   }
1036 
1037   void EventProcessor::rewindInput() {
1038     input_->repeat();
1039     input_->rewind();
1040     FDEBUG(1) << "\trewind\n";
1041   }
1042 
1043   void EventProcessor::prepareForNextLoop() {
1044     looper_->prepareForNextLoop(esp_.get());
1045     FDEBUG(1) << "\tprepareForNextLoop\n";
1046   }
1047 
1048   bool EventProcessor::shouldWeCloseOutput() const {
1049     FDEBUG(1) << "\tshouldWeCloseOutput\n";
1050     if (!subProcesses_.empty()) {
1051       for (auto const& subProcess : subProcesses_) {
1052         if (subProcess.shouldWeCloseOutput()) {
1053           return true;
1054         }
1055       }
1056       return false;
1057     }
1058     return schedule_->shouldWeCloseOutput();
1059   }
1060 
1061   void EventProcessor::doErrorStuff() {
1062     FDEBUG(1) << "\tdoErrorStuff\n";
1063     LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1064                              << "and went to the error state\n"
1065                              << "Will attempt to terminate processing normally\n"
1066                              << "(IF using the looper the next loop will be attempted)\n"
1067                              << "This likely indicates a bug in an input module or corrupted input or both\n";
1068   }
1069 
1070   void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1071     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1072     processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1073 
1074     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1075     FinalWaitingTask globalWaitTask{taskGroup_};
1076 
1077     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1078     beginGlobalTransitionAsync<Traits>(
1079         WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1080 
1081     globalWaitTask.wait();
1082     beginProcessBlockSucceeded = true;
1083   }
1084 
1085   void EventProcessor::inputProcessBlocks() {
1086     input_->fillProcessBlockHelper();
1087     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1088     while (input_->nextProcessBlock(processBlockPrincipal)) {
1089       readProcessBlock(processBlockPrincipal);
1090 
1091       using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1092       FinalWaitingTask globalWaitTask{taskGroup_};
1093 
1094       ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1095       beginGlobalTransitionAsync<Traits>(
1096           WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1097 
1098       globalWaitTask.wait();
1099 
1100       FinalWaitingTask writeWaitTask{taskGroup_};
1101       writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input);
1102       writeWaitTask.wait();
1103 
1104       processBlockPrincipal.clearPrincipal();
1105       for (auto& s : subProcesses_) {
1106         s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1107       }
1108     }
1109   }
1110 
1111   void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1112     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1113 
1114     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1115     FinalWaitingTask globalWaitTask{taskGroup_};
1116 
1117     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1118     endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1119                                      *schedule_,
1120                                      transitionInfo,
1121                                      serviceToken_,
1122                                      subProcesses_,
1123                                      cleaningUpAfterException);
1124     globalWaitTask.wait();
1125 
1126     if (beginProcessBlockSucceeded) {
1127       FinalWaitingTask writeWaitTask{taskGroup_};
1128       writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New);
1129       writeWaitTask.wait();
1130     }
1131 
1132     processBlockPrincipal.clearPrincipal();
1133     for (auto& s : subProcesses_) {
1134       s.clearProcessBlockPrincipal(ProcessBlockType::New);
1135     }
1136   }
1137 
1138   InputSource::ItemType EventProcessor::processRuns() {
1139     FinalWaitingTask waitTask{taskGroup_};
1140     assert(lastTransitionType() == InputSource::IsRun);
1141     if (streamRunActive_ == 0) {
1142       assert(streamLumiActive_ == 0);
1143 
1144       beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1145                     WaitingTaskHolder{taskGroup_, &waitTask});
1146     } else {
1147       assert(streamRunActive_ == preallocations_.numberOfStreams());
1148 
1149       auto runStatus = streamRunStatus_[0];
1150 
1151       while (lastTransitionType() == InputSource::IsRun and runStatus->runPrincipal()->run() == input_->run() and
1152              runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1153         readAndMergeRun(*runStatus);
1154         nextTransitionType();
1155       }
1156 
1157       WaitingTaskHolder holder{taskGroup_, &waitTask};
1158       runStatus->setHolderOfTaskInProcessRuns(holder);
1159       if (streamLumiActive_ > 0) {
1160         assert(streamLumiActive_ == preallocations_.numberOfStreams());
1161         continueLumiAsync(std::move(holder));
1162       } else {
1163         handleNextItemAfterMergingRunEntries(std::move(runStatus), std::move(holder));
1164       }
1165     }
1166     waitTask.wait();
1167     return lastTransitionType();
1168   }
1169 
1170   void EventProcessor::beginRunAsync(IOVSyncValue const& iSync, WaitingTaskHolder iHolder) {
1171     if (iHolder.taskHasFailed()) {
1172       return;
1173     }
1174 
1175     actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1176 
1177     auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1178 
1179     chain::first([this, &status, &iSync](auto nextTask) {
1180       espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1181                                                            nextTask,
1182                                                            status->endIOVWaitingTasks(),
1183                                                            status->eventSetupImpls(),
1184                                                            queueWhichWaitsForIOVsToFinish_,
1185                                                            actReg_.get(),
1186                                                            serviceToken_,
1187                                                            forceESCacheClearOnNewRun_);
1188     }) | chain::then([this, status, iSync](std::exception_ptr const* iException, auto nextTask) {
1189       CMS_SA_ALLOW try {
1190         if (iException) {
1191           WaitingTaskHolder copyHolder(nextTask);
1192           copyHolder.doneWaiting(*iException);
1193           // Finish handling the exception in the task pushed to runQueue_
1194         }
1195         ServiceRegistry::Operate operate(serviceToken_);
1196         actReg_->postESSyncIOVSignal_.emit(iSync);
1197 
1198         runQueue_->pushAndPause(
1199             *nextTask.group(),
1200             [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1201               CMS_SA_ALLOW try {
1202                 if (postRunQueueTask.taskHasFailed()) {
1203                   status->resetBeginResources();
1204                   queueWhichWaitsForIOVsToFinish_.resume();
1205                   return;
1206                 }
1207 
1208                 status->setResumer(std::move(iResumer));
1209 
1210                 sourceResourcesAcquirer_.serialQueueChain().push(
1211                     *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1212                       CMS_SA_ALLOW try {
1213                         ServiceRegistry::Operate operate(serviceToken_);
1214 
1215                         if (postSourceTask.taskHasFailed()) {
1216                           status->resetBeginResources();
1217                           queueWhichWaitsForIOVsToFinish_.resume();
1218                           status->resumeGlobalRunQueue();
1219                           return;
1220                         }
1221 
1222                         status->setRunPrincipal(readRun());
1223 
1224                         RunPrincipal& runPrincipal = *status->runPrincipal();
1225                         {
1226                           SendSourceTerminationSignalIfException sentry(actReg_.get());
1227                           input_->doBeginRun(runPrincipal, &processContext_);
1228                           sentry.completedSuccessfully();
1229                         }
1230 
1231                         EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1232                         if (looper_ && looperBeginJobRun_ == false) {
1233                           looper_->copyInfo(ScheduleInfo(schedule_.get()));
1234 
1235                           oneapi::tbb::task_group group;
1236                           FinalWaitingTask waitTask{group};
1237                           using namespace edm::waiting_task::chain;
1238                           chain::first([this, &es](auto nextTask) {
1239                             looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1240                           }) | then([this, &es](auto nextTask) mutable {
1241                             looper_->beginOfJob(es);
1242                             looperBeginJobRun_ = true;
1243                             looper_->doStartingNewLoop();
1244                           }) | runLast(WaitingTaskHolder(group, &waitTask));
1245                           waitTask.wait();
1246                         }
1247 
1248                         using namespace edm::waiting_task::chain;
1249                         chain::first([this, status](auto nextTask) mutable {
1250                           CMS_SA_ALLOW try { readAndMergeRunEntriesAsync(std::move(status), nextTask); } catch (...) {
1251                             status->setStopBeforeProcessingRun(true);
1252                             nextTask.doneWaiting(std::current_exception());
1253                           }
1254                         }) | then([this, status, &es](auto nextTask) {
1255                           if (status->stopBeforeProcessingRun()) {
1256                             return;
1257                           }
1258                           RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1259                           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1260                           beginGlobalTransitionAsync<Traits>(
1261                               nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1262                         }) | then([status](auto nextTask) mutable {
1263                           if (status->stopBeforeProcessingRun()) {
1264                             return;
1265                           }
1266                           status->globalBeginDidSucceed();
1267                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1268                           if (status->stopBeforeProcessingRun()) {
1269                             return;
1270                           }
1271                           looper_->prefetchAsync(
1272                               nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1273                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1274                           if (status->stopBeforeProcessingRun()) {
1275                             return;
1276                           }
1277                           ServiceRegistry::Operate operateLooper(serviceToken_);
1278                           looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1279                         }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1280                           bool precedingTasksSucceeded = true;
1281                           if (iException) {
1282                             precedingTasksSucceeded = false;
1283                             WaitingTaskHolder copyHolder(holder);
1284                             copyHolder.doneWaiting(*iException);
1285                           }
1286 
1287                           if (status->stopBeforeProcessingRun()) {
1288                             // We just quit now if there was a failure when merging runs
1289                             status->resetBeginResources();
1290                             queueWhichWaitsForIOVsToFinish_.resume();
1291                             status->resumeGlobalRunQueue();
1292                             return;
1293                           }
1294                           CMS_SA_ALLOW try {
1295                             // Under normal circumstances, this task runs after endRun has completed for all streams
1296                             // and global endLumi has completed for all lumis contained in this run
1297                             auto globalEndRunTask =
1298                                 edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1299                                   WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1300                                   status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1301                                   globalEndRunAsync(std::move(taskHolder), std::move(status));
1302                                 });
1303                             status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1304                           } catch (...) {
1305                             status->resetBeginResources();
1306                             queueWhichWaitsForIOVsToFinish_.resume();
1307                             status->resumeGlobalRunQueue();
1308                             holder.doneWaiting(std::current_exception());
1309                             return;
1310                           }
1311 
1312                           // After this point we are committed to end the run via endRunAsync
1313 
1314                           ServiceRegistry::Operate operate(serviceToken_);
1315 
1316                           // The only purpose of the pause is to cause stream begin run to execute before
1317                           // global begin lumi in the single threaded case (maintains consistency with
1318                           // the order that existed before concurrent runs were implemented).
1319                           PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1320 
1321                           CMS_SA_ALLOW try {
1322                             streamQueuesInserter_.push(
1323                                 *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1324                                   for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1325                                     CMS_SA_ALLOW try {
1326                                       streamQueues_[i].push(
1327                                           *holder.group(),
1328                                           [this, i, status, precedingTasksSucceeded, holder]() mutable {
1329                                             streamBeginRunAsync(
1330                                                 i, std::move(status), precedingTasksSucceeded, std::move(holder));
1331                                           });
1332                                     } catch (...) {
1333                                       if (status->streamFinishedBeginRun()) {
1334                                         WaitingTaskHolder copyHolder(holder);
1335                                         copyHolder.doneWaiting(std::current_exception());
1336                                         status->resetBeginResources();
1337                                         queueWhichWaitsForIOVsToFinish_.resume();
1338                                         exceptionRunStatus_ = status;
1339                                       }
1340                                     }
1341                                   }
1342                                 });
1343                           } catch (...) {
1344                             WaitingTaskHolder copyHolder(holder);
1345                             copyHolder.doneWaiting(std::current_exception());
1346                             status->resetBeginResources();
1347                             queueWhichWaitsForIOVsToFinish_.resume();
1348                             exceptionRunStatus_ = status;
1349                           }
1350                           handleNextItemAfterMergingRunEntries(status, holder);
1351                         }) | runLast(postSourceTask);
1352                       } catch (...) {
1353                         status->resetBeginResources();
1354                         queueWhichWaitsForIOVsToFinish_.resume();
1355                         status->resumeGlobalRunQueue();
1356                         postSourceTask.doneWaiting(std::current_exception());
1357                       }
1358                     });  // task in sourceResourcesAcquirer
1359               } catch (...) {
1360                 status->resetBeginResources();
1361                 queueWhichWaitsForIOVsToFinish_.resume();
1362                 status->resumeGlobalRunQueue();
1363                 postRunQueueTask.doneWaiting(std::current_exception());
1364               }
1365             });  // task in runQueue
1366       } catch (...) {
1367         status->resetBeginResources();
1368         queueWhichWaitsForIOVsToFinish_.resume();
1369         nextTask.doneWaiting(std::current_exception());
1370       }
1371     }) | chain::runLast(std::move(iHolder));
1372   }
1373 
1374   void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1375                                            std::shared_ptr<RunProcessingStatus> status,
1376                                            bool precedingTasksSucceeded,
1377                                            WaitingTaskHolder iHolder) {
1378     // These shouldn't throw
1379     streamQueues_[iStream].pause();
1380     ++streamRunActive_;
1381     streamRunStatus_[iStream] = std::move(status);
1382 
1383     CMS_SA_ALLOW try {
1384       using namespace edm::waiting_task::chain;
1385       chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1386         if (precedingTasksSucceeded) {
1387           RunProcessingStatus& rs = *streamRunStatus_[iStream];
1388           RunTransitionInfo transitionInfo(
1389               *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1390           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1391           beginStreamTransitionAsync<Traits>(
1392               std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1393         }
1394       }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1395         if (exceptionFromBeginStreamRun) {
1396           nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1397         }
1398         releaseBeginRunResources(iStream);
1399       }) | runLast(iHolder);
1400     } catch (...) {
1401       releaseBeginRunResources(iStream);
1402       iHolder.doneWaiting(std::current_exception());
1403     }
1404   }
1405 
1406   void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1407     auto& status = streamRunStatus_[iStream];
1408     if (status->streamFinishedBeginRun()) {
1409       status->resetBeginResources();
1410       queueWhichWaitsForIOVsToFinish_.resume();
1411     }
1412     streamQueues_[iStream].resume();
1413   }
1414 
1415   void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1416     RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1417     iRunStatus->setEndTime();
1418     IOVSyncValue ts(
1419         EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1420         runPrincipal.endTime());
1421     CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1422       WaitingTaskHolder copyHolder(iHolder);
1423       copyHolder.doneWaiting(std::current_exception());
1424     }
1425 
1426     chain::first([this, &iRunStatus, &ts](auto nextTask) {
1427       espController_->runOrQueueEventSetupForInstanceAsync(ts,
1428                                                            nextTask,
1429                                                            iRunStatus->endIOVWaitingTasksEndRun(),
1430                                                            iRunStatus->eventSetupImplsEndRun(),
1431                                                            queueWhichWaitsForIOVsToFinish_,
1432                                                            actReg_.get(),
1433                                                            serviceToken_);
1434     }) | chain::then([this, iRunStatus, ts](std::exception_ptr const* iException, auto nextTask) {
1435       if (iException) {
1436         iRunStatus->setEndingEventSetupSucceeded(false);
1437         handleEndRunExceptions(*iException, nextTask);
1438       }
1439       ServiceRegistry::Operate operate(serviceToken_);
1440       CMS_SA_ALLOW try { actReg_->postESSyncIOVSignal_.emit(ts); } catch (...) {
1441         WaitingTaskHolder copyHolder(nextTask);
1442         copyHolder.doneWaiting(std::current_exception());
1443       }
1444 
1445       streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1446         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1447           CMS_SA_ALLOW try {
1448             streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1449               streamQueues_[i].pause();
1450               streamEndRunAsync(std::move(nextTask), i);
1451             });
1452           } catch (...) {
1453             WaitingTaskHolder copyHolder(nextTask);
1454             copyHolder.doneWaiting(std::current_exception());
1455           }
1456         }
1457       });
1458 
1459       if (lastTransitionType() == InputSource::IsRun) {
1460         CMS_SA_ALLOW try {
1461           beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1462         } catch (...) {
1463           WaitingTaskHolder copyHolder(nextTask);
1464           copyHolder.doneWaiting(std::current_exception());
1465         }
1466       }
1467     }) | chain::runLast(std::move(iHolder));
1468   }
1469 
1470   void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1471     if (holder.taskHasFailed()) {
1472       setExceptionMessageRuns();
1473     } else {
1474       WaitingTaskHolder tmp(holder);
1475       tmp.doneWaiting(iException);
1476     }
1477   }
1478 
1479   void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1480     auto& runPrincipal = *(iRunStatus->runPrincipal());
1481     bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1482     bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1483     EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1484     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1485     bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1486 
1487     MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1488     using namespace edm::waiting_task::chain;
1489     chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1490                      auto nextTask) {
1491       if (endingEventSetupSucceeded) {
1492         RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1493         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1494         endGlobalTransitionAsync<Traits>(
1495             std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1496       }
1497     }) |
1498         ifThen(looper_ && endingEventSetupSucceeded,
1499                [this, &runPrincipal, &es](auto nextTask) {
1500                  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1501                }) |
1502         ifThen(looper_ && endingEventSetupSucceeded,
1503                [this, &runPrincipal, &es](auto nextTask) {
1504                  ServiceRegistry::Operate operate(serviceToken_);
1505                  looper_->doEndRun(runPrincipal, es, &processContext_);
1506                }) |
1507         ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1508                [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1509                  mergeableRunProductMetadata->preWriteRun();
1510                  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1511                }) |
1512         then([status = std::move(iRunStatus),
1513               this,
1514               didGlobalBeginSucceed,
1515               mergeableRunProductMetadata,
1516               endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1517           if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1518             mergeableRunProductMetadata->postWriteRun();
1519           }
1520           if (iException) {
1521             handleEndRunExceptions(*iException, nextTask);
1522           }
1523           ServiceRegistry::Operate operate(serviceToken_);
1524 
1525           std::exception_ptr ptr;
1526 
1527           // Try hard to clean up resources so the
1528           // process can terminate in a controlled
1529           // fashion even after exceptions have occurred.
1530           CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1531             if (not ptr) {
1532               ptr = std::current_exception();
1533             }
1534           }
1535           CMS_SA_ALLOW try {
1536             status->resumeGlobalRunQueue();
1537             queueWhichWaitsForIOVsToFinish_.resume();
1538           } catch (...) {
1539             if (not ptr) {
1540               ptr = std::current_exception();
1541             }
1542           }
1543           CMS_SA_ALLOW try {
1544             status->resetEndResources();
1545             status.reset();
1546           } catch (...) {
1547             if (not ptr) {
1548               ptr = std::current_exception();
1549             }
1550           }
1551 
1552           if (ptr && !iException) {
1553             handleEndRunExceptions(ptr, nextTask);
1554           }
1555         }) |
1556         runLast(std::move(iTask));
1557   }
1558 
1559   void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1560     CMS_SA_ALLOW try {
1561       if (!streamRunStatus_[iStreamIndex]) {
1562         if (exceptionRunStatus_->streamFinishedRun()) {
1563           exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1564           exceptionRunStatus_.reset();
1565         }
1566         return;
1567       }
1568 
1569       auto runDoneTask =
1570           edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1571             if (iException) {
1572               handleEndRunExceptions(*iException, iTask);
1573             }
1574 
1575             auto runStatus = streamRunStatus_[iStreamIndex];
1576 
1577             //reset status before releasing queue else get race condition
1578             if (runStatus->streamFinishedRun()) {
1579               runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1580             }
1581             streamRunStatus_[iStreamIndex].reset();
1582             --streamRunActive_;
1583             streamQueues_[iStreamIndex].resume();
1584           });
1585 
1586       WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1587 
1588       auto runStatus = streamRunStatus_[iStreamIndex].get();
1589 
1590       if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1591         EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1592         auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1593         bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1594 
1595         auto& runPrincipal = *runStatus->runPrincipal();
1596         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1597         RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1598         endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1599                                          *schedule_,
1600                                          iStreamIndex,
1601                                          transitionInfo,
1602                                          serviceToken_,
1603                                          subProcesses_,
1604                                          cleaningUpAfterException);
1605       }
1606     } catch (...) {
1607       handleEndRunExceptions(std::current_exception(), iTask);
1608     }
1609   }
1610 
1611   void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1612     if (streamRunActive_ > 0) {
1613       FinalWaitingTask waitTask{taskGroup_};
1614 
1615       auto runStatus = streamRunStatus_[0].get();
1616       runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1617       WaitingTaskHolder holder{taskGroup_, &waitTask};
1618       runStatus->setHolderOfTaskInProcessRuns(holder);
1619       lastSourceTransition_ = InputSource::IsStop;
1620       endRunAsync(streamRunStatus_[0], std::move(holder));
1621       waitTask.wait();
1622     }
1623   }
1624 
1625   void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1626                                       std::shared_ptr<RunProcessingStatus> iRunStatus,
1627                                       edm::WaitingTaskHolder iHolder) {
1628     actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1629 
1630     auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
1631     chain::first([this, &iSync, &status](auto nextTask) {
1632       espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1633                                                            nextTask,
1634                                                            status->endIOVWaitingTasks(),
1635                                                            status->eventSetupImpls(),
1636                                                            queueWhichWaitsForIOVsToFinish_,
1637                                                            actReg_.get(),
1638                                                            serviceToken_);
1639     }) | chain::then([this, status, iRunStatus, iSync](std::exception_ptr const* iException, auto nextTask) {
1640       CMS_SA_ALLOW try {
1641         //the call to doneWaiting will cause the count to decrement
1642         if (iException) {
1643           WaitingTaskHolder copyHolder(nextTask);
1644           copyHolder.doneWaiting(*iException);
1645         }
1646 
1647         ServiceRegistry::Operate operate(serviceToken_);
1648         actReg_->postESSyncIOVSignal_.emit(iSync);
1649 
1650         lumiQueue_->pushAndPause(
1651             *nextTask.group(),
1652             [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1653               CMS_SA_ALLOW try {
1654                 if (postLumiQueueTask.taskHasFailed()) {
1655                   status->resetResources();
1656                   queueWhichWaitsForIOVsToFinish_.resume();
1657                   endRunAsync(iRunStatus, postLumiQueueTask);
1658                   return;
1659                 }
1660 
1661                 status->setResumer(std::move(iResumer));
1662 
1663                 sourceResourcesAcquirer_.serialQueueChain().push(
1664                     *postLumiQueueTask.group(),
1665                     [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1666                       CMS_SA_ALLOW try {
1667                         ServiceRegistry::Operate operate(serviceToken_);
1668 
1669                         if (postSourceTask.taskHasFailed()) {
1670                           status->resetResources();
1671                           queueWhichWaitsForIOVsToFinish_.resume();
1672                           endRunAsync(iRunStatus, postSourceTask);
1673                           return;
1674                         }
1675 
1676                         status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1677 
1678                         LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1679                         {
1680                           SendSourceTerminationSignalIfException sentry(actReg_.get());
1681                           input_->doBeginLumi(lumiPrincipal, &processContext_);
1682                           sentry.completedSuccessfully();
1683                         }
1684 
1685                         Service<RandomNumberGenerator> rng;
1686                         if (rng.isAvailable()) {
1687                           LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1688                           rng->preBeginLumi(lb);
1689                         }
1690 
1691                         EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1692 
1693                         using namespace edm::waiting_task::chain;
1694                         chain::first([this, status](auto nextTask) mutable {
1695                           readAndMergeLumiEntriesAsync(std::move(status), std::move(nextTask));
1696                           firstItemAfterLumiMerge_ = true;
1697                         }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1698                           LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1699                           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1700                           beginGlobalTransitionAsync<Traits>(
1701                               nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1702                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1703                           looper_->prefetchAsync(
1704                               nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1705                         }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1706                           status->globalBeginDidSucceed();
1707                           ServiceRegistry::Operate operateLooper(serviceToken_);
1708                           looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1709                         }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1710                           if (iException) {
1711                             status->resetResources();
1712                             queueWhichWaitsForIOVsToFinish_.resume();
1713                             WaitingTaskHolder copyHolder(holder);
1714                             copyHolder.doneWaiting(*iException);
1715                             endRunAsync(iRunStatus, holder);
1716                           } else {
1717                             if (not looper_) {
1718                               status->globalBeginDidSucceed();
1719                             }
1720 
1721                             status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1722 
1723                             EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1724                             using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1725 
1726                             streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1727                               for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1728                                 streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1729                                   streamQueues_[i].pause();
1730 
1731                                   auto& event = principalCache_.eventPrincipal(i);
1732                                   //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1733                                   // held by the container as this lambda may not finish executing before all the tasks it
1734                                   // spawns have already started to run.
1735                                   auto eventSetupImpls = &status->eventSetupImpls();
1736                                   auto lp = status->lumiPrincipal().get();
1737                                   streamLumiStatus_[i] = std::move(status);
1738                                   ++streamLumiActive_;
1739                                   event.setLuminosityBlockPrincipal(lp);
1740                                   LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1741                                   using namespace edm::waiting_task::chain;
1742                                   chain::first([this, i, &transitionInfo](auto nextTask) {
1743                                     beginStreamTransitionAsync<Traits>(std::move(nextTask),
1744                                                                        *schedule_,
1745                                                                        i,
1746                                                                        transitionInfo,
1747                                                                        serviceToken_,
1748                                                                        subProcesses_);
1749                                   }) |
1750                                       then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1751                                                      auto nextTask) {
1752                                         if (exceptionFromBeginStreamLumi) {
1753                                           WaitingTaskHolder copyHolder(nextTask);
1754                                           copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1755                                         }
1756                                         handleNextEventForStreamAsync(std::move(nextTask), i);
1757                                       }) |
1758                                       runLast(std::move(holder));
1759                                 });
1760                               }  // end for loop over streams
1761                             });
1762                           }
1763                         }) | runLast(postSourceTask);
1764                       } catch (...) {
1765                         status->resetResources();
1766                         queueWhichWaitsForIOVsToFinish_.resume();
1767                         WaitingTaskHolder copyHolder(postSourceTask);
1768                         copyHolder.doneWaiting(std::current_exception());
1769                         endRunAsync(iRunStatus, postSourceTask);
1770                       }
1771                     });  // task in sourceResourcesAcquirer
1772               } catch (...) {
1773                 status->resetResources();
1774                 queueWhichWaitsForIOVsToFinish_.resume();
1775                 WaitingTaskHolder copyHolder(postLumiQueueTask);
1776                 copyHolder.doneWaiting(std::current_exception());
1777                 endRunAsync(iRunStatus, postLumiQueueTask);
1778               }
1779             });  // task in lumiQueue
1780       } catch (...) {
1781         status->resetResources();
1782         queueWhichWaitsForIOVsToFinish_.resume();
1783         WaitingTaskHolder copyHolder(nextTask);
1784         copyHolder.doneWaiting(std::current_exception());
1785         endRunAsync(iRunStatus, nextTask);
1786       }
1787     }) | chain::runLast(std::move(iHolder));
1788   }
1789 
1790   void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1791     chain::first([this](auto nextTask) {
1792       //all streams are sharing the same status at the moment
1793       auto status = streamLumiStatus_[0];  //read from streamLumiActive_ happened in calling routine
1794       status->setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kProcessing);
1795 
1796       while (lastTransitionType() == InputSource::IsLumi and
1797              status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1798         readAndMergeLumi(*status);
1799         nextTransitionType();
1800       }
1801       firstItemAfterLumiMerge_ = true;
1802     }) | chain::then([this](auto nextTask) mutable {
1803       unsigned int streamIndex = 0;
1804       oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1805       for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1806         arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1807       }
1808       nextTask.group()->run(
1809           [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1810     }) | chain::runLast(std::move(iHolder));
1811   }
1812 
1813   void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1814     if (holder.taskHasFailed()) {
1815       setExceptionMessageLumis();
1816     } else {
1817       WaitingTaskHolder tmp(holder);
1818       tmp.doneWaiting(iException);
1819     }
1820   }
1821 
1822   void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1823                                           std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1824     // Get some needed info out of the status object before moving
1825     // it into finalTaskForThisLumi.
1826     auto& lp = *(iLumiStatus->lumiPrincipal());
1827     bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1828     bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1829     EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1830     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1831 
1832     using namespace edm::waiting_task::chain;
1833     chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1834       IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1835 
1836       LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1837       using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1838       endGlobalTransitionAsync<Traits>(
1839           std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1840     }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1841       //Only call writeLumi if beginLumi succeeded
1842       if (didGlobalBeginSucceed) {
1843         writeLumiAsync(std::move(nextTask), lumiPrincipal);
1844       }
1845     }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1846       looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1847     }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1848       //any thrown exception auto propagates to nextTask via the chain
1849       ServiceRegistry::Operate operate(serviceToken_);
1850       looper_->doEndLuminosityBlock(lp, es, &processContext_);
1851     }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1852       if (iException) {
1853         handleEndLumiExceptions(*iException, nextTask);
1854       }
1855       ServiceRegistry::Operate operate(serviceToken_);
1856 
1857       std::exception_ptr ptr;
1858 
1859       // Try hard to clean up resources so the
1860       // process can terminate in a controlled
1861       // fashion even after exceptions have occurred.
1862       // Caught exception is passed to handleEndLumiExceptions()
1863       CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1864         if (not ptr) {
1865           ptr = std::current_exception();
1866         }
1867       }
1868       // Caught exception is passed to handleEndLumiExceptions()
1869       CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1870         if (not ptr) {
1871           ptr = std::current_exception();
1872         }
1873       }
1874       // Caught exception is passed to handleEndLumiExceptions()
1875       CMS_SA_ALLOW try {
1876         status->resetResources();
1877         status->globalEndRunHolderDoneWaiting();
1878         status.reset();
1879       } catch (...) {
1880         if (not ptr) {
1881           ptr = std::current_exception();
1882         }
1883       }
1884 
1885       if (ptr && !iException) {
1886         handleEndLumiExceptions(ptr, nextTask);
1887       }
1888     }) | runLast(std::move(iTask));
1889   }
1890 
1891   void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1892     auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1893       auto status = streamLumiStatus_[iStreamIndex];
1894       if (iException) {
1895         handleEndLumiExceptions(*iException, iTask);
1896       }
1897 
1898       // reset status before releasing queue else get race condition
1899       streamLumiStatus_[iStreamIndex].reset();
1900       --streamLumiActive_;
1901       streamQueues_[iStreamIndex].resume();
1902 
1903       //are we the last one?
1904       if (status->streamFinishedLumi()) {
1905         globalEndLumiAsync(iTask, std::move(status));
1906       }
1907     });
1908 
1909     edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1910 
1911     // Need to be sure the lumi status is released before lumiDoneTask can every be called.
1912     // therefore we do not want to hold the shared_ptr
1913     auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1914     lumiStatus->setEndTime();
1915 
1916     EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1917     auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1918     bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1919 
1920     if (lumiStatus->didGlobalBeginSucceed()) {
1921       auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1922       using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1923       LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1924       endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1925                                        *schedule_,
1926                                        iStreamIndex,
1927                                        transitionInfo,
1928                                        serviceToken_,
1929                                        subProcesses_,
1930                                        cleaningUpAfterException);
1931     }
1932   }
1933 
1934   void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1935     if (streamRunActive_ == 0) {
1936       assert(streamLumiActive_ == 0);
1937     } else {
1938       assert(streamRunActive_ == preallocations_.numberOfStreams());
1939       if (streamLumiActive_ > 0) {
1940         FinalWaitingTask globalWaitTask{taskGroup_};
1941         assert(streamLumiActive_ == preallocations_.numberOfStreams());
1942         streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1943         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1944           streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1945         }
1946         globalWaitTask.wait();
1947       }
1948     }
1949   }
1950 
1951   void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1952     SendSourceTerminationSignalIfException sentry(actReg_.get());
1953     input_->readProcessBlock(processBlockPrincipal);
1954     sentry.completedSuccessfully();
1955   }
1956 
1957   std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1958     auto rp = principalCache_.getAvailableRunPrincipalPtr();
1959     assert(rp);
1960     rp->setAux(*input_->runAuxiliary());
1961     {
1962       SendSourceTerminationSignalIfException sentry(actReg_.get());
1963       input_->readRun(*rp, *historyAppender_);
1964       sentry.completedSuccessfully();
1965     }
1966     assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1967     return rp;
1968   }
1969 
1970   void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
1971     RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1972 
1973     bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1974     assert(runOK);
1975     runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1976     {
1977       SendSourceTerminationSignalIfException sentry(actReg_.get());
1978       input_->readAndMergeRun(runPrincipal);
1979       sentry.completedSuccessfully();
1980     }
1981   }
1982 
1983   std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
1984     auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
1985     assert(lbp);
1986     lbp->setAux(*input_->luminosityBlockAuxiliary());
1987     {
1988       SendSourceTerminationSignalIfException sentry(actReg_.get());
1989       input_->readLuminosityBlock(*lbp, *historyAppender_);
1990       sentry.completedSuccessfully();
1991     }
1992     lbp->setRunPrincipal(std::move(rp));
1993     return lbp;
1994   }
1995 
1996   void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
1997     auto& lumiPrincipal = *iStatus.lumiPrincipal();
1998     assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1999            input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2000                input_->processHistoryRegistry().reducedProcessHistoryID(
2001                    input_->luminosityBlockAuxiliary()->processHistoryID()));
2002     bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2003     assert(lumiOK);
2004     lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2005     {
2006       SendSourceTerminationSignalIfException sentry(actReg_.get());
2007       input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2008       sentry.completedSuccessfully();
2009     }
2010   }
2011 
2012   void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
2013     using namespace edm::waiting_task;
2014     chain::first([&](auto nextTask) {
2015       ServiceRegistry::Operate op(serviceToken_);
2016       schedule_->writeProcessBlockAsync(
2017           nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2018     }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2019       ServiceRegistry::Operate op(serviceToken_);
2020       for (auto& s : subProcesses_) {
2021         s.writeProcessBlockAsync(nextTask, processBlockType);
2022       }
2023     }) | chain::runLast(std::move(task));
2024   }
2025 
2026   void EventProcessor::writeRunAsync(WaitingTaskHolder task,
2027                                      RunPrincipal const& runPrincipal,
2028                                      MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2029     using namespace edm::waiting_task;
2030     if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2031       chain::first([&](auto nextTask) {
2032         ServiceRegistry::Operate op(serviceToken_);
2033         schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2034       }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2035         ServiceRegistry::Operate op(serviceToken_);
2036         for (auto& s : subProcesses_) {
2037           s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2038         }
2039       }) | chain::runLast(std::move(task));
2040     }
2041   }
2042 
2043   void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
2044     for (auto& s : subProcesses_) {
2045       s.clearRunPrincipal(*iStatus.runPrincipal());
2046     }
2047     iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2048     iStatus.runPrincipal()->clearPrincipal();
2049   }
2050 
2051   void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
2052     using namespace edm::waiting_task;
2053     if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2054       chain::first([&](auto nextTask) {
2055         ServiceRegistry::Operate op(serviceToken_);
2056 
2057         lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2058         schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2059       }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2060         ServiceRegistry::Operate op(serviceToken_);
2061         for (auto& s : subProcesses_) {
2062           s.writeLumiAsync(nextTask, lumiPrincipal);
2063         }
2064       }) | chain::lastTask(std::move(task));
2065     }
2066   }
2067 
2068   void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
2069     for (auto& s : subProcesses_) {
2070       s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2071     }
2072     iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2073     iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2074     iStatus.lumiPrincipal()->clearPrincipal();
2075   }
2076 
2077   void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2078                                                    WaitingTaskHolder iHolder) {
2079     auto group = iHolder.group();
2080     sourceResourcesAcquirer_.serialQueueChain().push(
2081         *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2082           CMS_SA_ALLOW try {
2083             ServiceRegistry::Operate operate(serviceToken_);
2084 
2085             std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2086 
2087             nextTransitionType();
2088             while (lastTransitionType() == InputSource::IsRun and status->runPrincipal()->run() == input_->run() and
2089                    status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2090               if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2091                 status->setStopBeforeProcessingRun(true);
2092                 return;
2093               }
2094               readAndMergeRun(*status);
2095               nextTransitionType();
2096             }
2097           } catch (...) {
2098             status->setStopBeforeProcessingRun(true);
2099             holder.doneWaiting(std::current_exception());
2100           }
2101         });
2102   }
2103 
2104   void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2105                                                     WaitingTaskHolder iHolder) {
2106     auto group = iHolder.group();
2107     sourceResourcesAcquirer_.serialQueueChain().push(
2108         *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2109           CMS_SA_ALLOW try {
2110             ServiceRegistry::Operate operate(serviceToken_);
2111 
2112             std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2113 
2114             nextTransitionType();
2115             while (lastTransitionType() == InputSource::IsLumi and
2116                    iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2117               readAndMergeLumi(*iLumiStatus);
2118               nextTransitionType();
2119             }
2120           } catch (...) {
2121             holder.doneWaiting(std::current_exception());
2122           }
2123         });
2124   }
2125 
2126   void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2127                                                             WaitingTaskHolder iHolder) {
2128     if (lastTransitionType() == InputSource::IsFile) {
2129       iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2130       iHolder.doneWaiting(std::exception_ptr{});
2131     } else if (lastTransitionType() == InputSource::IsLumi && !iHolder.taskHasFailed()) {
2132       CMS_SA_ALLOW try {
2133         beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2134                                     input_->luminosityBlockAuxiliary()->beginTime()),
2135                        iRunStatus,
2136                        iHolder);
2137       } catch (...) {
2138         WaitingTaskHolder copyHolder(iHolder);
2139         iHolder.doneWaiting(std::current_exception());
2140         endRunAsync(std::move(iRunStatus), std::move(iHolder));
2141       }
2142     } else {
2143       // Note that endRunAsync will call beginRunAsync for the following run
2144       // if appropriate.
2145       endRunAsync(std::move(iRunStatus), std::move(iHolder));
2146     }
2147   }
2148 
2149   bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2150                                               unsigned int iStreamIndex,
2151                                               LuminosityBlockProcessingStatus& iStatus) {
2152     // This function returns true if it successfully reads an event for the stream and that
2153     // requires both that an event is next and there are no problems or requests to stop.
2154 
2155     if (iTask.taskHasFailed()) {
2156       // We want all streams to stop or all streams to pause. If we are already in the
2157       // middle of pausing streams, then finish pausing all of them and the lumi will be
2158       // ended later. Otherwise, just end it now.
2159       if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2160         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2161       }
2162       return false;
2163     }
2164 
2165     // Did another stream already stop or pause this lumi?
2166     if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2167       return false;
2168     }
2169 
2170     // Are output modules or the looper requesting we stop?
2171     if (shouldWeStop()) {
2172       lastSourceTransition_ = InputSource::IsStop;
2173       iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2174       return false;
2175     }
2176 
2177     ServiceRegistry::Operate operate(serviceToken_);
2178 
2179     // need to use lock in addition to the serial task queue because
2180     // of delayed provenance reading and reading data in response to
2181     // edm::Refs etc
2182     std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2183 
2184     // If we didn't already call nextTransitionType while merging lumis, call it here.
2185     // This asks the input source what is next and also checks for signals.
2186 
2187     InputSource::ItemType itemType = firstItemAfterLumiMerge_ ? lastTransitionType() : nextTransitionType();
2188     firstItemAfterLumiMerge_ = false;
2189 
2190     if (InputSource::IsEvent != itemType) {
2191       // IsFile may continue processing the lumi and
2192       // looper_ can cause the input source to declare a new IsRun which is actually
2193       // just a continuation of the previous run
2194       if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
2195           (InputSource::IsRun == itemType and
2196            (iStatus.lumiPrincipal()->run() != input_->run() or
2197             iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2198         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi);
2199       } else {
2200         iStatus.setEventProcessingState(LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2201       }
2202       return false;
2203     }
2204     readEvent(iStreamIndex);
2205     return true;
2206   }
2207 
2208   void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2209     auto group = iTask.group();
2210     sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2211       CMS_SA_ALLOW try {
2212         auto status = streamLumiStatus_[iStreamIndex].get();
2213         ServiceRegistry::Operate operate(serviceToken_);
2214 
2215         if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2216           auto recursionTask =
2217               make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2218                 if (iEventException) {
2219                   WaitingTaskHolder copyHolder(iTask);
2220                   copyHolder.doneWaiting(*iEventException);
2221                   // Intentionally, we don't return here. The recursive call to
2222                   // handleNextEvent takes care of immediately ending the run properly
2223                   // using the same code it uses to end the run in other situations.
2224                 }
2225                 handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2226               });
2227 
2228           processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2229         } else {
2230           // the stream will stop processing this lumi now
2231           if (status->eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi) {
2232             if (not status->haveStartedNextLumiOrEndedRun()) {
2233               status->startNextLumiOrEndRun();
2234               if (lastTransitionType() == InputSource::IsLumi && !iTask.taskHasFailed()) {
2235                 CMS_SA_ALLOW try {
2236                   beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2237                                               input_->luminosityBlockAuxiliary()->beginTime()),
2238                                  streamRunStatus_[iStreamIndex],
2239                                  iTask);
2240                 } catch (...) {
2241                   WaitingTaskHolder copyHolder(iTask);
2242                   copyHolder.doneWaiting(std::current_exception());
2243                   endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2244                 }
2245               } else {
2246                 // If appropriate, this will also start the next run.
2247                 endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2248               }
2249             }
2250             streamEndLumiAsync(iTask, iStreamIndex);
2251           } else {
2252             assert(status->eventProcessingState() ==
2253                    LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2254             auto runStatus = streamRunStatus_[iStreamIndex].get();
2255 
2256             if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2257               runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2258             }
2259           }
2260         }
2261       } catch (...) {
2262         WaitingTaskHolder copyHolder(iTask);
2263         copyHolder.doneWaiting(std::current_exception());
2264         handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2265       }
2266     });
2267   }
2268 
2269   void EventProcessor::readEvent(unsigned int iStreamIndex) {
2270     //TODO this will have to become per stream
2271     auto& event = principalCache_.eventPrincipal(iStreamIndex);
2272     StreamContext streamContext(event.streamID(), &processContext_);
2273 
2274     SendSourceTerminationSignalIfException sentry(actReg_.get());
2275     input_->readEvent(event, streamContext);
2276 
2277     streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2278     streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2279     sentry.completedSuccessfully();
2280 
2281     FDEBUG(1) << "\treadEvent\n";
2282   }
2283 
2284   void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2285     iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2286   }
2287 
2288   void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2289     auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2290 
2291     ServiceRegistry::Operate operate(serviceToken_);
2292     Service<RandomNumberGenerator> rng;
2293     if (rng.isAvailable()) {
2294       Event ev(*pep, ModuleDescription(), nullptr);
2295       rng->postEventRead(ev);
2296     }
2297 
2298     EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2299     using namespace edm::waiting_task::chain;
2300     chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2301       EventTransitionInfo info(*pep, es);
2302       schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2303     }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2304       for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2305         subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2306       }
2307     }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2308       //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2309       ServiceRegistry::Operate operateLooper(serviceToken_);
2310       processEventWithLooper(*pep, iStreamIndex);
2311     }) | then([pep](auto nextTask) {
2312       FDEBUG(1) << "\tprocessEvent\n";
2313       pep->clearEventPrincipal();
2314     }) | runLast(iHolder);
2315   }
2316 
2317   void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2318     bool randomAccess = input_->randomAccess();
2319     ProcessingController::ForwardState forwardState = input_->forwardState();
2320     ProcessingController::ReverseState reverseState = input_->reverseState();
2321     ProcessingController pc(forwardState, reverseState, randomAccess);
2322 
2323     EDLooperBase::Status status = EDLooperBase::kContinue;
2324     do {
2325       StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2326       EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2327       status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2328 
2329       bool succeeded = true;
2330       if (randomAccess) {
2331         if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2332           input_->skipEvents(-2);
2333         } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2334           succeeded = input_->goToEvent(pc.specifiedEventTransition());
2335         }
2336       }
2337       pc.setLastOperationSucceeded(succeeded);
2338     } while (!pc.lastOperationSucceeded());
2339     if (status != EDLooperBase::kContinue) {
2340       shouldWeStop_ = true;
2341     }
2342   }
2343 
2344   bool EventProcessor::shouldWeStop() const {
2345     FDEBUG(1) << "\tshouldWeStop\n";
2346     if (shouldWeStop_)
2347       return true;
2348     if (!subProcesses_.empty()) {
2349       for (auto const& subProcess : subProcesses_) {
2350         if (subProcess.terminate()) {
2351           return true;
2352         }
2353       }
2354       return false;
2355     }
2356     return schedule_->terminate();
2357   }
2358 
2359   void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2360 
2361   void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2362 
2363   void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2364 
2365   bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2366     bool expected = false;
2367     if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2368       deferredExceptionPtr_ = iException;
2369       return true;
2370     }
2371     return false;
2372   }
2373 
2374   void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2375     cms::Exception ex("ModulesSynchingOnLumis");
2376     ex << "The framework is configured to use at least two streams, but the following modules\n"
2377        << "require synchronizing on LuminosityBlock boundaries:";
2378     bool found = false;
2379     for (auto worker : schedule_->allWorkers()) {
2380       if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2381         found = true;
2382         ex << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2383       }
2384     }
2385     if (found) {
2386       ex << "\n\nThe situation can be fixed by either\n"
2387          << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2388          << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2389       throw ex;
2390     }
2391   }
2392 
2393   void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
2394     std::unique_ptr<LogSystem> s;
2395     for (auto worker : schedule_->allWorkers()) {
2396       if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2397         if (not s) {
2398           s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2399           (*s) << "The following modules require synchronizing on Run boundaries:";
2400         }
2401         (*s) << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2402       }
2403     }
2404   }
2405 
2406   void EventProcessor::warnAboutLegacyModules() const {
2407     std::unique_ptr<LogSystem> s;
2408     for (auto worker : schedule_->allWorkers()) {
2409       if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2410         if (not s) {
2411           s = std::make_unique<LogSystem>("LegacyModules");
2412           (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2413                   "is going to end soon. These modules need to be converted to have type\n"
2414                   "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2415         }
2416         (*s) << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2417       }
2418     }
2419   }
2420 }  // namespace edm