Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:15

0001 #include "FWCore/Framework/interface/SubProcess.h"
0002 
0003 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0005 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0007 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0008 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0009 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0010 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0011 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0012 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0013 #include "FWCore/Common/interface/SubProcessBlockHelper.h"
0014 #include "FWCore/Framework/interface/EventForOutput.h"
0015 #include "FWCore/Framework/interface/EventPrincipal.h"
0016 #include "FWCore/Framework/interface/FileBlock.h"
0017 #include "FWCore/Framework/interface/ProductResolverBase.h"
0018 #include "FWCore/Framework/interface/HistoryAppender.h"
0019 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0020 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0021 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0022 #include "FWCore/Framework/src/OutputModuleDescription.h"
0023 #include "FWCore/Framework/interface/RunPrincipal.h"
0024 #include "FWCore/Framework/interface/getAllTriggerNames.h"
0025 #include "FWCore/Framework/interface/TriggerNamesService.h"
0026 #include "FWCore/Framework/interface/ScheduleItems.h"
0027 #include "FWCore/Framework/interface/EventSetupsController.h"
0028 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0029 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0030 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0031 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0032 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0033 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0034 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0035 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0036 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0037 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0038 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0039 #include "FWCore/Concurrency/interface/WaitingTask.h"
0040 #include "FWCore/Concurrency/interface/chain_first.h"
0041 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0042 
0043 #include "boost/range/adaptor/reversed.hpp"
0044 
0045 #include <cassert>
0046 #include <exception>
0047 #include <string>
0048 
0049 namespace edm {
0050 
0051   SubProcess::SubProcess(ParameterSet& parameterSet,
0052                          ParameterSet const& topLevelParameterSet,
0053                          std::shared_ptr<ProductRegistry const> parentProductRegistry,
0054                          std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
0055                          ProcessBlockHelperBase const& parentProcessBlockHelper,
0056                          ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
0057                          SubProcessParentageHelper const& parentSubProcessParentageHelper,
0058                          eventsetup::EventSetupsController& esController,
0059                          ActivityRegistry& parentActReg,
0060                          ServiceToken const& token,
0061                          serviceregistry::ServiceLegacy iLegacy,
0062                          PreallocationConfiguration const& preallocConfig,
0063                          ProcessContext const* parentProcessContext,
0064                          ModuleTypeResolverMaker const* typeResolverMaker)
0065       : EDConsumerBase(),
0066         serviceToken_(),
0067         parentPreg_(parentProductRegistry),
0068         preg_(),
0069         branchIDListHelper_(),
0070         act_table_(),
0071         processConfiguration_(),
             // The process history registries and history appenders are sized as
             // streams + luminosity blocks + runs. Stream slots come first, the
             // luminosity block slots start at historyLumiOffset_ and the run
             // slots start at historyRunOffset_.
             // NOTE(review): the two offsets are read by the initializers that
             // follow them, so this relies on the member declaration order in
             // the header - confirm.
0072         historyLumiOffset_(preallocConfig.numberOfStreams()),
0073         historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
0074         processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
0075         historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
0076         principalCache_(),
0077         esp_(),
0078         schedule_(),
0079         subProcesses_(),
0080         processParameterSet_(),
0081         productSelectorRules_(parameterSet, "outputCommands", "OutputModule"),
0082         productSelector_(),
0083         wantAllEvents_(true) {
         // Overview of what this constructor does:
         //  1. configures event selection and selects which parent products to keep;
         //  2. takes the "process" parameter set out of parameterSet and replaces its
         //     maxEvents / maxLuminosityBlocks with those of the top level process;
         //  3. creates the services, the EventSetup provider and the Schedule;
         //  4. creates the event, run, luminosity block and process block principals;
         //  5. constructs the nested SubProcesses, if any are configured.
0084     //Setup the event selection
0085     Service<service::TriggerNamesService> tns;
0086
0087     ParameterSet selectevents = parameterSet.getUntrackedParameterSet("SelectEvents", ParameterSet());
0088
0089     selectevents.registerIt();  // Just in case this PSet is not registered
0090     wantAllEvents_ = detail::configureEventSelector(
0091         selectevents, tns->getProcessName(), getAllTriggerNames(), selectors_, consumesCollector());
0092     std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0093     selector_config_id_ = detail::registerProperSelectionInfo(
0094         selectevents, "", outputModulePathPositions, parentProductRegistry->anyProductProduced());
0095
         // keepAssociation is filled by selectProducts() and consumed further down
         // by thinnedAssociationsHelper_->updateFromParentProcess().
0096     std::map<BranchID, bool> keepAssociation;
0097     selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
0098
0099     std::string const maxEvents("maxEvents");
0100     std::string const maxLumis("maxLuminosityBlocks");
0101
0102     // propagate_const<T> has no reset() function
0103     processParameterSet_ =
0104         std::unique_ptr<ParameterSet>(parameterSet.popParameterSet(std::string("process")).release());
0105
0106     // if this process has a maxEvents or maxLuminosityBlocks parameter set, remove them.
0107     if (processParameterSet_->exists(maxEvents)) {
0108       processParameterSet_->popParameterSet(maxEvents);
0109     }
0110     if (processParameterSet_->exists(maxLumis)) {
0111       processParameterSet_->popParameterSet(maxLumis);
0112     }
0113
0114     // if the top level process has a maxEvents or maxLuminosityBlocks parameter set, add them to this process.
0115     if (topLevelParameterSet.exists(maxEvents)) {
0116       processParameterSet_->addUntrackedParameter<ParameterSet>(
0117           maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
0118     }
0119     if (topLevelParameterSet.exists(maxLumis)) {
0120       processParameterSet_->addUntrackedParameter<ParameterSet>(
0121           maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
0122     }
0123
0124     // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
0125     auto subProcessVParameterSet = popSubProcessVParameterSet(*processParameterSet_);
0126     bool hasSubProcesses = !subProcessVParameterSet.empty();
0127
0128     // Validates the parameters in the 'options', 'maxEvents', and 'maxLuminosityBlocks'
0129     // top level parameter sets. Default values are also set in here if the
0130     // parameters were not explicitly set.
0131     validateTopLevelParameterSets(processParameterSet_.get());
0132
0133     processBlockHelper_ = std::make_shared<SubProcessBlockHelper>();
0134
0135     ScheduleItems items(*parentProductRegistry, *this, *processBlockHelper_, parentProcessBlockHelper);
0136     actReg_ = items.actReg_;
0137
0138     //initialize the services
         // NOTE(review): iToken is not referenced again within this constructor;
         // it looks unused.
0139     ServiceToken iToken;
0140
0141     // get any configured services.
0142     auto serviceSets = processParameterSet_->popVParameterSet(std::string("services"));
0143
         // newToken is what initServices() returns; serviceToken_ is what
         // addCPRandTNS() returns for it. The nested SubProcesses constructed at
         // the end of this constructor are handed newToken, not serviceToken_.
0144     ServiceToken newToken = items.initServices(serviceSets, *processParameterSet_, token, iLegacy, false);
0145     parentActReg.connectToSubProcess(*items.actReg_);
0146     serviceToken_ = items.addCPRandTNS(*processParameterSet_, newToken);
0147
0148     //make the services available
0149     ServiceRegistry::Operate operate(serviceToken_);
0150
0151     // initialize miscellaneous items
0152     items.initMisc(*processParameterSet_);
0153
0154     // initialize the event setup provider
0155     esp_ = esController.makeProvider(*processParameterSet_, actReg_.get());
0156
0157     branchIDListHelper_ = items.branchIDListHelper();
0158     updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());
0159
0160     thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0161     thinnedAssociationsHelper_->updateFromParentProcess(
0162         parentThinnedAssociationsHelper, keepAssociation, droppedBranchIDToKeptBranchID_);
0163
0164     // initialize the Schedule
0165     schedule_ = items.initSchedule(*processParameterSet_,
0166                                    hasSubProcesses,
0167                                    preallocConfig,
0168                                    &processContext_,
0169                                    typeResolverMaker,
0170                                    *processBlockHelper_);
0171
0172     // set the items
0173     act_table_ = std::move(items.act_table_);
0174     preg_ = items.preg();
0175
0176     subProcessParentageHelper_ = items.subProcessParentageHelper();
0177     subProcessParentageHelper_->update(parentSubProcessParentageHelper, *parentProductRegistry);
0178
0179     processConfiguration_ = items.processConfiguration();
0180     processContext_.setProcessConfiguration(processConfiguration_.get());
0181     processContext_.setParentProcessContext(parentProcessContext);
0182
         // One EventPrincipal per stream, each using the history appender slot
         // with the same index as the stream.
0183     principalCache_.setNumberOfConcurrentPrincipals(preallocConfig);
0184     for (unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) {
0185       auto ep = std::make_shared<EventPrincipal>(preg_,
0186                                                  branchIDListHelper(),
0187                                                  thinnedAssociationsHelper(),
0188                                                  *processConfiguration_,
0189                                                  &(historyAppenders_[index]),
0190                                                  index,
0191                                                  false /*not primary process*/,
0192                                                  &*processBlockHelper_);
0193       principalCache_.insert(ep);
0194     }
0195
         // One RunPrincipal per concurrent run; its history appender slot is
         // offset by historyRunOffset_.
0196     for (unsigned int index = 0; index < preallocConfig.numberOfRuns(); ++index) {
0197       auto rpp = std::make_unique<RunPrincipal>(
0198           preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_ + index]), index, false);
0199       principalCache_.insert(std::move(rpp));
0200     }
0201
         // One LuminosityBlockPrincipal per concurrent luminosity block; its
         // history appender slot is offset by historyLumiOffset_.
0202     for (unsigned int index = 0; index < preallocConfig.numberOfLuminosityBlocks(); ++index) {
0203       auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
0204           preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_ + index]), index, false);
0205       principalCache_.insert(std::move(lbpp));
0206     }
0207
         // Two ProcessBlockPrincipals: the regular one and a separate one that
         // is registered through insertForInput().
