Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-06-03 00:59:02

0001 #include "FWCore/Framework/interface/EventProcessor.h"
0002 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0003 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0004 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0005 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0008 
0009 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0010 #include "FWCore/Framework/src/CommonParams.h"
0011 #include "FWCore/Framework/interface/EDLooperBase.h"
0012 #include "FWCore/Framework/interface/EventPrincipal.h"
0013 #include "FWCore/Framework/interface/EventSetupProvider.h"
0014 #include "FWCore/Framework/interface/EventSetupRecord.h"
0015 #include "FWCore/Framework/interface/FileBlock.h"
0016 #include "FWCore/Framework/interface/HistoryAppender.h"
0017 #include "FWCore/Framework/interface/InputSourceDescription.h"
0018 #include "FWCore/Framework/interface/IOVSyncValue.h"
0019 #include "FWCore/Framework/interface/LooperFactory.h"
0020 #include "FWCore/Framework/interface/LuminosityBlock.h"
0021 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0022 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0023 #include "FWCore/Framework/interface/ModuleChanger.h"
0024 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0025 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0026 #include "FWCore/Framework/interface/ProcessingController.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/Schedule.h"
0029 #include "FWCore/Framework/interface/ScheduleInfo.h"
0030 #include "FWCore/Framework/interface/ScheduleItems.h"
0031 #include "FWCore/Framework/interface/SubProcess.h"
0032 #include "FWCore/Framework/interface/Event.h"
0033 #include "FWCore/Framework/interface/ESRecordsToProxyIndices.h"
0034 #include "FWCore/Framework/src/Breakpoints.h"
0035 #include "FWCore/Framework/interface/EventSetupsController.h"
0036 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0037 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0038 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0039 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0040 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0041 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0042 
0043 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0044 
0045 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0046 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0047 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0048 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0049 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0050 #include "FWCore/ParameterSet/interface/Registry.h"
0051 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0052 
0053 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0054 #include "FWCore/ServiceRegistry/interface/Service.h"
0055 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0056 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0057 
0058 #include "FWCore/Concurrency/interface/WaitingTask.h"
0059 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0060 #include "FWCore/Concurrency/interface/chain_first.h"
0061 
0062 #include "FWCore/Utilities/interface/Algorithms.h"
0063 #include "FWCore/Utilities/interface/DebugMacros.h"
0064 #include "FWCore/Utilities/interface/EDMException.h"
0065 #include "FWCore/Utilities/interface/Exception.h"
0066 #include "FWCore/Utilities/interface/ConvertException.h"
0067 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0068 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0069 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0070 #include "FWCore/Utilities/interface/StreamID.h"
0071 #include "FWCore/Utilities/interface/RootHandlers.h"
0072 #include "FWCore/Utilities/interface/propagate_const.h"
0073 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0074 
0075 #include "MessageForSource.h"
0076 #include "MessageForParent.h"
0077 #include "LuminosityBlockProcessingStatus.h"
0078 
0079 #include "boost/range/adaptor/reversed.hpp"
0080 
0081 #include <cassert>
0082 #include <exception>
0083 #include <iomanip>
0084 #include <iostream>
0085 #include <utility>
0086 #include <sstream>
0087 
0088 #include <sys/ipc.h>
0089 #include <sys/msg.h>
0090 
0091 #include "oneapi/tbb/task.h"
0092 
0093 //Used for CPU affinity
0094 #ifndef __APPLE__
0095 #include <sched.h>
0096 #endif
0097 
0098 namespace {
0099   //Sentry class to only send a signal if an
0100   // exception occurs. An exception is identified
0101   // by the destructor being called without first
0102   // calling completedSuccessfully().
0103   class SendSourceTerminationSignalIfException {
0104   public:
0105     SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
0106     ~SendSourceTerminationSignalIfException() {
0107       if (reg_) {
0108         reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
0109       }
0110     }
0111     void completedSuccessfully() { reg_ = nullptr; }
0112 
0113   private:
0114     edm::ActivityRegistry* reg_;  // We do not use propagate_const because the registry itself is mutable.
0115   };
0116 }  // namespace
0117 
0118 namespace edm {
0119 
0120   namespace chain = waiting_task::chain;
0121 
0122   // ---------------------------------------------------------------
0123   std::unique_ptr<InputSource> makeInput(ParameterSet& params,
0124                                          CommonParams const& common,
0125                                          std::shared_ptr<ProductRegistry> preg,
0126                                          std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0127                                          std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0128                                          std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0129                                          std::shared_ptr<ActivityRegistry> areg,
0130                                          std::shared_ptr<ProcessConfiguration const> processConfiguration,
0131                                          PreallocationConfiguration const& allocations) {
0132     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0133     if (main_input == nullptr) {
0134       throw Exception(errors::Configuration)
0135           << "There must be exactly one source in the configuration.\n"
0136           << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0137     }
0138 
0139     std::string modtype(main_input->getParameter<std::string>("@module_type"));
0140 
0141     std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0142         ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0143     ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0144     filler->fill(descriptions);
0145 
0146     try {
0147       convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
0148     } catch (cms::Exception& iException) {
0149       std::ostringstream ost;
0150       ost << "Validating configuration of input source of type " << modtype;
0151       iException.addContext(ost.str());
0152       throw;
0153     }
0154 
0155     main_input->registerIt();
0156 
0157     // Fill in "ModuleDescription", in case the input source produces
0158     // any EDProducts, which would be registered in the ProductRegistry.
0159     // Also fill in the process history item for this process.
0160     // There is no module label for the unnamed input source, so
0161     // just use "source".
0162     // Only the tracked parameters belong in the process configuration.
0163     ModuleDescription md(main_input->id(),
0164                          main_input->getParameter<std::string>("@module_type"),
0165                          "source",
0166                          processConfiguration.get(),
0167                          ModuleDescription::getUniqueID());
0168 
0169     InputSourceDescription isdesc(md,
0170                                   preg,
0171                                   branchIDListHelper,
0172                                   processBlockHelper,
0173                                   thinnedAssociationsHelper,
0174                                   areg,
0175                                   common.maxEventsInput_,
0176                                   common.maxLumisInput_,
0177                                   common.maxSecondsUntilRampdown_,
0178                                   allocations);
0179 
0180     areg->preSourceConstructionSignal_(md);
0181     std::unique_ptr<InputSource> input;
0182     try {
0183       //even if we have an exception, send the signal
0184       std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
0185       convertException::wrap([&]() {
0186         input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0187         input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
0188         input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
0189       });
0190     } catch (cms::Exception& iException) {
0191       std::ostringstream ost;
0192       ost << "Constructing input source of type " << modtype;
0193       iException.addContext(ost.str());
0194       throw;
0195     }
0196     return input;
0197   }
0198 
0199   // ---------------------------------------------------------------
0200   void validateLooper(ParameterSet& pset) {
0201     auto const modtype = pset.getParameter<std::string>("@module_type");
0202     auto const moduleLabel = pset.getParameter<std::string>("@module_label");
0203     auto filler = ParameterSetDescriptionFillerPluginFactory::get()->create(modtype);
0204     ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0205     filler->fill(descriptions);
0206     try {
0207       edm::convertException::wrap([&]() { descriptions.validate(pset, moduleLabel); });
0208     } catch (cms::Exception& iException) {
0209       iException.addContext(
0210           fmt::format("Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
0211       throw;
0212     }
0213   }
0214 
0215   std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
0216                                            eventsetup::EventSetupProvider& cp,
0217                                            ParameterSet& params,
0218                                            std::vector<std::string> const& loopers) {
0219     std::shared_ptr<EDLooperBase> vLooper;
0220 
0221     assert(1 == loopers.size());
0222 
0223     for (auto const& looperName : loopers) {
0224       ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
0225       validateLooper(*providerPSet);
0226       providerPSet->registerIt();
0227       vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
0228     }
0229     return vLooper;
0230   }
0231 
0232   // ---------------------------------------------------------------
0233   EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,  //std::string const& config,
0234                                  ServiceToken const& iToken,
0235                                  serviceregistry::ServiceLegacy iLegacy,
0236                                  std::vector<std::string> const& defaultServices,
0237                                  std::vector<std::string> const& forcedServices)
0238       : actReg_(),
0239         preg_(),
0240         branchIDListHelper_(),
0241         serviceToken_(),
0242         input_(),
0243         espController_(new eventsetup::EventSetupsController),
0244         esp_(),
0245         act_table_(),
0246         processConfiguration_(),
0247         schedule_(),
0248         subProcesses_(),
0249         historyAppender_(new HistoryAppender),
0250         fb_(),
0251         looper_(),
0252         deferredExceptionPtrIsSet_(false),
0253         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0254         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0255         principalCache_(),
0256         beginJobCalled_(false),
0257         shouldWeStop_(false),
0258         fileModeNoMerge_(false),
0259         exceptionMessageFiles_(),
0260         exceptionMessageRuns_(false),
0261         exceptionMessageLumis_(false),
0262         forceLooperToEnd_(false),
0263         looperBeginJobRun_(false),
0264         forceESCacheClearOnNewRun_(false),
0265         eventSetupDataToExcludeFromPrefetching_() {
0266     auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0267     processDesc->addServices(defaultServices, forcedServices);
0268     init(processDesc, iToken, iLegacy);
0269   }
0270 
0271   EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,  //std::string const& config,
0272                                  std::vector<std::string> const& defaultServices,
0273                                  std::vector<std::string> const& forcedServices)
0274       : actReg_(),
0275         preg_(),
0276         branchIDListHelper_(),
0277         serviceToken_(),
0278         input_(),
0279         espController_(new eventsetup::EventSetupsController),
0280         esp_(),
0281         act_table_(),
0282         processConfiguration_(),
0283         schedule_(),
0284         subProcesses_(),
0285         historyAppender_(new HistoryAppender),
0286         fb_(),
0287         looper_(),
0288         deferredExceptionPtrIsSet_(false),
0289         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0290         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0291         principalCache_(),
0292         beginJobCalled_(false),
0293         shouldWeStop_(false),
0294         fileModeNoMerge_(false),
0295         exceptionMessageFiles_(),
0296         exceptionMessageRuns_(false),
0297         exceptionMessageLumis_(false),
0298         forceLooperToEnd_(false),
0299         looperBeginJobRun_(false),
0300         forceESCacheClearOnNewRun_(false),
0301         eventSetupDataToExcludeFromPrefetching_() {
0302     auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
0303     processDesc->addServices(defaultServices, forcedServices);
0304     init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
0305   }
0306 
0307   EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0308                                  ServiceToken const& token,
0309                                  serviceregistry::ServiceLegacy legacy)
0310       : actReg_(),
0311         preg_(),
0312         branchIDListHelper_(),
0313         serviceToken_(),
0314         input_(),
0315         espController_(new eventsetup::EventSetupsController),
0316         esp_(),
0317         act_table_(),
0318         processConfiguration_(),
0319         schedule_(),
0320         subProcesses_(),
0321         historyAppender_(new HistoryAppender),
0322         fb_(),
0323         looper_(),
0324         deferredExceptionPtrIsSet_(false),
0325         sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
0326         sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
0327         principalCache_(),
0328         beginJobCalled_(false),
0329         shouldWeStop_(false),
0330         fileModeNoMerge_(false),
0331         exceptionMessageFiles_(),
0332         exceptionMessageRuns_(false),
0333         exceptionMessageLumis_(false),
0334         forceLooperToEnd_(false),
0335         looperBeginJobRun_(false),
0336         forceESCacheClearOnNewRun_(false),
0337         eventSetupDataToExcludeFromPrefetching_() {
0338     init(processDesc, token, legacy);
0339   }
0340 
0341   void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
0342                             ServiceToken const& iToken,
0343                             serviceregistry::ServiceLegacy iLegacy) {
0344     //std::cerr << processDesc->dump() << std::endl;
0345 
0346     // register the empty parentage vector , once and for all
0347     ParentageRegistry::instance()->insertMapped(Parentage());
0348 
0349     // register the empty parameter set, once and for all.
0350     ParameterSet().registerIt();
0351 
0352     std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
0353 
0354     // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
0355     auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
0356     bool const hasSubProcesses = !subProcessVParameterSet.empty();
0357 
0358     // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
0359     // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
0360     // set in here if the parameters were not explicitly set.
0361     validateTopLevelParameterSets(parameterSet.get());
0362 
0363     // Now set some parameters specific to the main process.
0364     ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
0365     auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
0366     if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
0367       throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
0368           << fileMode << ".\n"
0369           << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
0370     } else {
0371       fileModeNoMerge_ = (fileMode == "NOMERGE");
0372     }
0373     forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
0374     ensureAvailableAccelerators(*parameterSet);
0375 
0376     //threading
0377     unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
0378 
0379     // Even if numberOfThreads was set to zero in the Python configuration, the code
0380     // in cmsRun.cpp should have reset it to something else.
0381     assert(nThreads != 0);
0382 
0383     unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
0384     if (nStreams == 0) {
0385       nStreams = nThreads;
0386     }
0387     unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
0388     if (nConcurrentRuns != 1) {
0389       throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
0390           << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
0391     }
0392     unsigned int nConcurrentLumis =
0393         optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
0394     if (nConcurrentLumis == 0) {
0395       nConcurrentLumis = 2;
0396     }
0397     if (nConcurrentLumis > nStreams) {
0398       nConcurrentLumis = nStreams;
0399     }
0400     std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
0401     if (!loopers.empty()) {
0402       //For now loopers make us run only 1 transition at a time
0403       if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
0404         edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
0405                                                 "of concurrent runs, and the number of concurrent lumis "
0406                                                 "are all being reset to 1. Loopers cannot currently support "
0407                                                 "values greater than 1.";
0408         nStreams = 1;
0409         nConcurrentLumis = 1;
0410         nConcurrentRuns = 1;
0411       }
0412     }
0413     bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
0414     if (dumpOptions) {
0415       dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
0416     } else {
0417       if (nThreads > 1 or nStreams > 1) {
0418         edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
0419       }
0420     }
0421     // The number of concurrent IOVs is configured individually for each record in
0422     // the class NumberOfConcurrentIOVs to values less than or equal to this.
0423     unsigned int maxConcurrentIOVs = nConcurrentLumis;
0424 
0425     //Check that relationships between threading parameters makes sense
0426     /*
0427       if(nThreads<nStreams) {
0428       //bad
0429       }
0430       if(nConcurrentRuns>nStreams) {
0431       //bad
0432       }
0433       if(nConcurrentRuns>nConcurrentLumis) {
0434       //bad
0435       }
0436     */
0437     IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
0438 
0439     printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
0440     deleteNonConsumedUnscheduledModules_ =
0441         optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
0442     //for now, if have a subProcess, don't allow early delete
0443     // In the future we should use the SubProcess's 'keep list' to decide what can be kept
0444     if (not hasSubProcesses) {
0445       branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
0446     }
0447 
0448     // Now do general initialization
0449     ScheduleItems items;
0450 
0451     //initialize the services
0452     auto& serviceSets = processDesc->getServicesPSets();
0453     ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
0454     serviceToken_ = items.addCPRandTNS(*parameterSet, token);
0455 
0456     //make the services available
0457     ServiceRegistry::Operate operate(serviceToken_);
0458 
0459     CMS_SA_ALLOW try {
0460       if (nStreams > 1) {
0461         edm::Service<RootHandlers> handler;
0462         handler->willBeUsingThreads();
0463       }
0464 
0465       // intialize miscellaneous items
0466       std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
0467 
0468       // intialize the event setup provider
0469       ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
0470       esp_ = espController_->makeProvider(
0471           *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
0472 
0473       // initialize the looper, if any
0474       if (!loopers.empty()) {
0475         looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
0476         looper_->setActionTable(items.act_table_.get());
0477         looper_->attachTo(*items.actReg_);
0478 
0479         // in presence of looper do not delete modules
0480         deleteNonConsumedUnscheduledModules_ = false;
0481       }
0482 
0483       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0484 
0485       lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
0486       streamQueues_.resize(nStreams);
0487       streamLumiStatus_.resize(nStreams);
0488 
0489       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0490 
0491       // initialize the input source
0492       input_ = makeInput(*parameterSet,
0493                          *common,
0494                          items.preg(),
0495                          items.branchIDListHelper(),
0496                          get_underlying_safe(processBlockHelper_),
0497                          items.thinnedAssociationsHelper(),
0498                          items.actReg_,
0499                          items.processConfiguration(),
0500                          preallocations_);
0501 
0502       // initialize the Schedule
0503       schedule_ =
0504           items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
0505 
0506       // set the data members
0507       act_table_ = std::move(items.act_table_);
0508       actReg_ = items.actReg_;
0509       preg_ = items.preg();
0510       mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0511       branchIDListHelper_ = items.branchIDListHelper();
0512       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0513       processConfiguration_ = items.processConfiguration();
0514       processContext_.setProcessConfiguration(processConfiguration_.get());
0515       principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
0516 
0517       FDEBUG(2) << parameterSet << std::endl;
0518 
0519       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0520       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0521         // Reusable event principal
0522         auto ep = std::make_shared<EventPrincipal>(preg(),
0523                                                    branchIDListHelper(),
0524                                                    thinnedAssociationsHelper(),
0525                                                    *processConfiguration_,
0526                                                    historyAppender_.get(),
0527                                                    index,
0528                                                    true /*primary process*/,
0529                                                    &*processBlockHelper_);
0530         principalCache_.insert(std::move(ep));
0531       }
0532 
0533       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0534         auto lp =
0535             std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
0536         principalCache_.insert(std::move(lp));
0537       }
0538 
0539       {
0540         auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0541         principalCache_.insert(std::move(pb));
0542 
0543         auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
0544         principalCache_.insertForInput(std::move(pbForInput));
0545       }
0546 
0547       // fill the subprocesses, if there are any
0548       subProcesses_.reserve(subProcessVParameterSet.size());
0549       for (auto& subProcessPSet : subProcessVParameterSet) {
0550         subProcesses_.emplace_back(subProcessPSet,
0551                                    *parameterSet,
0552                                    preg(),
0553                                    branchIDListHelper(),
0554                                    *processBlockHelper_,
0555                                    *thinnedAssociationsHelper_,
0556                                    SubProcessParentageHelper(),
0557                                    *espController_,
0558                                    *actReg_,
0559                                    token,
0560                                    serviceregistry::kConfigurationOverrides,
0561                                    preallocations_,
0562                                    &processContext_);
0563       }
0564     } catch (...) {
0565       //in case of an exception, make sure Services are available
0566       // during the following destructors
0567       espController_ = nullptr;
0568       esp_ = nullptr;
0569       schedule_ = nullptr;
0570       input_ = nullptr;
0571       looper_ = nullptr;
0572       actReg_ = nullptr;
0573       throw;
0574     }
0575   }
0576 
0577   EventProcessor::~EventProcessor() {
0578     // Make the services available while everything is being deleted.
0579     ServiceToken token = getToken();
0580     ServiceRegistry::Operate op(token);
0581 
0582     // manually destroy all these thing that may need the services around
0583     // propagate_const<T> has no reset() function
0584     espController_ = nullptr;
0585     esp_ = nullptr;
0586     schedule_ = nullptr;
0587     input_ = nullptr;
0588     looper_ = nullptr;
0589     actReg_ = nullptr;
0590 
0591     pset::Registry::instance()->clear();
0592     ParentageRegistry::instance()->clear();
0593   }
0594 
0595   void EventProcessor::taskCleanup() {
0596     edm::FinalWaitingTask task;
0597     espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
0598     taskGroup_.wait();
0599     assert(task.done());
0600   }
0601 
0602   void EventProcessor::beginJob() {
0603     if (beginJobCalled_)
0604       return;
0605     beginJobCalled_ = true;
0606     bk::beginJob();
0607 
0608     // StateSentry toerror(this); // should we add this ?
0609     //make the services available
0610     ServiceRegistry::Operate operate(serviceToken_);
0611 
0612     service::SystemBounds bounds(preallocations_.numberOfStreams(),
0613                                  preallocations_.numberOfLuminosityBlocks(),
0614                                  preallocations_.numberOfRuns(),
0615                                  preallocations_.numberOfThreads());
0616     actReg_->preallocateSignal_(bounds);
0617     schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0618     pathsAndConsumesOfModules_.initialize(schedule_.get(), preg());
0619 
0620     std::vector<ModuleProcessName> consumedBySubProcesses;
0621     for_all(subProcesses_,
0622             [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
0623               auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0624               if (consumedBySubProcesses.empty()) {
0625                 consumedBySubProcesses = std::move(c);
0626               } else if (not c.empty()) {
0627                 std::vector<ModuleProcessName> tmp;
0628                 tmp.reserve(consumedBySubProcesses.size() + c.size());
0629                 std::merge(consumedBySubProcesses.begin(),
0630                            consumedBySubProcesses.end(),
0631                            c.begin(),
0632                            c.end(),
0633                            std::back_inserter(tmp));
0634                 std::swap(consumedBySubProcesses, tmp);
0635               }
0636             });
0637 
0638     // Note: all these may throw
0639     checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, printDependencies_);
0640     if (deleteNonConsumedUnscheduledModules_) {
0641       if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
0642           not unusedModules.empty()) {
0643         pathsAndConsumesOfModules_.removeModules(unusedModules);
0644 
0645         edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
0646           l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0647                "and "
0648                "therefore they are deleted before beginJob transition.";
0649           for (auto const& description : unusedModules) {
0650             l << "\n " << description->moduleLabel();
0651           }
0652         });
0653         for (auto const& description : unusedModules) {
0654           schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0655         }
0656       }
0657     }
0658     // Initialize after the deletion of non-consumed unscheduled
0659     // modules to avoid non-consumed non-run modules to keep the
0660     // products unnecessarily alive
0661     if (not branchesToDeleteEarly_.empty()) {
0662       schedule_->initializeEarlyDelete(branchesToDeleteEarly_, *preg_);
0663       decltype(branchesToDeleteEarly_)().swap(branchesToDeleteEarly_);
0664     }
0665 
0666     actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
0667 
0668     if (preallocations_.numberOfLuminosityBlocks() > 1) {
0669       throwAboutModulesRequiringLuminosityBlockSynchronization();
0670     }
0671     warnAboutLegacyModules();
0672 
0673     //NOTE:  This implementation assumes 'Job' means one call
0674     // the EventProcessor::run
0675     // If it really means once per 'application' then this code will
0676     // have to be changed.
0677     // Also have to deal with case where have 'run' then new Module
0678     // added and do 'run'
0679     // again.  In that case the newly added Module needs its 'beginJob'
0680     // to be called.
0681 
0682     //NOTE: in future we should have a beginOfJob for looper that takes no arguments
0683     //  For now we delay calling beginOfJob until first beginOfRun
0684     //if(looper_) {
0685     //   looper_->beginOfJob(es);
0686     //}
0687     try {
0688       convertException::wrap([&]() { input_->doBeginJob(); });
0689     } catch (cms::Exception& ex) {
0690       ex.addContext("Calling beginJob for the source");
0691       throw;
0692     }
0693     espController_->finishConfiguration();
0694     schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
0695     if (looper_) {
0696       constexpr bool mustPrefetchMayGet = true;
0697       auto const processBlockLookup = preg_->productLookup(InProcess);
0698       auto const runLookup = preg_->productLookup(InRun);
0699       auto const lumiLookup = preg_->productLookup(InLumi);
0700       auto const eventLookup = preg_->productLookup(InEvent);
0701       looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
0702       looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
0703       looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
0704       looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
0705       looper_->updateLookup(esp_->recordsToProxyIndices());
0706     }
0707     // toerror.succeeded(); // should we add this?
0708     for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0709     actReg_->postBeginJobSignal_();
0710 
0711     FinalWaitingTask last;
0712     oneapi::tbb::task_group group;
0713     using namespace edm::waiting_task::chain;
0714     first([this](auto nextTask) {
0715       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0716         first([i, this](auto nextTask) {
0717           ServiceRegistry::Operate operate(serviceToken_);
0718           schedule_->beginStream(i);
0719         }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
0720           ServiceRegistry::Operate operate(serviceToken_);
0721           for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
0722         }) | lastTask(nextTask);
0723       }
0724     }) | runLast(WaitingTaskHolder(group, &last));
0725     group.wait();
0726     if (last.exceptionPtr()) {
0727       std::rethrow_exception(*last.exceptionPtr());
0728     }
0729   }
0730 
0731   void EventProcessor::endJob() {
0732     // Collects exceptions, so we don't throw before all operations are performed.
0733     ExceptionCollector c(
0734         "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0735 
0736     //make the services available
0737     ServiceRegistry::Operate operate(serviceToken_);
0738 
0739     using namespace edm::waiting_task::chain;
0740 
0741     edm::FinalWaitingTask waitTask;
0742     oneapi::tbb::task_group group;
0743 
0744     {
0745       //handle endStream transitions
0746       edm::WaitingTaskHolder taskHolder(group, &waitTask);
0747       std::mutex collectorMutex;
0748       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0749         first([this, i, &c, &collectorMutex](auto nextTask) {
0750           std::exception_ptr ep;
0751           try {
0752             ServiceRegistry::Operate operate(serviceToken_);
0753             this->schedule_->endStream(i);
0754           } catch (...) {
0755             ep = std::current_exception();
0756           }
0757           if (ep) {
0758             std::lock_guard<std::mutex> l(collectorMutex);
0759             c.call([&ep]() { std::rethrow_exception(ep); });
0760           }
0761         }) | then([this, i, &c, &collectorMutex](auto nextTask) {
0762           for (auto& subProcess : subProcesses_) {
0763             first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
0764               std::exception_ptr ep;
0765               try {
0766                 ServiceRegistry::Operate operate(serviceToken_);
0767                 subProcess.doEndStream(i);
0768               } catch (...) {
0769                 ep = std::current_exception();
0770               }
0771               if (ep) {
0772                 std::lock_guard<std::mutex> l(collectorMutex);
0773                 c.call([&ep]() { std::rethrow_exception(ep); });
0774               }
0775             }) | lastTask(nextTask);
0776           }
0777         }) | lastTask(taskHolder);
0778       }
0779     }
0780     group.wait();
0781 
0782     auto actReg = actReg_.get();
0783     c.call([actReg]() { actReg->preEndJobSignal_(); });
0784     schedule_->endJob(c);
0785     for (auto& subProcess : subProcesses_) {
0786       c.call(std::bind(&SubProcess::doEndJob, &subProcess));
0787     }
0788     c.call(std::bind(&InputSource::doEndJob, input_.get()));
0789     if (looper_) {
0790       c.call(std::bind(&EDLooperBase::endOfJob, looper()));
0791     }
0792     c.call([actReg]() { actReg->postEndJobSignal_(); });
0793     if (c.hasThrown()) {
0794       c.rethrow();
0795     }
0796   }
0797 
0798   ServiceToken EventProcessor::getToken() { return serviceToken_; }
0799 
0800   std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
0801     return schedule_->getAllModuleDescriptions();
0802   }
0803 
0804   int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
0805 
0806   int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
0807 
0808   int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
0809 
0810   void EventProcessor::clearCounters() { schedule_->clearCounters(); }
0811 
0812   namespace {
0813 #include "TransitionProcessors.icc"
0814   }
0815 
0816   bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
0817     bool returnValue = false;
0818 
0819     // Look for a shutdown signal
0820     if (shutdown_flag.load(std::memory_order_acquire)) {
0821       returnValue = true;
0822       returnCode = epSignal;
0823     }
0824     return returnValue;
0825   }
0826 
0827   InputSource::ItemType EventProcessor::nextTransitionType() {
0828     if (deferredExceptionPtrIsSet_.load()) {
0829       lastSourceTransition_ = InputSource::IsStop;
0830       return InputSource::IsStop;
0831     }
0832 
0833     SendSourceTerminationSignalIfException sentry(actReg_.get());
0834     InputSource::ItemType itemType;
0835     //For now, do nothing with InputSource::IsSynchronize
0836     do {
0837       itemType = input_->nextItemType();
0838     } while (itemType == InputSource::IsSynchronize);
0839 
0840     lastSourceTransition_ = itemType;
0841     sentry.completedSuccessfully();
0842 
0843     StatusCode returnCode = epSuccess;
0844 
0845     if (checkForAsyncStopRequest(returnCode)) {
0846       actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
0847       lastSourceTransition_ = InputSource::IsStop;
0848     }
0849 
0850     return lastSourceTransition_;
0851   }
0852 
0853   std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
0854     return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
0855   }
0856 
0857   edm::LuminosityBlockNumber_t EventProcessor::nextLuminosityBlockID() { return input_->luminosityBlock(); }
0858 
0859   EventProcessor::StatusCode EventProcessor::runToCompletion() {
0860     beginJob();  //make sure this was called
0861 
0862     // make the services available
0863     ServiceRegistry::Operate operate(serviceToken_);
0864 
0865     try {
0866       FilesProcessor fp(fileModeNoMerge_);
0867 
0868       convertException::wrap([&]() {
0869         bool firstTime = true;
0870         do {
0871           if (not firstTime) {
0872             prepareForNextLoop();
0873             rewindInput();
0874           } else {
0875             firstTime = false;
0876           }
0877           startingNewLoop();
0878 
0879           auto trans = fp.processFiles(*this);
0880 
0881           fp.normalEnd();
0882 
0883           if (deferredExceptionPtrIsSet_.load()) {
0884             std::rethrow_exception(deferredExceptionPtr_);
0885           }
0886           if (trans != InputSource::IsStop) {
0887             //problem with the source
0888             doErrorStuff();
0889 
0890             throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
0891           }
0892         } while (not endOfLoop());
0893       });  // convertException::wrap
0894 
0895     }  // Try block
0896     catch (cms::Exception& e) {
0897       if (exceptionMessageLumis_) {
0898         std::string message(
0899             "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
0900         e.addAdditionalInfo(message);
0901         if (e.alreadyPrinted()) {
0902           LogAbsolute("Additional Exceptions") << message;
0903         }
0904       }
0905       if (exceptionMessageRuns_) {
0906         std::string message(
0907             "Another exception was caught while trying to clean up runs after the primary fatal exception.");
0908         e.addAdditionalInfo(message);
0909         if (e.alreadyPrinted()) {
0910           LogAbsolute("Additional Exceptions") << message;
0911         }
0912       }
0913       if (!exceptionMessageFiles_.empty()) {
0914         e.addAdditionalInfo(exceptionMessageFiles_);
0915         if (e.alreadyPrinted()) {
0916           LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
0917         }
0918       }
0919       throw;
0920     }
0921     return epSuccess;
0922   }
0923 
0924   void EventProcessor::readFile() {
0925     FDEBUG(1) << " \treadFile\n";
0926     size_t size = preg_->size();
0927     SendSourceTerminationSignalIfException sentry(actReg_.get());
0928 
0929     principalCache_.preReadFile();
0930 
0931     fb_ = input_->readFile();
0932     if (size < preg_->size()) {
0933       principalCache_.adjustIndexesAfterProductRegistryAddition();
0934     }
0935     principalCache_.adjustEventsToNewProductRegistry(preg());
0936     if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
0937       fb_->setNotFastClonable(FileBlock::ParallelProcesses);
0938     }
0939     sentry.completedSuccessfully();
0940   }
0941 
0942   void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
0943     if (fileBlockValid()) {
0944       SendSourceTerminationSignalIfException sentry(actReg_.get());
0945       input_->closeFile(fb_.get(), cleaningUpAfterException);
0946       sentry.completedSuccessfully();
0947     }
0948     FDEBUG(1) << "\tcloseInputFile\n";
0949   }
0950 
0951   void EventProcessor::openOutputFiles() {
0952     if (fileBlockValid()) {
0953       schedule_->openOutputFiles(*fb_);
0954       for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
0955     }
0956     FDEBUG(1) << "\topenOutputFiles\n";
0957   }
0958 
0959   void EventProcessor::closeOutputFiles() {
0960     schedule_->closeOutputFiles();
0961     for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
0962     processBlockHelper_->clearAfterOutputFilesClose();
0963     FDEBUG(1) << "\tcloseOutputFiles\n";
0964   }
0965 
0966   void EventProcessor::respondToOpenInputFile() {
0967     if (fileBlockValid()) {
0968       for_all(subProcesses_,
0969               [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
0970       schedule_->respondToOpenInputFile(*fb_);
0971       for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
0972     }
0973     FDEBUG(1) << "\trespondToOpenInputFile\n";
0974   }
0975 
0976   void EventProcessor::respondToCloseInputFile() {
0977     if (fileBlockValid()) {
0978       schedule_->respondToCloseInputFile(*fb_);
0979       for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
0980     }
0981     FDEBUG(1) << "\trespondToCloseInputFile\n";
0982   }
0983 
0984   void EventProcessor::startingNewLoop() {
0985     shouldWeStop_ = false;
0986     //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
0987     // until after we've called beginOfJob
0988     if (looper_ && looperBeginJobRun_) {
0989       looper_->doStartingNewLoop();
0990     }
0991     FDEBUG(1) << "\tstartingNewLoop\n";
0992   }
0993 
0994   bool EventProcessor::endOfLoop() {
0995     if (looper_) {
0996       ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
0997       looper_->setModuleChanger(&changer);
0998       EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
0999       looper_->setModuleChanger(nullptr);
1000       if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1001         return true;
1002       else
1003         return false;
1004     }
1005     FDEBUG(1) << "\tendOfLoop\n";
1006     return true;
1007   }
1008 
1009   void EventProcessor::rewindInput() {
1010     input_->repeat();
1011     input_->rewind();
1012     FDEBUG(1) << "\trewind\n";
1013   }
1014 
1015   void EventProcessor::prepareForNextLoop() {
1016     looper_->prepareForNextLoop(esp_.get());
1017     FDEBUG(1) << "\tprepareForNextLoop\n";
1018   }
1019 
1020   bool EventProcessor::shouldWeCloseOutput() const {
1021     FDEBUG(1) << "\tshouldWeCloseOutput\n";
1022     if (!subProcesses_.empty()) {
1023       for (auto const& subProcess : subProcesses_) {
1024         if (subProcess.shouldWeCloseOutput()) {
1025           return true;
1026         }
1027       }
1028       return false;
1029     }
1030     return schedule_->shouldWeCloseOutput();
1031   }
1032 
1033   void EventProcessor::doErrorStuff() {
1034     FDEBUG(1) << "\tdoErrorStuff\n";
1035     LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1036                              << "and went to the error state\n"
1037                              << "Will attempt to terminate processing normally\n"
1038                              << "(IF using the looper the next loop will be attempted)\n"
1039                              << "This likely indicates a bug in an input module or corrupted input or both\n";
1040   }
1041 
1042   void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1043     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1044     processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1045 
1046     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1047     FinalWaitingTask globalWaitTask;
1048 
1049     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1050     beginGlobalTransitionAsync<Traits>(
1051         WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1052 
1053     do {
1054       taskGroup_.wait();
1055     } while (not globalWaitTask.done());
1056 
1057     if (globalWaitTask.exceptionPtr() != nullptr) {
1058       std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1059     }
1060     beginProcessBlockSucceeded = true;
1061   }
1062 
1063   void EventProcessor::inputProcessBlocks() {
1064     input_->fillProcessBlockHelper();
1065     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1066     while (input_->nextProcessBlock(processBlockPrincipal)) {
1067       readProcessBlock(processBlockPrincipal);
1068 
1069       using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1070       FinalWaitingTask globalWaitTask;
1071 
1072       ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1073       beginGlobalTransitionAsync<Traits>(
1074           WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1075 
1076       do {
1077         taskGroup_.wait();
1078       } while (not globalWaitTask.done());
1079       if (globalWaitTask.exceptionPtr() != nullptr) {
1080         std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1081       }
1082 
1083       FinalWaitingTask writeWaitTask;
1084       writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input);
1085       do {
1086         taskGroup_.wait();
1087       } while (not writeWaitTask.done());
1088       if (writeWaitTask.exceptionPtr()) {
1089         std::rethrow_exception(*writeWaitTask.exceptionPtr());
1090       }
1091 
1092       processBlockPrincipal.clearPrincipal();
1093       for (auto& s : subProcesses_) {
1094         s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1095       }
1096     }
1097   }
1098 
1099   void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1100     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1101 
1102     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1103     FinalWaitingTask globalWaitTask;
1104 
1105     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1106     endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1107                                      *schedule_,
1108                                      transitionInfo,
1109                                      serviceToken_,
1110                                      subProcesses_,
1111                                      cleaningUpAfterException);
1112     do {
1113       taskGroup_.wait();
1114     } while (not globalWaitTask.done());
1115     if (globalWaitTask.exceptionPtr() != nullptr) {
1116       std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1117     }
1118 
1119     if (beginProcessBlockSucceeded) {
1120       FinalWaitingTask writeWaitTask;
1121       writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New);
1122       do {
1123         taskGroup_.wait();
1124       } while (not writeWaitTask.done());
1125       if (writeWaitTask.exceptionPtr()) {
1126         std::rethrow_exception(*writeWaitTask.exceptionPtr());
1127       }
1128     }
1129 
1130     processBlockPrincipal.clearPrincipal();
1131     for (auto& s : subProcesses_) {
1132       s.clearProcessBlockPrincipal(ProcessBlockType::New);
1133     }
1134   }
1135 
1136   void EventProcessor::beginRun(ProcessHistoryID const& phid,
1137                                 RunNumber_t run,
1138                                 bool& globalBeginSucceeded,
1139                                 bool& eventSetupForInstanceSucceeded) {
1140     globalBeginSucceeded = false;
1141     RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1142     {
1143       SendSourceTerminationSignalIfException sentry(actReg_.get());
1144 
1145       input_->doBeginRun(runPrincipal, &processContext_);
1146       sentry.completedSuccessfully();
1147     }
1148 
1149     IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1150     if (forceESCacheClearOnNewRun_) {
1151       espController_->forceCacheClear();
1152     }
1153     {
1154       SendSourceTerminationSignalIfException sentry(actReg_.get());
1155       actReg_->preESSyncIOVSignal_.emit(ts);
1156       synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
1157       actReg_->postESSyncIOVSignal_.emit(ts);
1158       eventSetupForInstanceSucceeded = true;
1159       sentry.completedSuccessfully();
1160     }
1161     auto const& es = esp_->eventSetupImpl();
1162     if (looper_ && looperBeginJobRun_ == false) {
1163       looper_->copyInfo(ScheduleInfo(schedule_.get()));
1164 
1165       FinalWaitingTask waitTask;
1166       using namespace edm::waiting_task::chain;
1167       chain::first([this, &es](auto nextTask) {
1168         looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1169       }) | then([this, &es](auto nextTask) mutable {
1170         looper_->beginOfJob(es);
1171         looperBeginJobRun_ = true;
1172         looper_->doStartingNewLoop();
1173       }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1174 
1175       do {
1176         taskGroup_.wait();
1177       } while (not waitTask.done());
1178       if (waitTask.exceptionPtr() != nullptr) {
1179         std::rethrow_exception(*(waitTask.exceptionPtr()));
1180       }
1181     }
1182     {
1183       using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1184       FinalWaitingTask globalWaitTask;
1185 
1186       using namespace edm::waiting_task::chain;
1187       chain::first([&runPrincipal, &es, this](auto waitTask) {
1188         RunTransitionInfo transitionInfo(runPrincipal, es);
1189         beginGlobalTransitionAsync<Traits>(
1190             std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1191       }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1192         globalBeginSucceeded = true;
1193         FDEBUG(1) << "\tbeginRun " << run << "\n";
1194       }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1195         looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1196       }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1197         looper_->doBeginRun(runPrincipal, es, &processContext_);
1198       }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1199 
1200       do {
1201         taskGroup_.wait();
1202       } while (not globalWaitTask.done());
1203       if (globalWaitTask.exceptionPtr() != nullptr) {
1204         std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1205       }
1206     }
1207     {
1208       //To wait, the ref count has to be 1+#streams
1209       FinalWaitingTask streamLoopWaitTask;
1210 
1211       using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1212 
1213       RunTransitionInfo transitionInfo(runPrincipal, es);
1214       beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1215                                           *schedule_,
1216                                           preallocations_.numberOfStreams(),
1217                                           transitionInfo,
1218                                           serviceToken_,
1219                                           subProcesses_);
1220       do {
1221         taskGroup_.wait();
1222       } while (not streamLoopWaitTask.done());
1223       if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1224         std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1225       }
1226     }
1227     FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1228     if (looper_) {
1229       //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1230     }
1231   }
1232 
1233   void EventProcessor::endUnfinishedRun(ProcessHistoryID const& phid,
1234                                         RunNumber_t run,
1235                                         bool globalBeginSucceeded,
1236                                         bool cleaningUpAfterException,
1237                                         bool eventSetupForInstanceSucceeded) {
1238     if (eventSetupForInstanceSucceeded) {
1239       //If we skip empty runs, this would be called conditionally
1240       endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1241 
1242       if (globalBeginSucceeded) {
1243         RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1244         if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
1245           FinalWaitingTask t;
1246           MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1247           mergeableRunProductMetadata->preWriteRun();
1248           writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1249           do {
1250             taskGroup_.wait();
1251           } while (not t.done());
1252           mergeableRunProductMetadata->postWriteRun();
1253           if (t.exceptionPtr()) {
1254             std::rethrow_exception(*t.exceptionPtr());
1255           }
1256         }
1257       }
1258     }
1259     deleteRunFromCache(phid, run);
1260   }
1261 
1262   void EventProcessor::endRun(ProcessHistoryID const& phid,
1263                               RunNumber_t run,
1264                               bool globalBeginSucceeded,
1265                               bool cleaningUpAfterException) {
1266     RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1267     runPrincipal.setEndTime(input_->timestamp());
1268 
1269     IOVSyncValue ts(
1270         EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1271         runPrincipal.endTime());
1272     {
1273       SendSourceTerminationSignalIfException sentry(actReg_.get());
1274       actReg_->preESSyncIOVSignal_.emit(ts);
1275       synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
1276       actReg_->postESSyncIOVSignal_.emit(ts);
1277       sentry.completedSuccessfully();
1278     }
1279     auto const& es = esp_->eventSetupImpl();
1280     if (globalBeginSucceeded) {
1281       //To wait, the ref count has to be 1+#streams
1282       FinalWaitingTask streamLoopWaitTask;
1283 
1284       using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1285 
1286       RunTransitionInfo transitionInfo(runPrincipal, es);
1287       endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1288                                         *schedule_,
1289                                         preallocations_.numberOfStreams(),
1290                                         transitionInfo,
1291                                         serviceToken_,
1292                                         subProcesses_,
1293                                         cleaningUpAfterException);
1294       do {
1295         taskGroup_.wait();
1296       } while (not streamLoopWaitTask.done());
1297       if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1298         std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1299       }
1300     }
1301     FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1302     if (looper_) {
1303       //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1304     }
1305     {
1306       FinalWaitingTask globalWaitTask;
1307 
1308       using namespace edm::waiting_task::chain;
1309       chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1310         RunTransitionInfo transitionInfo(runPrincipal, es);
1311         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1312         endGlobalTransitionAsync<Traits>(
1313             std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1314       }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1315         looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1316       }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1317         looper_->doEndRun(runPrincipal, es, &processContext_);
1318       }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1319 
1320       do {
1321         taskGroup_.wait();
1322       } while (not globalWaitTask.done());
1323       if (globalWaitTask.exceptionPtr() != nullptr) {
1324         std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1325       }
1326     }
1327     FDEBUG(1) << "\tendRun " << run << "\n";
1328   }
1329 
1330   InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1331     FinalWaitingTask waitTask;
1332     if (streamLumiActive_ > 0) {
1333       assert(streamLumiActive_ == preallocations_.numberOfStreams());
1334       // Continue after opening a new input file
1335       continueLumiAsync(WaitingTaskHolder{taskGroup_, &waitTask});
1336     } else {
1337       beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1338                                   input_->luminosityBlockAuxiliary()->beginTime()),
1339                      iRunResource,
1340                      WaitingTaskHolder{taskGroup_, &waitTask});
1341     }
1342     do {
1343       taskGroup_.wait();
1344     } while (not waitTask.done());
1345 
1346     if (waitTask.exceptionPtr() != nullptr) {
1347       std::rethrow_exception(*(waitTask.exceptionPtr()));
1348     }
1349     return lastTransitionType();
1350   }
1351 
1352   void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1353                                       std::shared_ptr<void> const& iRunResource,
1354                                       edm::WaitingTaskHolder iHolder) {
1355     if (iHolder.taskHasFailed()) {
1356       return;
1357     }
1358 
1359     actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1360     // We must be careful with the status object here and in code this function calls. IF we want
1361     // endRun to be called, then we must call resetResources before the things waiting on
1362     // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1363     // endRun to be called much later than it should be, because status is holding iRunResource).
1364     // Note that this must be done explicitly. Relying on the destructor does not work well
1365     // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1366     // queues, plus it is difficult to guarantee the destructor is called  before iHolder gets
1367     // destroyed inside this function and lumiWork.
1368     auto status =
1369         std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1370     chain::first([&](auto nextTask) {
1371       auto asyncEventSetup = [](ActivityRegistry* actReg,
1372                                 auto* espController,
1373                                 auto& queue,
1374                                 WaitingTaskHolder task,
1375                                 auto& status,
1376                                 IOVSyncValue const& iSync) {
1377         queue.pause();
1378         CMS_SA_ALLOW try {
1379           SendSourceTerminationSignalIfException sentry(actReg);
1380           // Pass in iSync to let the EventSetup system know which run and lumi
1381           // need to be processed and prepare IOVs for it.
1382           // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1383           // lumi is done and no longer needs its EventSetup IOVs.
1384           actReg->preESSyncIOVSignal_.emit(iSync);
1385           espController->eventSetupForInstanceAsync(
1386               iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1387           sentry.completedSuccessfully();
1388         } catch (...) {
1389           task.doneWaiting(std::current_exception());
1390         }
1391       };
1392       if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1393         // We only get here inside this block if there is an EventSetup
1394         // module not able to handle concurrent IOVs (usually an ESSource)
1395         // and the new sync value is outside the current IOV of that module.
1396         auto group = nextTask.group();
1397         queueWhichWaitsForIOVsToFinish_.push(
1398             *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1399               asyncEventSetup(
1400                   actReg_.get(), espController_.get(), queueWhichWaitsForIOVsToFinish_, std::move(task), status, iSync);
1401             });
1402       } else {
1403         asyncEventSetup(
1404             actReg_.get(), espController_.get(), queueWhichWaitsForIOVsToFinish_, std::move(nextTask), status, iSync);
1405       }
1406     }) | chain::then([this, status, iSync](std::exception_ptr const* iPtr, auto nextTask) {
1407       actReg_->postESSyncIOVSignal_.emit(iSync);
1408       //the call to doneWaiting will cause the count to decrement
1409       auto copyTask = nextTask;
1410       if (iPtr) {
1411         nextTask.doneWaiting(*iPtr);
1412       }
1413       auto group = copyTask.group();
1414       lumiQueue_->pushAndPause(
1415           *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1416             if (task.taskHasFailed()) {
1417               status->resetResources();
1418               return;
1419             }
1420 
1421             status->setResumer(std::move(iResumer));
1422 
1423             auto group = task.group();
1424             sourceResourcesAcquirer_.serialQueueChain().push(
1425                 *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1426                   //make the services available
1427                   ServiceRegistry::Operate operate(serviceToken_);
1428                   // Caught exception is propagated via WaitingTaskHolder
1429                   CMS_SA_ALLOW try {
1430                     readLuminosityBlock(*status);
1431 
1432                     LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1433                     {
1434                       SendSourceTerminationSignalIfException sentry(actReg_.get());
1435 
1436                       input_->doBeginLumi(lumiPrincipal, &processContext_);
1437                       sentry.completedSuccessfully();
1438                     }
1439 
1440                     Service<RandomNumberGenerator> rng;
1441                     if (rng.isAvailable()) {
1442                       LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1443                       rng->preBeginLumi(lb);
1444                     }
1445 
1446                     EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1447 
1448                     using namespace edm::waiting_task::chain;
1449                     chain::first([this, status, &lumiPrincipal](auto nextTask) {
1450                       EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1451                       {
1452                         LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1453                         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1454                         beginGlobalTransitionAsync<Traits>(
1455                             nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1456                       }
1457                     }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1458                       looper_->prefetchAsync(
1459                           nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1460                     }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1461                       status->globalBeginDidSucceed();
1462                       //make the services available
1463                       ServiceRegistry::Operate operateLooper(serviceToken_);
1464                       looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1465                     }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1466                       if (iPtr) {
1467                         status->resetResources();
1468                         holder.doneWaiting(*iPtr);
1469                       } else {
1470                         if (not looper_) {
1471                           status->globalBeginDidSucceed();
1472                         }
1473                         EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1474                         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1475 
1476                         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1477                           streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1478                             streamQueues_[i].pause();
1479 
1480                             auto& event = principalCache_.eventPrincipal(i);
1481                             //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1482                             // held by the container as this lambda may not finish executing before all the tasks it
1483                             // spawns have already started to run.
1484                             auto eventSetupImpls = &status->eventSetupImpls();
1485                             auto lp = status->lumiPrincipal().get();
1486                             streamLumiStatus_[i] = std::move(status);
1487                             ++streamLumiActive_;
1488                             event.setLuminosityBlockPrincipal(lp);
1489                             LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1490                             using namespace edm::waiting_task::chain;
1491                             chain::first([this, i, &transitionInfo](auto nextTask) {
1492                               beginStreamTransitionAsync<Traits>(
1493                                   std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1494                             }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1495                               if (exceptionFromBeginStreamLumi) {
1496                                 WaitingTaskHolder tmp(nextTask);
1497                                 tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1498                                 streamEndLumiAsync(nextTask, i);
1499                               } else {
1500                                 handleNextEventForStreamAsync(std::move(nextTask), i);
1501                               }
1502                             }) | runLast(holder);
1503                           });
1504                         }
1505                       }
1506                     }) | runLast(postQueueTask);
1507 
1508                   } catch (...) {
1509                     status->resetResources();
1510                     postQueueTask.doneWaiting(std::current_exception());
1511                   }
1512                 });  // task in sourceResourcesAcquirer
1513           });
1514     }) | chain::runLast(std::move(iHolder));
1515   }
1516 
1517   void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1518     {
1519       //all streams are sharing the same status at the moment
1520       auto status = streamLumiStatus_[0];  //read from streamLumiActive_ happened in calling routine
1521       status->needToContinueLumi();
1522       status->startProcessingEvents();
1523     }
1524 
1525     unsigned int streamIndex = 0;
1526     oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1527     for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1528       arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1529     }
1530     iHolder.group()->run(
1531         [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1532   }
1533 
1534   void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1535     if (setDeferredException(*iPtr)) {
1536       WaitingTaskHolder tmp(holder);
1537       tmp.doneWaiting(*iPtr);
1538     } else {
1539       setExceptionMessageLumis();
1540     }
1541   }
1542 
1543   void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1544                                           std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1545     // Get some needed info out of the status object before moving
1546     // it into finalTaskForThisLumi.
1547     auto& lp = *(iLumiStatus->lumiPrincipal());
1548     bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1549     bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1550     EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1551     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1552 
1553     using namespace edm::waiting_task::chain;
1554     chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1555       IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1556 
1557       LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1558       using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1559       endGlobalTransitionAsync<Traits>(
1560           std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1561     }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1562       //Only call writeLumi if beginLumi succeeded
1563       if (didGlobalBeginSucceed) {
1564         writeLumiAsync(std::move(nextTask), lumiPrincipal);
1565       }
1566     }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1567       looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1568     }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1569       //any thrown exception auto propagates to nextTask via the chain
1570       ServiceRegistry::Operate operate(serviceToken_);
1571       looper_->doEndLuminosityBlock(lp, es, &processContext_);
1572     }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1573       std::exception_ptr ptr;
1574       if (iPtr) {
1575         ptr = *iPtr;
1576       }
1577       ServiceRegistry::Operate operate(serviceToken_);
1578 
1579       // Try hard to clean up resources so the
1580       // process can terminate in a controlled
1581       // fashion even after exceptions have occurred.
1582       // Caught exception is passed to handleEndLumiExceptions()
1583       CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1584         if (not ptr) {
1585           ptr = std::current_exception();
1586         }
1587       }
1588       // Caught exception is passed to handleEndLumiExceptions()
1589       CMS_SA_ALLOW try {
1590         status->resumeGlobalLumiQueue();
1591         queueWhichWaitsForIOVsToFinish_.resume();
1592       } catch (...) {
1593         if (not ptr) {
1594           ptr = std::current_exception();
1595         }
1596       }
1597       // Caught exception is passed to handleEndLumiExceptions()
1598       CMS_SA_ALLOW try {
1599         // This call to status.resetResources() must occur before iTask is destroyed.
1600         // Otherwise there will be a data race which could result in endRun
1601         // being delayed until it is too late to successfully call it.
1602         status->resetResources();
1603         status.reset();
1604       } catch (...) {
1605         if (not ptr) {
1606           ptr = std::current_exception();
1607         }
1608       }
1609 
1610       if (ptr) {
1611         handleEndLumiExceptions(&ptr, nextTask);
1612       }
1613     }) | runLast(std::move(iTask));
1614   }
1615 
1616   void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1617     auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1618       if (iPtr) {
1619         handleEndLumiExceptions(iPtr, iTask);
1620       }
1621       auto status = streamLumiStatus_[iStreamIndex];
1622       //reset status before releasing queue else get race condtion
1623       streamLumiStatus_[iStreamIndex].reset();
1624       --streamLumiActive_;
1625       streamQueues_[iStreamIndex].resume();
1626 
1627       //are we the last one?
1628       if (status->streamFinishedLumi()) {
1629         globalEndLumiAsync(iTask, std::move(status));
1630       }
1631     });
1632 
1633     edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1634 
1635     //Need to be sure the lumi status is released before lumiDoneTask can every be called.
1636     // therefore we do not want to hold the shared_ptr
1637     auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1638     lumiStatus->setEndTime();
1639 
1640     EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1641 
1642     bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1643     auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1644 
1645     if (lumiStatus->didGlobalBeginSucceed()) {
1646       auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1647       IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1648                       lumiPrincipal.endTime());
1649       using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1650       LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1651       endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1652                                        *schedule_,
1653                                        iStreamIndex,
1654                                        transitionInfo,
1655                                        serviceToken_,
1656                                        subProcesses_,
1657                                        cleaningUpAfterException);
1658     }
1659   }
1660 
1661   void EventProcessor::endUnfinishedLumi() {
1662     if (streamLumiActive_.load() > 0) {
1663       FinalWaitingTask globalWaitTask;
1664       {
1665         WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1666         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1667           if (streamLumiStatus_[i]) {
1668             streamEndLumiAsync(globalTaskHolder, i);
1669           }
1670         }
1671       }
1672       do {
1673         taskGroup_.wait();
1674       } while (not globalWaitTask.done());
1675       if (globalWaitTask.exceptionPtr() != nullptr) {
1676         std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1677       }
1678     }
1679   }
1680 
1681   void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1682     SendSourceTerminationSignalIfException sentry(actReg_.get());
1683     input_->readProcessBlock(processBlockPrincipal);
1684     sentry.completedSuccessfully();
1685   }
1686 
1687   std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1688     if (principalCache_.hasRunPrincipal()) {
1689       throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1690                                                     << "Illegal attempt to insert run into cache\n"
1691                                                     << "Contact a Framework Developer\n";
1692     }
1693     auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1694                                              preg(),
1695                                              *processConfiguration_,
1696                                              historyAppender_.get(),
1697                                              0,
1698                                              true,
1699                                              &mergeableRunProductProcesses_);
1700     {
1701       SendSourceTerminationSignalIfException sentry(actReg_.get());
1702       input_->readRun(*rp, *historyAppender_);
1703       sentry.completedSuccessfully();
1704     }
1705     assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1706     principalCache_.insert(rp);
1707     return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1708   }
1709 
1710   std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1711     principalCache_.merge(input_->runAuxiliary(), preg());
1712     auto runPrincipal = principalCache_.runPrincipalPtr();
1713     {
1714       SendSourceTerminationSignalIfException sentry(actReg_.get());
1715       input_->readAndMergeRun(*runPrincipal);
1716       sentry.completedSuccessfully();
1717     }
1718     assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1719     return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1720   }
1721 
1722   void EventProcessor::readLuminosityBlock(LuminosityBlockProcessingStatus& iStatus) {
1723     if (!principalCache_.hasRunPrincipal()) {
1724       throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1725                                                     << "Illegal attempt to insert lumi into cache\n"
1726                                                     << "Run is invalid\n"
1727                                                     << "Contact a Framework Developer\n";
1728     }
1729     auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
1730     assert(lbp);
1731     lbp->setAux(*input_->luminosityBlockAuxiliary());
1732     {
1733       SendSourceTerminationSignalIfException sentry(actReg_.get());
1734       input_->readLuminosityBlock(*lbp, *historyAppender_);
1735       sentry.completedSuccessfully();
1736     }
1737     lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1738     iStatus.lumiPrincipal() = std::move(lbp);
1739   }
1740 
1741   int EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
1742     auto& lumiPrincipal = *iStatus.lumiPrincipal();
1743     assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1744            input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1745                input_->processHistoryRegistry().reducedProcessHistoryID(
1746                    input_->luminosityBlockAuxiliary()->processHistoryID()));
1747     bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1748     assert(lumiOK);
1749     lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1750     {
1751       SendSourceTerminationSignalIfException sentry(actReg_.get());
1752       input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1753       sentry.completedSuccessfully();
1754     }
1755     return input_->luminosityBlock();
1756   }
1757 
1758   void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
1759     using namespace edm::waiting_task;
1760     chain::first([&](auto nextTask) {
1761       ServiceRegistry::Operate op(serviceToken_);
1762       schedule_->writeProcessBlockAsync(
1763           nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1764     }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1765       ServiceRegistry::Operate op(serviceToken_);
1766       for (auto& s : subProcesses_) {
1767         s.writeProcessBlockAsync(nextTask, processBlockType);
1768       }
1769     }) | chain::runLast(std::move(task));
1770   }
1771 
1772   void EventProcessor::writeRunAsync(WaitingTaskHolder task,
1773                                      ProcessHistoryID const& phid,
1774                                      RunNumber_t run,
1775                                      MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1776     using namespace edm::waiting_task;
1777     chain::first([&](auto nextTask) {
1778       ServiceRegistry::Operate op(serviceToken_);
1779       schedule_->writeRunAsync(nextTask,
1780                                principalCache_.runPrincipal(phid, run),
1781                                &processContext_,
1782                                actReg_.get(),
1783                                mergeableRunProductMetadata);
1784     }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1785       ServiceRegistry::Operate op(serviceToken_);
1786       for (auto& s : subProcesses_) {
1787         s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1788       }
1789     }) | chain::runLast(std::move(task));
1790   }
1791 
1792   void EventProcessor::deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run) {
1793     principalCache_.deleteRun(phid, run);
1794     for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1795     FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1796   }
1797 
1798   void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
1799     using namespace edm::waiting_task;
1800     if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
1801       chain::first([&](auto nextTask) {
1802         ServiceRegistry::Operate op(serviceToken_);
1803 
1804         lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1805         schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1806       }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1807         ServiceRegistry::Operate op(serviceToken_);
1808         for (auto& s : subProcesses_) {
1809           s.writeLumiAsync(nextTask, lumiPrincipal);
1810         }
1811       }) | chain::lastTask(std::move(task));
1812     }
1813   }
1814 
1815   void EventProcessor::deleteLumiFromCache(LuminosityBlockProcessingStatus& iStatus) {
1816     for (auto& s : subProcesses_) {
1817       s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1818     }
1819     iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
1820     iStatus.lumiPrincipal()->clearPrincipal();
1821     //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1822   }
1823 
1824   bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iStatus) {
1825     if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1826       iStatus.endLumi();
1827       return false;
1828     }
1829 
1830     if (iStatus.wasEventProcessingStopped()) {
1831       return false;
1832     }
1833 
1834     if (shouldWeStop()) {
1835       lastSourceTransition_ = InputSource::IsStop;
1836       iStatus.stopProcessingEvents();
1837       iStatus.endLumi();
1838       return false;
1839     }
1840 
1841     ServiceRegistry::Operate operate(serviceToken_);
1842     // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1843     CMS_SA_ALLOW try {
1844       //need to use lock in addition to the serial task queue because
1845       // of delayed provenance reading and reading data in response to
1846       // edm::Refs etc
1847       std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1848 
1849       auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1850       if (InputSource::IsLumi == itemType) {
1851         iStatus.haveContinuedLumi();
1852         while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1853                iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1854           readAndMergeLumi(iStatus);
1855           itemType = nextTransitionType();
1856         }
1857         if (InputSource::IsLumi == itemType) {
1858           iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1859                                                 input_->luminosityBlockAuxiliary()->beginTime()));
1860         }
1861       }
1862       if (InputSource::IsEvent != itemType) {
1863         iStatus.stopProcessingEvents();
1864 
1865         //IsFile may continue processing the lumi and
1866         // looper_ can cause the input source to declare a new IsRun which is actually
1867         // just a continuation of the previous run
1868         if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1869             (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1870           iStatus.endLumi();
1871         }
1872         return false;
1873       }
1874       readEvent(iStreamIndex);
1875     } catch (...) {
1876       bool expected = false;
1877       if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1878         deferredExceptionPtr_ = std::current_exception();
1879         iStatus.endLumi();
1880       }
1881       return false;
1882     }
1883     return true;
1884   }
1885 
1886   void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1887     sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1888       ServiceRegistry::Operate operate(serviceToken_);
1889       //we do not want to extend the lifetime of the shared_ptr to the end of this function
1890       // as steramEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1891       auto status = streamLumiStatus_[iStreamIndex].get();
1892       // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1893       CMS_SA_ALLOW try {
1894         if (readNextEventForStream(iStreamIndex, *status)) {
1895           auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1896             if (iPtr) {
1897               // Try to end the stream properly even if an exception was
1898               // thrown on an event.
1899               bool expected = false;
1900               if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1901                 // This is the case where the exception in iPtr is the primary
1902                 // exception and we want to see its message.
1903                 deferredExceptionPtr_ = *iPtr;
1904                 WaitingTaskHolder tempHolder(iTask);
1905                 tempHolder.doneWaiting(*iPtr);
1906               }
1907               streamEndLumiAsync(std::move(iTask), iStreamIndex);
1908               //the stream will stop now
1909               return;
1910             }
1911             handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1912           });
1913 
1914           processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1915         } else {
1916           //the stream will stop now
1917           if (status->isLumiEnding()) {
1918             if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1919               status->startNextLumi();
1920               beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1921             }
1922             streamEndLumiAsync(std::move(iTask), iStreamIndex);
1923           } else {
1924             iTask.doneWaiting(std::exception_ptr{});
1925           }
1926         }
1927       } catch (...) {
1928         // It is unlikely we will ever get in here ...
1929         // But if we do try to clean up and propagate the exception
1930         if (streamLumiStatus_[iStreamIndex]) {
1931           streamEndLumiAsync(iTask, iStreamIndex);
1932         }
1933         bool expected = false;
1934         if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1935           auto e = std::current_exception();
1936           deferredExceptionPtr_ = e;
1937           iTask.doneWaiting(e);
1938         }
1939       }
1940     });
1941   }
1942 
1943   void EventProcessor::readEvent(unsigned int iStreamIndex) {
1944     //TODO this will have to become per stream
1945     auto& event = principalCache_.eventPrincipal(iStreamIndex);
1946     StreamContext streamContext(event.streamID(), &processContext_);
1947 
1948     SendSourceTerminationSignalIfException sentry(actReg_.get());
1949     input_->readEvent(event, streamContext);
1950 
1951     streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1952     sentry.completedSuccessfully();
1953 
1954     FDEBUG(1) << "\treadEvent\n";
1955   }
1956 
1957   void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1958     iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1959   }
1960 
1961   void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1962     auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1963 
1964     ServiceRegistry::Operate operate(serviceToken_);
1965     Service<RandomNumberGenerator> rng;
1966     if (rng.isAvailable()) {
1967       Event ev(*pep, ModuleDescription(), nullptr);
1968       rng->postEventRead(ev);
1969     }
1970 
1971     EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1972     using namespace edm::waiting_task::chain;
1973     chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1974       EventTransitionInfo info(*pep, es);
1975       schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1976     }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1977       for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1978         subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1979       }
1980     }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1981       //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
1982       ServiceRegistry::Operate operateLooper(serviceToken_);
1983       processEventWithLooper(*pep, iStreamIndex);
1984     }) | then([pep](auto nextTask) {
1985       FDEBUG(1) << "\tprocessEvent\n";
1986       pep->clearEventPrincipal();
1987     }) | runLast(iHolder);
1988   }
1989 
1990   void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1991     bool randomAccess = input_->randomAccess();
1992     ProcessingController::ForwardState forwardState = input_->forwardState();
1993     ProcessingController::ReverseState reverseState = input_->reverseState();
1994     ProcessingController pc(forwardState, reverseState, randomAccess);
1995 
1996     EDLooperBase::Status status = EDLooperBase::kContinue;
1997     do {
1998       StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1999       EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2000       status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2001 
2002       bool succeeded = true;
2003       if (randomAccess) {
2004         if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2005           input_->skipEvents(-2);
2006         } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2007           succeeded = input_->goToEvent(pc.specifiedEventTransition());
2008         }
2009       }
2010       pc.setLastOperationSucceeded(succeeded);
2011     } while (!pc.lastOperationSucceeded());
2012     if (status != EDLooperBase::kContinue) {
2013       shouldWeStop_ = true;
2014       lastSourceTransition_ = InputSource::IsStop;
2015     }
2016   }
2017 
2018   bool EventProcessor::shouldWeStop() const {
2019     FDEBUG(1) << "\tshouldWeStop\n";
2020     if (shouldWeStop_)
2021       return true;
2022     if (!subProcesses_.empty()) {
2023       for (auto const& subProcess : subProcesses_) {
2024         if (subProcess.terminate()) {
2025           return true;
2026         }
2027       }
2028       return false;
2029     }
2030     return schedule_->terminate();
2031   }
2032 
2033   void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2034 
2035   void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2036 
2037   void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2038 
2039   bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2040     bool expected = false;
2041     if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2042       deferredExceptionPtr_ = iException;
2043       return true;
2044     }
2045     return false;
2046   }
2047 
2048   void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2049     cms::Exception ex("ModulesSynchingOnLumis");
2050     ex << "The framework is configured to use at least two streams, but the following modules\n"
2051        << "require synchronizing on LuminosityBlock boundaries:";
2052     bool found = false;
2053     for (auto worker : schedule_->allWorkers()) {
2054       if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2055         found = true;
2056         ex << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2057       }
2058     }
2059     if (found) {
2060       ex << "\n\nThe situation can be fixed by either\n"
2061          << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2062          << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2063       throw ex;
2064     }
2065   }
2066 
2067   void EventProcessor::warnAboutLegacyModules() const {
2068     std::unique_ptr<LogSystem> s;
2069     for (auto worker : schedule_->allWorkers()) {
2070       if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2071         if (not s) {
2072           s = std::make_unique<LogSystem>("LegacyModules");
2073           (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2074                   "is going to end soon. These modules need to be converted to have type\n"
2075                   "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2076         }
2077         (*s) << "\n  " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2078       }
2079     }
2080   }
2081 }  // namespace edm