0208     {
0209       auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
0210       principalCache_.insert(std::move(pb));
0211
0212       auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
0213       principalCache_.insertForInput(std::move(pbForInput));
0214     }
0215
0216     inUseRunPrincipals_.resize(preallocConfig.numberOfRuns());
0217     inUseLumiPrincipals_.resize(preallocConfig.numberOfLuminosityBlocks());
0218
         // Nested SubProcesses see this SubProcess as their parent: they receive
         // this process's product registry, helpers, activity registry and
         // process context, together with the same top level parameter set.
0219     subProcesses_.reserve(subProcessVParameterSet.size());
0220     for (auto& subProcessPSet : subProcessVParameterSet) {
0221       subProcesses_.emplace_back(subProcessPSet,
0222                                  topLevelParameterSet,
0223                                  preg_,
0224                                  branchIDListHelper(),
0225                                  *processBlockHelper_,
0226                                  *thinnedAssociationsHelper_,
0227                                  *subProcessParentageHelper_,
0228                                  esController,
0229                                  *items.actReg_,
0230                                  newToken,
0231                                  iLegacy,
0232                                  preallocConfig,
0233                                  &processContext_,
0234                                  typeResolverMaker);
0235     }
0236   }
0237 
0238   SubProcess::~SubProcess() {}
0239 
0240   std::vector<ModuleProcessName> SubProcess::keepOnlyConsumedUnscheduledModules(bool deleteModules) {
0241     schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0242     pathsAndConsumesOfModules_.initialize(schedule_.get(), preg_);
0243 
0244     // Note: all these may throw
0245     checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, false);
0246 
0247     // Consumes information from the child SubProcesses
0248     std::vector<ModuleProcessName> consumedByChildren;
0249     for_all(subProcesses_, [&consumedByChildren, deleteModules](auto& subProcess) {
0250       auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0251       if (consumedByChildren.empty()) {
0252         std::swap(consumedByChildren, c);
0253       } else if (not c.empty()) {
0254         std::vector<ModuleProcessName> tmp;
0255         tmp.reserve(consumedByChildren.size() + c.size());
0256         std::merge(consumedByChildren.begin(), consumedByChildren.end(), c.begin(), c.end(), std::back_inserter(tmp));
0257         std::swap(consumedByChildren, tmp);
0258       }
0259     });
0260 
0261     // Non-consumed unscheduled modules in this SubProcess, take into account of the consumes from child SubProcesses
0262     if (deleteModules) {
0263       if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedByChildren);
0264           not unusedModules.empty()) {
0265         pathsAndConsumesOfModules_.removeModules(unusedModules);
0266 
0267         edm::LogInfo("DeleteModules").log([&unusedModules, this](auto& l) {
0268           l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0269                "and "
0270                "therefore they are deleted from SubProcess "
0271             << processConfiguration_->processName() << " before beginJob transition.";
0272           for (auto const& description : unusedModules) {
0273             l << "\n " << description->moduleLabel();
0274           }
0275         });
0276         for (auto const& description : unusedModules) {
0277           schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0278         }
0279       }
0280     }
0281 
0282     // Products possibly consumed from the parent (Sub)Process
0283     for (auto const& description : pathsAndConsumesOfModules_.allModules()) {
0284       for (auto const& dep :
0285            pathsAndConsumesOfModules_.modulesInPreviousProcessesWhoseProductsAreConsumedBy(description->id())) {
0286         auto it = std::lower_bound(consumedByChildren.begin(),
0287                                    consumedByChildren.end(),
0288                                    ModuleProcessName{dep.moduleLabel(), dep.processName()});
0289         consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
0290       }
0291     }
0292     return consumedByChildren;
0293   }
0294 
0295   void SubProcess::doBeginJob() { this->beginJob(); }
0296 
0297   void SubProcess::doEndJob() { endJob(); }
0298 
0299   void SubProcess::beginJob() {
0300     // If event selection is being used, the SubProcess class reads TriggerResults
0301     // object(s) in the parent process from the event. This next call is needed for
0302     // getByToken to work properly. Normally, this is done by the worker, but since
0303     // a SubProcess is not a module, it has no worker.
0304     updateLookup(InEvent, *parentPreg_->productLookup(InEvent), false);
0305 
0306     if (!droppedBranchIDToKeptBranchID().empty()) {
0307       fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID());
0308     }
0309     ServiceRegistry::Operate operate(serviceToken_);
0310     actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
0311     schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
0312     for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0313   }
0314 
0315   void SubProcess::endJob() {
0316     ServiceRegistry::Operate operate(serviceToken_);
0317     ExceptionCollector c(
0318         "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
0319     schedule_->endJob(c);
0320     for (auto& subProcess : subProcesses_) {
0321       c.call([&subProcess]() { subProcess.doEndJob(); });
0322     }
0323     if (c.hasThrown()) {
0324       c.rethrow();
0325     }
0326   }
0327 
0328   void SubProcess::selectProducts(ProductRegistry const& preg,
0329                                   ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
0330                                   std::map<BranchID, bool>& keepAssociation) {
0331     if (productSelector_.initialized())
0332       return;
0333     productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions());
0334 
0335     // TODO: See if we can collapse keptProducts_ and productSelector_ into a
0336     // single object. See the notes in the header for ProductSelector
0337     // for more information.
0338 
0339     std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
0340     std::vector<BranchDescription const*> associationDescriptions;
0341     std::set<BranchID> keptProductsInEvent;
0342 
0343     for (auto const& it : preg.productList()) {
0344       BranchDescription const& desc = it.second;
0345       if (desc.transient()) {
0346         // if the class of the branch is marked transient, output nothing
0347       } else if (!desc.present() && !desc.produced()) {
0348         // else if the branch containing the product has been previously dropped,
0349         // output nothing
0350       } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
0351         associationDescriptions.push_back(&desc);
0352       } else if (productSelector_.selected(desc)) {
0353         keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0354       }
0355     }
0356 
0357     parentThinnedAssociationsHelper.selectAssociationProducts(
0358         associationDescriptions, keptProductsInEvent, keepAssociation);
0359 
0360     for (auto association : associationDescriptions) {
0361       if (keepAssociation[association->branchID()]) {
0362         keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0363       }
0364     }
0365 
0366     // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
0367     ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
0368   }
0369 
0370   void SubProcess::keepThisBranch(BranchDescription const& desc,
0371                                   std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
0372                                   std::set<BranchID>& keptProductsInEvent) {
0373     ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
0374 
0375     if (desc.branchType() == InEvent) {
0376       if (desc.produced()) {
0377         keptProductsInEvent.insert(desc.originalBranchID());
0378       } else {
0379         keptProductsInEvent.insert(desc.branchID());
0380       }
0381     }
0382     EDGetToken token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0383                                 InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
0384 
0385     // Now put it in the list of selected branches.
0386     keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
0387   }
0388 
0389   void SubProcess::fixBranchIDListsForEDAliases(
0390       std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID) {
0391     // Check for branches dropped while an EDAlias was kept.
0392     // Replace BranchID of each dropped branch with that of the kept alias.
0393     for (BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
0394       for (BranchID::value_type& branchID : branchIDList) {
0395         std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
0396             droppedBranchIDToKeptBranchID.find(branchID);
0397         if (iter != droppedBranchIDToKeptBranchID.end()) {
0398           branchID = iter->second;
0399         }
0400       }
0401     }
0402     for_all(subProcesses_, [&droppedBranchIDToKeptBranchID](auto& subProcess) {
0403       subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
0404     });
0405   }
0406 
0407   void SubProcess::doEventAsync(WaitingTaskHolder iHolder,
0408                                 EventPrincipal const& ep,
0409                                 std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
0410     ServiceRegistry::Operate operate(serviceToken_);
0411     /* BEGIN relevant bits from OutputModule::doEvent */
0412     if (!wantAllEvents_) {
0413       EventForOutput e(ep, ModuleDescription(), nullptr);
0414       e.setConsumer(this);
0415       if (!selectors_.wantEvent(e)) {
0416         return;
0417       }
0418     }
0419     processAsync(std::move(iHolder), ep, iEventSetupImpls);
0420     /* END relevant bits from OutputModule::doEvent */
0421   }
0422 
0423   void SubProcess::processAsync(WaitingTaskHolder iHolder,
0424                                 EventPrincipal const& principal,
0425                                 std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
         // Fills this SubProcess's EventPrincipal for the parent's stream from the
         // parent's principal, then chains: run this SubProcess's schedule on the
         // event, forward the event to the child SubProcesses (if any), clear the
         // EventPrincipal, and finally hand over to iHolder.
0426     EventAuxiliary aux(principal.aux());
0427     aux.setProcessHistoryID(principal.processHistoryID());
0428
         // Start from the parent's event selection IDs; this SubProcess's own
         // selection configuration ID is appended when the parent's registry
         // reports a produced product or when event selection is active here.
0429     EventSelectionIDVector esids{principal.eventSelectionIDs()};
0430     if (principal.productRegistry().anyProductProduced() || !wantAllEvents_) {
0431       esids.push_back(selector_config_id_);
0432     }
0433
         // The principal and the process history registry are both looked up by
         // the parent's stream ID.
0434     EventPrincipal& ep = principalCache_.eventPrincipal(principal.streamID().value());
0435     auto& processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
0436     processHistoryRegistry.registerProcessHistory(principal.processHistory());
0437     BranchListIndexes bli(principal.branchListIndexes());
0438     branchIDListHelper_->fixBranchListIndexes(bli);
0439     bool deepCopyRetriever = false;
0440     ep.fillEventPrincipal(
0441         aux,
0442         &principal.processHistory(),
0443         std::move(esids),
0444         std::move(bli),
0445         principal.eventToProcessBlockIndexes(),
0446         *(principal.productProvenanceRetrieverPtr()),  //NOTE: this transfers the per product provenance
0447         principal.reader(),
0448         deepCopyRetriever);
0449     ep.setLuminosityBlockPrincipal(inUseLumiPrincipals_[principal.luminosityBlockPrincipal().index()].get());
0450     propagateProducts(InEvent, principal, ep);
0451
         // NOTE(review): the first step captures everything by reference,
         // including the by-value parameter iEventSetupImpls. That is only safe
         // if the chain invokes the first step before this function returns -
         // confirm against the chain implementation. The later steps capture
         // iEventSetupImpls by value and ep by reference; ep refers to an object
         // obtained from principalCache_.
0452     using namespace edm::waiting_task;
0453     chain::first([&](auto nextTask) {
           // This SubProcess's own EventSetup is picked by esp_->subProcessIndex().
0454       EventTransitionInfo info(ep, *((*iEventSetupImpls)[esp_->subProcessIndex()]));
0455       schedule_->processOneEventAsync(std::move(nextTask), ep.streamID().value(), info, serviceToken_);
0456     }) | chain::ifThen(not subProcesses_.empty(), [this, &ep, iEventSetupImpls](auto nextTask) {
           // Children are visited in reverse order and each receives a copy of nextTask.
0457       for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
0458         subProcess.doEventAsync(nextTask, ep, iEventSetupImpls);
0459       }
0460     }) | chain::then([&ep](std::exception_ptr const* iPtr, auto nextTask) {
           // The principal is cleared whether or not an exception was reported;
           // a reported exception is passed on to the next task.
0461       ep.clearEventPrincipal();
0462       if (iPtr) {
0463         nextTask.doneWaiting(*iPtr);
0464       }
0465     }) | chain::runLast(std::move(iHolder));
0466   }
0467 
0468   template <>
0469   void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
0470       WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool cleaningUpAfterException) {
0471     ServiceRegistry::Operate operate(serviceToken_);
0472 
0473     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0474     ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0475     processBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
0476     propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0477 
0478     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0479     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0480     beginGlobalTransitionAsync<Traits>(
0481         std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0482   }
0483 
0484   template <>
0485   void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
0486       WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool) {
0487     ServiceRegistry::Operate operate(serviceToken_);
0488 
0489     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0490     ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0491     processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0492     propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0493 
0494     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0495     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0496     beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0497   }
0498 
0499   void SubProcess::doEndProcessBlockAsync(WaitingTaskHolder iHolder,
0500                                           ProcessBlockTransitionInfo const& iTransitionInfo,
0501                                           bool cleaningUpAfterException) {
         // Ends the ProcessBlock for this SubProcess. Products are first
         // propagated from the parent's principal. If
         // parentProducedProductIsKept() reports true, the parent's block is
         // additionally run through an input-ProcessBlock begin transition,
         // written as ProcessBlockType::Input and cleared before the end
         // transition starts; otherwise the end transition starts directly.
0502     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0503     ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0504     propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0505
0506     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0507
0508     if (parentProducedProductIsKept(parentPrincipal, processBlockPrincipal)) {
0509       ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0510       inputProcessBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
0511       propagateProducts(InProcess, parentPrincipal, inputProcessBlockPrincipal);
0512       ProcessBlockTransitionInfo inputTransitionInfo(inputProcessBlockPrincipal);
0513
           // NOTE(review): the first step captures the local inputTransitionInfo
           // (and the parameter cleaningUpAfterException) by reference, which is
           // only safe if the chain invokes it before this function returns -
           // confirm against the chain implementation. The last step captures a
           // copy of transitionInfo and of cleaningUpAfterException instead.
0514       using namespace edm::waiting_task;
0515       chain::first([&](const std::exception_ptr*, auto nextTask) {
0516         using TraitsInput = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0517         beginGlobalTransitionAsync<TraitsInput>(std::move(nextTask),
0518                                                 *schedule_,
0519                                                 inputTransitionInfo,
0520                                                 serviceToken_,
0521                                                 subProcesses_,
0522                                                 cleaningUpAfterException);
0523       }) | chain::then([this](auto nextTask) { writeProcessBlockAsync(nextTask, ProcessBlockType::Input); }) |
0524           chain::then([this, info = transitionInfo, cleaningUpAfterException](std::exception_ptr const* iPtr,
0525                                                                               auto nextTask) mutable {
                 // The input principal of this SubProcess and of its children is
                 // cleared whether or not an exception was reported.
0526             ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0527             inputProcessBlockPrincipal.clearPrincipal();
0528             for (auto& s : subProcesses_) {
0529               s.clearProcessBlockPrincipal(ProcessBlockType::Input);
0530             }
                 // A reported exception is passed on and the end transition is
                 // skipped; otherwise the end transition is started.
0531             if (iPtr) {
0532               nextTask.doneWaiting(*iPtr);
0533             } else {
0534               using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0535               endGlobalTransitionAsync<Traits>(
0536                   std::move(nextTask), *schedule_, info, serviceToken_, subProcesses_, cleaningUpAfterException);
0537             }
0538           }) |
0539           chain::runLast(std::move(iHolder));
0540     } else {
0541       using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0542       endGlobalTransitionAsync<Traits>(
0543           std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0544     }
0545   }
0546 
0547   void SubProcess::doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const& iTransitionInfo) {
0548     ServiceRegistry::Operate operate(serviceToken_);
0549 
0550     RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0551     auto aux = parentPrincipal.aux();
0552     aux.setProcessHistoryID(parentPrincipal.processHistoryID());
0553     auto rpp = principalCache_.getAvailableRunPrincipalPtr();
0554     rpp->setAux(aux);
0555     auto& processHistoryRegistry = processHistoryRegistries_[historyRunOffset_ + parentPrincipal.index()];
0556     inUseRunPrincipals_[parentPrincipal.index()] = rpp;
0557     processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
0558     rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.reader());
0559 
0560     RunPrincipal& rp = *rpp;
0561     propagateProducts(InRun, parentPrincipal, rp);
0562 
0563     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0564     RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0565     using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0566     beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0567   }
0568 
0569   void SubProcess::doEndRunAsync(WaitingTaskHolder iHolder,
0570                                  RunTransitionInfo const& iTransitionInfo,
0571                                  bool cleaningUpAfterException) {
0572     RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0573     RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
0574     propagateProducts(InRun, parentPrincipal, rp);
0575 
0576     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0577     RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0578     using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0579     endGlobalTransitionAsync<Traits>(
0580         std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0581   }
0582 
0583   void SubProcess::writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType processBlockType) {
0584     using namespace edm::waiting_task;
0585     chain::first([&](std::exception_ptr const*, auto nextTask) {
0586       ServiceRegistry::Operate operate(serviceToken_);
0587       schedule_->writeProcessBlockAsync(
0588           nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
0589     }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
0590       ServiceRegistry::Operate operate(serviceToken_);
0591       for (auto& s : subProcesses_) {
0592         s.writeProcessBlockAsync(nextTask, processBlockType);
0593       }
0594     }) | chain::runLast(std::move(task));
0595   }
0596 
0597   void SubProcess::writeRunAsync(edm::WaitingTaskHolder task,
0598                                  RunPrincipal const& principal,
0599                                  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
0600     using namespace edm::waiting_task;
0601 
0602     auto rp = inUseRunPrincipals_[principal.index()];
0603     chain::first([&](std::exception_ptr const*, auto nextTask) {
0604       ServiceRegistry::Operate operate(serviceToken_);
0605       schedule_->writeRunAsync(nextTask, *rp, &processContext_, actReg_.get(), mergeableRunProductMetadata);
0606     }) | chain::ifThen(not subProcesses_.empty(), [this, rp, mergeableRunProductMetadata](auto nextTask) {
0607       ServiceRegistry::Operate operateWriteRun(serviceToken_);
0608       for (auto& s : subProcesses_) {
0609         s.writeRunAsync(nextTask, *rp, mergeableRunProductMetadata);
0610       }
0611     }) | chain::runLast(std::move(task));
0612   }
0613 
0614   void SubProcess::clearRunPrincipal(RunPrincipal& parentPrincipal) {
0615     //release from list but stay around till end of routine
0616     auto rp = std::move(inUseRunPrincipals_[parentPrincipal.index()]);
0617     for (auto& s : subProcesses_) {
0618       s.clearRunPrincipal(*rp);
0619     }
0620     rp->clearPrincipal();
0621   }
0622 
0623   void SubProcess::clearProcessBlockPrincipal(ProcessBlockType processBlockType) {
0624     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal(processBlockType);
0625     processBlockPrincipal.clearPrincipal();
0626     for (auto& s : subProcesses_) {
0627       s.clearProcessBlockPrincipal(processBlockType);
0628     }
0629   }
0630 
0631   void SubProcess::doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const& iTransitionInfo) {
0632     ServiceRegistry::Operate operate(serviceToken_);
0633 
0634     LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0635     auto aux = parentPrincipal.aux();
0636     aux.setProcessHistoryID(parentPrincipal.processHistoryID());
0637     auto lbpp = principalCache_.getAvailableLumiPrincipalPtr();
0638     lbpp->setAux(aux);
0639     auto& processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_ + lbpp->index()];
0640     inUseLumiPrincipals_[parentPrincipal.index()] = lbpp;
0641     processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
0642     lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.processHistory(), parentPrincipal.reader());
0643     lbpp->setRunPrincipal(inUseRunPrincipals_[parentPrincipal.runPrincipal().index()]);
0644     LuminosityBlockPrincipal& lbp = *lbpp;
0645     propagateProducts(InLumi, parentPrincipal, lbp);
0646 
0647     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0648     LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0649     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0650     beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0651   }
0652 
0653   void SubProcess::doEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
0654                                              LumiTransitionInfo const& iTransitionInfo,
0655                                              bool cleaningUpAfterException) {
0656     LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0657     LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[parentPrincipal.index()];
0658     propagateProducts(InLumi, parentPrincipal, lbp);
0659 
0660     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0661     LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0662     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0663     endGlobalTransitionAsync<Traits>(
0664         std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0665   }
0666 
0667   void SubProcess::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& principal) {
0668     using namespace edm::waiting_task;
0669 
0670     auto l = inUseLumiPrincipals_[principal.index()];
0671     chain::first([&](std::exception_ptr const*, auto nextTask) {
0672       ServiceRegistry::Operate operate(serviceToken_);
0673       schedule_->writeLumiAsync(nextTask, *l, &processContext_, actReg_.get());
0674     }) | chain::ifThen(not subProcesses_.empty(), [this, l](auto nextTask) {
0675       ServiceRegistry::Operate operateWriteLumi(serviceToken_);
0676       for (auto& s : subProcesses_) {
0677         s.writeLumiAsync(nextTask, *l);
0678       }
0679     }) | chain::runLast(std::move(task));
0680   }
0681 
0682   void SubProcess::clearLumiPrincipal(LuminosityBlockPrincipal& principal) {
0683     //release from list but stay around till end of routine
0684     auto lb = std::move(inUseLumiPrincipals_[principal.index()]);
0685     for (auto& s : subProcesses_) {
0686       s.clearLumiPrincipal(*lb);
0687     }
0688     lb->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0689     lb->clearPrincipal();
0690   }
0691 
0692   void SubProcess::doBeginStream(unsigned int iID) {
0693     ServiceRegistry::Operate operate(serviceToken_);
0694     schedule_->beginStream(iID);
0695     for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doBeginStream(iID); });
0696   }
0697 
0698   void SubProcess::doEndStream(unsigned int iID) {
0699     ServiceRegistry::Operate operate(serviceToken_);
0700     schedule_->endStream(iID);
0701     for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doEndStream(iID); });
0702   }
0703 
0704   void SubProcess::doStreamBeginRunAsync(WaitingTaskHolder iHolder,
0705                                          unsigned int id,
0706                                          RunTransitionInfo const& iTransitionInfo) {
0707     using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0708 
0709     RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0710     RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
0711 
0712     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0713     RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0714     beginStreamTransitionAsync<Traits>(
0715         std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
0716   }
0717 
0718   void SubProcess::doStreamEndRunAsync(WaitingTaskHolder iHolder,
0719                                        unsigned int id,
0720                                        RunTransitionInfo const& iTransitionInfo,
0721                                        bool cleaningUpAfterException) {
0722     using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0723 
0724     RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0725     RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
0726 
0727     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0728     RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0729     endStreamTransitionAsync<Traits>(
0730         std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0731   }
0732 
0733   void SubProcess::doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder,
0734                                                      unsigned int id,
0735                                                      LumiTransitionInfo const& iTransitionInfo) {
0736     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0737 
0738     LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
0739     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0740     LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0741     beginStreamTransitionAsync<Traits>(
0742         std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
0743   }
0744 
0745   void SubProcess::doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
0746                                                    unsigned int id,
0747                                                    LumiTransitionInfo const& iTransitionInfo,
0748                                                    bool cleaningUpAfterException) {
0749     LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
0750     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0751     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0752     LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0753     endStreamTransitionAsync<Traits>(
0754         std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0755   }
0756 
0757   void SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const {
0758     SelectedProducts const& keptVector = keptProducts()[type];
0759     for (auto const& item : keptVector) {
0760       BranchDescription const& desc = *item.first;
0761       ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
0762       if (parentProductResolver != nullptr) {
0763         ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
0764         if (productResolver != nullptr) {
0765           //Propagate the per event(run)(lumi)(processBlock) data for this product to the subprocess.
0766           productResolver->connectTo(*parentProductResolver, &parentPrincipal);
0767         }
0768       }
0769     }
0770   }
0771 
0772   bool SubProcess::parentProducedProductIsKept(Principal const& parentPrincipal, Principal& principal) const {
0773     SelectedProducts const& keptVector = keptProducts()[InProcess];
0774     for (auto const& item : keptVector) {
0775       BranchDescription const& desc = *item.first;
0776       assert(desc.branchType() == InProcess);
0777       ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
0778       if (parentProductResolver != nullptr) {
0779         ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
0780         if (productResolver != nullptr) {
0781           if (parentProductResolver->branchDescription().produced()) {
0782             return true;
0783           }
0784         }
0785       }
0786     }
0787     return false;
0788   }
0789 
0790   void SubProcess::updateBranchIDListHelper(BranchIDLists const& branchIDLists) {
0791     branchIDListHelper_->updateFromParent(branchIDLists);
0792     for_all(subProcesses_,
0793             [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
0794   }
0795 
0796   // Call respondToOpenInputFile() on all Modules
0797   void SubProcess::respondToOpenInputFile(FileBlock const& fb) {
0798     ServiceRegistry::Operate operate(serviceToken_);
0799     schedule_->respondToOpenInputFile(fb);
0800     for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.respondToOpenInputFile(fb); });
0801   }
0802 
0803   // free function
0804   std::vector<ParameterSet> popSubProcessVParameterSet(ParameterSet& parameterSet) {
0805     std::vector<std::string> subProcesses =
0806         parameterSet.getUntrackedParameter<std::vector<std::string>>("@all_subprocesses");
0807     if (!subProcesses.empty()) {
0808       return parameterSet.popVParameterSet("subProcesses");
0809     }
0810     return {};
0811   }
0812 }  // namespace edm