Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-08-26 03:15:59

0001 #include "FWCore/Framework/interface/SubProcess.h"
0002 
0003 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0005 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0007 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0008 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0009 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0010 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0011 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0012 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0013 #include "FWCore/Common/interface/SubProcessBlockHelper.h"
0014 #include "FWCore/Framework/interface/EventForOutput.h"
0015 #include "FWCore/Framework/interface/EventPrincipal.h"
0016 #include "FWCore/Framework/interface/FileBlock.h"
0017 #include "FWCore/Framework/interface/ProductResolverBase.h"
0018 #include "FWCore/Framework/interface/HistoryAppender.h"
0019 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0020 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0021 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0022 #include "FWCore/Framework/src/OutputModuleDescription.h"
0023 #include "FWCore/Framework/interface/RunPrincipal.h"
0024 #include "FWCore/Framework/interface/getAllTriggerNames.h"
0025 #include "FWCore/Framework/interface/TriggerNamesService.h"
0026 #include "FWCore/Framework/interface/ScheduleItems.h"
0027 #include "FWCore/Framework/interface/EventSetupsController.h"
0028 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0029 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0030 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0031 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0032 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0033 #include "FWCore/Framework/interface/ESRecordsToProxyIndices.h"
0034 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0035 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0036 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0037 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0038 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0039 #include "FWCore/Concurrency/interface/WaitingTask.h"
0040 #include "FWCore/Concurrency/interface/chain_first.h"
0041 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0042 
0043 #include "boost/range/adaptor/reversed.hpp"
0044 
0045 #include <cassert>
0046 #include <exception>
0047 #include <string>
0048 
0049 namespace edm {
0050 
0051   SubProcess::SubProcess(ParameterSet& parameterSet,
0052                          ParameterSet const& topLevelParameterSet,
0053                          std::shared_ptr<ProductRegistry const> parentProductRegistry,
0054                          std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
0055                          ProcessBlockHelperBase const& parentProcessBlockHelper,
0056                          ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
0057                          SubProcessParentageHelper const& parentSubProcessParentageHelper,
0058                          eventsetup::EventSetupsController& esController,
0059                          ActivityRegistry& parentActReg,
0060                          ServiceToken const& token,
0061                          serviceregistry::ServiceLegacy iLegacy,
0062                          PreallocationConfiguration const& preallocConfig,
0063                          ProcessContext const* parentProcessContext)
0064       : EDConsumerBase(),
0065         serviceToken_(),
0066         parentPreg_(parentProductRegistry),
0067         preg_(),
0068         branchIDListHelper_(),
0069         act_table_(),
0070         processConfiguration_(),
0071         historyLumiOffset_(preallocConfig.numberOfStreams()),
0072         historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
0073         processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
0074         historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
0075         principalCache_(),
0076         esp_(),
0077         schedule_(),
0078         parentToChildPhID_(),
0079         subProcesses_(),
0080         processParameterSet_(),
0081         productSelectorRules_(parameterSet, "outputCommands", "OutputModule"),
0082         productSelector_(),
0083         wantAllEvents_(true) {
0084     //Setup the event selection
0085     Service<service::TriggerNamesService> tns;
0086 
0087     ParameterSet selectevents = parameterSet.getUntrackedParameterSet("SelectEvents", ParameterSet());
0088 
0089     selectevents.registerIt();  // Just in case this PSet is not registered
0090     wantAllEvents_ = detail::configureEventSelector(
0091         selectevents, tns->getProcessName(), getAllTriggerNames(), selectors_, consumesCollector());
0092     std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0093     selector_config_id_ = detail::registerProperSelectionInfo(
0094         selectevents, "", outputModulePathPositions, parentProductRegistry->anyProductProduced());
0095 
0096     std::map<BranchID, bool> keepAssociation;
0097     selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
0098 
0099     std::string const maxEvents("maxEvents");
0100     std::string const maxLumis("maxLuminosityBlocks");
0101 
0102     // propagate_const<T> has no reset() function
0103     processParameterSet_ =
0104         std::unique_ptr<ParameterSet>(parameterSet.popParameterSet(std::string("process")).release());
0105 
0106     // if this process has a maxEvents or maxLuminosityBlocks parameter set, remove them.
0107     if (processParameterSet_->exists(maxEvents)) {
0108       processParameterSet_->popParameterSet(maxEvents);
0109     }
0110     if (processParameterSet_->exists(maxLumis)) {
0111       processParameterSet_->popParameterSet(maxLumis);
0112     }
0113 
0114     // if the top level process has a maxEvents or maxLuminosityBlocks parameter set, add them to this process.
0115     if (topLevelParameterSet.exists(maxEvents)) {
0116       processParameterSet_->addUntrackedParameter<ParameterSet>(
0117           maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
0118     }
0119     if (topLevelParameterSet.exists(maxLumis)) {
0120       processParameterSet_->addUntrackedParameter<ParameterSet>(
0121           maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
0122     }
0123 
0124     // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
0125     auto subProcessVParameterSet = popSubProcessVParameterSet(*processParameterSet_);
0126     bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
0127 
0128     // Validates the parameters in the 'options', 'maxEvents', and 'maxLuminosityBlocks'
0129     // top level parameter sets. Default values are also set in here if the
0130     // parameters were not explicitly set.
0131     validateTopLevelParameterSets(processParameterSet_.get());
0132 
0133     processBlockHelper_ = std::make_shared<SubProcessBlockHelper>();
0134 
0135     ScheduleItems items(*parentProductRegistry, *this, *processBlockHelper_, parentProcessBlockHelper);
0136     actReg_ = items.actReg_;
0137 
0138     //initialize the services
0139     ServiceToken iToken;
0140 
0141     // get any configured services.
0142     auto serviceSets = processParameterSet_->popVParameterSet(std::string("services"));
0143 
0144     ServiceToken newToken = items.initServices(serviceSets, *processParameterSet_, token, iLegacy, false);
0145     parentActReg.connectToSubProcess(*items.actReg_);
0146     serviceToken_ = items.addCPRandTNS(*processParameterSet_, newToken);
0147 
0148     //make the services available
0149     ServiceRegistry::Operate operate(serviceToken_);
0150 
0151     // intialize miscellaneous items
0152     items.initMisc(*processParameterSet_);
0153 
0154     // intialize the event setup provider
0155     esp_ = esController.makeProvider(*processParameterSet_, actReg_.get());
0156 
0157     branchIDListHelper_ = items.branchIDListHelper();
0158     updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());
0159 
0160     thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0161     thinnedAssociationsHelper_->updateFromParentProcess(
0162         parentThinnedAssociationsHelper, keepAssociation, droppedBranchIDToKeptBranchID_);
0163 
0164     // intialize the Schedule
0165     schedule_ = items.initSchedule(
0166         *processParameterSet_, hasSubProcesses, preallocConfig, &processContext_, *processBlockHelper_);
0167 
0168     // set the items
0169     act_table_ = std::move(items.act_table_);
0170     preg_ = items.preg();
0171 
0172     subProcessParentageHelper_ = items.subProcessParentageHelper();
0173     subProcessParentageHelper_->update(parentSubProcessParentageHelper, *parentProductRegistry);
0174 
0175     //CMS-THREADING this only works since Run/Lumis are synchronous so when principalCache asks for
0176     // the reducedProcessHistoryID from a full ProcessHistoryID that registry will not be in use by
0177     // another thread. We really need to change how this is done in the PrincipalCache.
0178     principalCache_.setProcessHistoryRegistry(processHistoryRegistries_[historyRunOffset_]);
0179 
0180     processConfiguration_ = items.processConfiguration();
0181     processContext_.setProcessConfiguration(processConfiguration_.get());
0182     processContext_.setParentProcessContext(parentProcessContext);
0183 
0184     principalCache_.setNumberOfConcurrentPrincipals(preallocConfig);
0185     for (unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) {
0186       auto ep = std::make_shared<EventPrincipal>(preg_,
0187                                                  branchIDListHelper(),
0188                                                  thinnedAssociationsHelper(),
0189                                                  *processConfiguration_,
0190                                                  &(historyAppenders_[index]),
0191                                                  index,
0192                                                  false /*not primary process*/,
0193                                                  &*processBlockHelper_);
0194       principalCache_.insert(ep);
0195     }
0196     for (unsigned int index = 0; index < preallocConfig.numberOfLuminosityBlocks(); ++index) {
0197       auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
0198           preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_ + index]), index, false);
0199       principalCache_.insert(std::move(lbpp));
0200     }
0201 
0202     {
0203       auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
0204       principalCache_.insert(std::move(pb));
0205 
0206       auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
0207       principalCache_.insertForInput(std::move(pbForInput));
0208     }
0209 
0210     inUseLumiPrincipals_.resize(preallocConfig.numberOfLuminosityBlocks());
0211 
0212     subProcesses_.reserve(subProcessVParameterSet.size());
0213     for (auto& subProcessPSet : subProcessVParameterSet) {
0214       subProcesses_.emplace_back(subProcessPSet,
0215                                  topLevelParameterSet,
0216                                  preg_,
0217                                  branchIDListHelper(),
0218                                  *processBlockHelper_,
0219                                  *thinnedAssociationsHelper_,
0220                                  *subProcessParentageHelper_,
0221                                  esController,
0222                                  *items.actReg_,
0223                                  newToken,
0224                                  iLegacy,
0225                                  preallocConfig,
0226                                  &processContext_);
0227     }
0228   }
0229 
0230   SubProcess::~SubProcess() {}
0231 
0232   std::vector<ModuleProcessName> SubProcess::keepOnlyConsumedUnscheduledModules(bool deleteModules) {
0233     schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0234     pathsAndConsumesOfModules_.initialize(schedule_.get(), preg_);
0235 
0236     // Note: all these may throw
0237     checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, false);
0238 
0239     // Consumes information from the child SubProcesses
0240     std::vector<ModuleProcessName> consumedByChildren;
0241     for_all(subProcesses_, [&consumedByChildren, deleteModules](auto& subProcess) {
0242       auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0243       if (consumedByChildren.empty()) {
0244         std::swap(consumedByChildren, c);
0245       } else if (not c.empty()) {
0246         std::vector<ModuleProcessName> tmp;
0247         tmp.reserve(consumedByChildren.size() + c.size());
0248         std::merge(consumedByChildren.begin(), consumedByChildren.end(), c.begin(), c.end(), std::back_inserter(tmp));
0249         std::swap(consumedByChildren, tmp);
0250       }
0251     });
0252 
0253     // Non-consumed unscheduled modules in this SubProcess, take into account of the consumes from child SubProcesses
0254     if (deleteModules) {
0255       if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedByChildren);
0256           not unusedModules.empty()) {
0257         pathsAndConsumesOfModules_.removeModules(unusedModules);
0258 
0259         edm::LogInfo("DeleteModules").log([&unusedModules, this](auto& l) {
0260           l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0261                "and "
0262                "therefore they are deleted from SubProcess "
0263             << processConfiguration_->processName() << " before beginJob transition.";
0264           for (auto const& description : unusedModules) {
0265             l << "\n " << description->moduleLabel();
0266           }
0267         });
0268         for (auto const& description : unusedModules) {
0269           schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0270         }
0271       }
0272     }
0273 
0274     // Products possibly consumed from the parent (Sub)Process
0275     for (auto const& description : pathsAndConsumesOfModules_.allModules()) {
0276       for (auto const& dep :
0277            pathsAndConsumesOfModules_.modulesInPreviousProcessesWhoseProductsAreConsumedBy(description->id())) {
0278         auto it = std::lower_bound(consumedByChildren.begin(),
0279                                    consumedByChildren.end(),
0280                                    ModuleProcessName{dep.moduleLabel(), dep.processName()});
0281         consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
0282       }
0283     }
0284     return consumedByChildren;
0285   }
0286 
0287   void SubProcess::doBeginJob() { this->beginJob(); }
0288 
0289   void SubProcess::doEndJob() { endJob(); }
0290 
0291   void SubProcess::beginJob() {
0292     // If event selection is being used, the SubProcess class reads TriggerResults
0293     // object(s) in the parent process from the event. This next call is needed for
0294     // getByToken to work properly. Normally, this is done by the worker, but since
0295     // a SubProcess is not a module, it has no worker.
0296     updateLookup(InEvent, *parentPreg_->productLookup(InEvent), false);
0297 
0298     if (!droppedBranchIDToKeptBranchID().empty()) {
0299       fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID());
0300     }
0301     ServiceRegistry::Operate operate(serviceToken_);
0302     actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
0303     schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
0304     for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0305   }
0306 
0307   void SubProcess::endJob() {
0308     ServiceRegistry::Operate operate(serviceToken_);
0309     ExceptionCollector c(
0310         "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
0311     schedule_->endJob(c);
0312     for (auto& subProcess : subProcesses_) {
0313       c.call([&subProcess]() { subProcess.doEndJob(); });
0314     }
0315     if (c.hasThrown()) {
0316       c.rethrow();
0317     }
0318   }
0319 
0320   void SubProcess::selectProducts(ProductRegistry const& preg,
0321                                   ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
0322                                   std::map<BranchID, bool>& keepAssociation) {
0323     if (productSelector_.initialized())
0324       return;
0325     productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions());
0326 
0327     // TODO: See if we can collapse keptProducts_ and productSelector_ into a
0328     // single object. See the notes in the header for ProductSelector
0329     // for more information.
0330 
0331     std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
0332     std::vector<BranchDescription const*> associationDescriptions;
0333     std::set<BranchID> keptProductsInEvent;
0334 
0335     for (auto const& it : preg.productList()) {
0336       BranchDescription const& desc = it.second;
0337       if (desc.transient()) {
0338         // if the class of the branch is marked transient, output nothing
0339       } else if (!desc.present() && !desc.produced()) {
0340         // else if the branch containing the product has been previously dropped,
0341         // output nothing
0342       } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
0343         associationDescriptions.push_back(&desc);
0344       } else if (productSelector_.selected(desc)) {
0345         keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0346       }
0347     }
0348 
0349     parentThinnedAssociationsHelper.selectAssociationProducts(
0350         associationDescriptions, keptProductsInEvent, keepAssociation);
0351 
0352     for (auto association : associationDescriptions) {
0353       if (keepAssociation[association->branchID()]) {
0354         keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0355       }
0356     }
0357 
0358     // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
0359     ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
0360   }
0361 
0362   void SubProcess::keepThisBranch(BranchDescription const& desc,
0363                                   std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
0364                                   std::set<BranchID>& keptProductsInEvent) {
0365     ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
0366 
0367     if (desc.branchType() == InEvent) {
0368       if (desc.produced()) {
0369         keptProductsInEvent.insert(desc.originalBranchID());
0370       } else {
0371         keptProductsInEvent.insert(desc.branchID());
0372       }
0373     }
0374     EDGetToken token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0375                                 InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
0376 
0377     // Now put it in the list of selected branches.
0378     keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
0379   }
0380 
0381   void SubProcess::fixBranchIDListsForEDAliases(
0382       std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID) {
0383     // Check for branches dropped while an EDAlias was kept.
0384     // Replace BranchID of each dropped branch with that of the kept alias.
0385     for (BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
0386       for (BranchID::value_type& branchID : branchIDList) {
0387         std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
0388             droppedBranchIDToKeptBranchID.find(branchID);
0389         if (iter != droppedBranchIDToKeptBranchID.end()) {
0390           branchID = iter->second;
0391         }
0392       }
0393     }
0394     for_all(subProcesses_, [&droppedBranchIDToKeptBranchID](auto& subProcess) {
0395       subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
0396     });
0397   }
0398 
0399   void SubProcess::doEventAsync(WaitingTaskHolder iHolder,
0400                                 EventPrincipal const& ep,
0401                                 std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
0402     ServiceRegistry::Operate operate(serviceToken_);
0403     /* BEGIN relevant bits from OutputModule::doEvent */
0404     if (!wantAllEvents_) {
0405       EventForOutput e(ep, ModuleDescription(), nullptr);
0406       e.setConsumer(this);
0407       if (!selectors_.wantEvent(e)) {
0408         return;
0409       }
0410     }
0411     processAsync(std::move(iHolder), ep, iEventSetupImpls);
0412     /* END relevant bits from OutputModule::doEvent */
0413   }
0414 
0415   void SubProcess::processAsync(WaitingTaskHolder iHolder,
0416                                 EventPrincipal const& principal,
0417                                 std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
0418     EventAuxiliary aux(principal.aux());
0419     aux.setProcessHistoryID(principal.processHistoryID());
0420 
0421     EventSelectionIDVector esids{principal.eventSelectionIDs()};
0422     if (principal.productRegistry().anyProductProduced() || !wantAllEvents_) {
0423       esids.push_back(selector_config_id_);
0424     }
0425 
0426     EventPrincipal& ep = principalCache_.eventPrincipal(principal.streamID().value());
0427     auto& processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
0428     processHistoryRegistry.registerProcessHistory(principal.processHistory());
0429     BranchListIndexes bli(principal.branchListIndexes());
0430     branchIDListHelper_->fixBranchListIndexes(bli);
0431     bool deepCopyRetriever = false;
0432     ep.fillEventPrincipal(
0433         aux,
0434         &principal.processHistory(),
0435         std::move(esids),
0436         std::move(bli),
0437         principal.eventToProcessBlockIndexes(),
0438         *(principal.productProvenanceRetrieverPtr()),  //NOTE: this transfers the per product provenance
0439         principal.reader(),
0440         deepCopyRetriever);
0441     ep.setLuminosityBlockPrincipal(inUseLumiPrincipals_[principal.luminosityBlockPrincipal().index()].get());
0442     propagateProducts(InEvent, principal, ep);
0443 
0444     using namespace edm::waiting_task;
0445     chain::first([&](auto nextTask) {
0446       EventTransitionInfo info(ep, *((*iEventSetupImpls)[esp_->subProcessIndex()]));
0447       schedule_->processOneEventAsync(std::move(nextTask), ep.streamID().value(), info, serviceToken_);
0448     }) | chain::ifThen(not subProcesses_.empty(), [this, &ep, iEventSetupImpls](auto nextTask) {
0449       for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
0450         subProcess.doEventAsync(nextTask, ep, iEventSetupImpls);
0451       }
0452     }) | chain::then([&ep](std::exception_ptr const* iPtr, auto nextTask) {
0453       ep.clearEventPrincipal();
0454       if (iPtr) {
0455         nextTask.doneWaiting(*iPtr);
0456       }
0457     }) | chain::runLast(std::move(iHolder));
0458   }
0459 
0460   template <>
0461   void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
0462       WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool cleaningUpAfterException) {
0463     ServiceRegistry::Operate operate(serviceToken_);
0464 
0465     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0466     ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0467     processBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
0468     propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0469 
0470     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0471     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0472     beginGlobalTransitionAsync<Traits>(
0473         std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0474   }
0475 
0476   template <>
0477   void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
0478       WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool) {
0479     ServiceRegistry::Operate operate(serviceToken_);
0480 
0481     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0482     ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0483     processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0484     propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0485 
0486     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0487     using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0488     beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0489   }
0490 
0491   void SubProcess::doEndProcessBlockAsync(WaitingTaskHolder iHolder,
0492                                           ProcessBlockTransitionInfo const& iTransitionInfo,
0493                                           bool cleaningUpAfterException) {
0494     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0495     ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0496     propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0497 
0498     ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0499 
0500     if (parentProducedProductIsKept(parentPrincipal, processBlockPrincipal)) {
0501       ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0502       inputProcessBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
0503       propagateProducts(InProcess, parentPrincipal, inputProcessBlockPrincipal);
0504       ProcessBlockTransitionInfo inputTransitionInfo(inputProcessBlockPrincipal);
0505 
0506       using namespace edm::waiting_task;
0507       chain::first([&](const std::exception_ptr*, auto nextTask) {
0508         using TraitsInput = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0509         beginGlobalTransitionAsync<TraitsInput>(std::move(nextTask),
0510                                                 *schedule_,
0511                                                 inputTransitionInfo,
0512                                                 serviceToken_,
0513                                                 subProcesses_,
0514                                                 cleaningUpAfterException);
0515       }) | chain::then([this](auto nextTask) { writeProcessBlockAsync(nextTask, ProcessBlockType::Input); }) |
0516           chain::then([this, info = transitionInfo, cleaningUpAfterException](std::exception_ptr const* iPtr,
0517                                                                               auto nextTask) mutable {
0518             ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0519             inputProcessBlockPrincipal.clearPrincipal();
0520             for (auto& s : subProcesses_) {
0521               s.clearProcessBlockPrincipal(ProcessBlockType::Input);
0522             }
0523             if (iPtr) {
0524               nextTask.doneWaiting(*iPtr);
0525             } else {
0526               using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0527               endGlobalTransitionAsync<Traits>(
0528                   std::move(nextTask), *schedule_, info, serviceToken_, subProcesses_, cleaningUpAfterException);
0529             }
0530           }) |
0531           chain::runLast(std::move(iHolder));
0532     } else {
0533       using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0534       endGlobalTransitionAsync<Traits>(
0535           std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0536     }
0537   }
0538 
0539   void SubProcess::doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const& iTransitionInfo) {
0540     ServiceRegistry::Operate operate(serviceToken_);
0541 
0542     RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0543     auto aux = std::make_shared<RunAuxiliary>(parentPrincipal.aux());
0544     aux->setProcessHistoryID(parentPrincipal.processHistoryID());
0545     auto rpp = std::make_shared<RunPrincipal>(aux,
0546                                               preg_,
0547                                               *processConfiguration_,
0548                                               &(historyAppenders_[historyRunOffset_ + parentPrincipal.index()]),
0549                                               parentPrincipal.index(),
0550                                               false);
0551     auto& processHistoryRegistry = processHistoryRegistries_[historyRunOffset_ + parentPrincipal.index()];
0552     processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
0553     rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.reader());
0554     principalCache_.insert(rpp);
0555 
0556     ProcessHistoryID const& parentInputReducedPHID = parentPrincipal.reducedProcessHistoryID();
0557     ProcessHistoryID const& inputReducedPHID = rpp->reducedProcessHistoryID();
0558 
0559     parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID, inputReducedPHID));
0560 
0561     RunPrincipal& rp = *principalCache_.runPrincipalPtr();
0562     propagateProducts(InRun, parentPrincipal, rp);
0563 
0564     RunTransitionInfo transitionInfo(rp, esp_->eventSetupImpl());
0565     using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0566     beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0567   }
0568 
0569   void SubProcess::doEndRunAsync(WaitingTaskHolder iHolder,
0570                                  RunTransitionInfo const& iTransitionInfo,
0571                                  bool cleaningUpAfterException) {
0572     RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0573     RunPrincipal& rp = *principalCache_.runPrincipalPtr();
0574     propagateProducts(InRun, parentPrincipal, rp);
0575 
0576     RunTransitionInfo transitionInfo(rp, esp_->eventSetupImpl());
0577     using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0578     endGlobalTransitionAsync<Traits>(
0579         std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0580   }
0581 
0582   void SubProcess::writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType processBlockType) {
0583     using namespace edm::waiting_task;
0584     chain::first([&](std::exception_ptr const*, auto nextTask) {
0585       ServiceRegistry::Operate operate(serviceToken_);
0586       schedule_->writeProcessBlockAsync(
0587           nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
0588     }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
0589       ServiceRegistry::Operate operate(serviceToken_);
0590       for (auto& s : subProcesses_) {
0591         s.writeProcessBlockAsync(nextTask, processBlockType);
0592       }
0593     }) | chain::runLast(std::move(task));
0594   }
0595 
0596   void SubProcess::writeRunAsync(edm::WaitingTaskHolder task,
0597                                  ProcessHistoryID const& parentPhID,
0598                                  int runNumber,
0599                                  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
0600     ServiceRegistry::Operate operate(serviceToken_);
0601     std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
0602     assert(it != parentToChildPhID_.end());
0603     auto const& childPhID = it->second;
0604 
0605     using namespace edm::waiting_task;
0606     chain::first([&](std::exception_ptr const*, auto nextTask) {
0607       ServiceRegistry::Operate operate(serviceToken_);
0608       schedule_->writeRunAsync(nextTask,
0609                                principalCache_.runPrincipal(childPhID, runNumber),
0610                                &processContext_,
0611                                actReg_.get(),
0612                                mergeableRunProductMetadata);
0613     }) |
0614         chain::ifThen(not subProcesses_.empty(),
0615                       [this, childPhID, runNumber, mergeableRunProductMetadata](auto nextTask) {
0616                         ServiceRegistry::Operate operateWriteRun(serviceToken_);
0617                         for (auto& s : subProcesses_) {
0618                           s.writeRunAsync(nextTask, childPhID, runNumber, mergeableRunProductMetadata);
0619                         }
0620                       }) |
0621         chain::runLast(task);
0622   }
0623 
0624   void SubProcess::deleteRunFromCache(ProcessHistoryID const& parentPhID, int runNumber) {
0625     std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
0626     assert(it != parentToChildPhID_.end());
0627     auto const& childPhID = it->second;
0628     principalCache_.deleteRun(childPhID, runNumber);
0629     for_all(subProcesses_,
0630             [&childPhID, runNumber](auto& subProcess) { subProcess.deleteRunFromCache(childPhID, runNumber); });
0631   }
0632 
0633   void SubProcess::clearProcessBlockPrincipal(ProcessBlockType processBlockType) {
0634     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal(processBlockType);
0635     processBlockPrincipal.clearPrincipal();
0636     for (auto& s : subProcesses_) {
0637       s.clearProcessBlockPrincipal(processBlockType);
0638     }
0639   }
0640 
0641   void SubProcess::doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const& iTransitionInfo) {
0642     ServiceRegistry::Operate operate(serviceToken_);
0643 
0644     LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0645     auto aux = parentPrincipal.aux();
0646     aux.setProcessHistoryID(parentPrincipal.processHistoryID());
0647     auto lbpp = principalCache_.getAvailableLumiPrincipalPtr();
0648     lbpp->setAux(aux);
0649     auto& processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_ + lbpp->index()];
0650     inUseLumiPrincipals_[parentPrincipal.index()] = lbpp;
0651     processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
0652     lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.processHistory(), parentPrincipal.reader());
0653     lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
0654     LuminosityBlockPrincipal& lbp = *lbpp;
0655     propagateProducts(InLumi, parentPrincipal, lbp);
0656 
0657     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0658     LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0659     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0660     beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0661   }
0662 
0663   void SubProcess::doEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
0664                                              LumiTransitionInfo const& iTransitionInfo,
0665                                              bool cleaningUpAfterException) {
0666     LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0667     LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[parentPrincipal.index()];
0668     propagateProducts(InLumi, parentPrincipal, lbp);
0669 
0670     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0671     LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0672     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0673     endGlobalTransitionAsync<Traits>(
0674         std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0675   }
0676 
0677   void SubProcess::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& principal) {
0678     using namespace edm::waiting_task;
0679 
0680     auto l = inUseLumiPrincipals_[principal.index()];
0681     chain::first([&](std::exception_ptr const*, auto nextTask) {
0682       ServiceRegistry::Operate operate(serviceToken_);
0683       schedule_->writeLumiAsync(nextTask, *l, &processContext_, actReg_.get());
0684     }) | chain::ifThen(not subProcesses_.empty(), [this, l](auto nextTask) {
0685       ServiceRegistry::Operate operateWriteLumi(serviceToken_);
0686       for (auto& s : subProcesses_) {
0687         s.writeLumiAsync(nextTask, *l);
0688       }
0689     }) | chain::runLast(std::move(task));
0690   }
0691 
0692   void SubProcess::deleteLumiFromCache(LuminosityBlockPrincipal& principal) {
0693     //release from list but stay around till end of routine
0694     auto lb = std::move(inUseLumiPrincipals_[principal.index()]);
0695     for (auto& s : subProcesses_) {
0696       s.deleteLumiFromCache(*lb);
0697     }
0698     lb->clearPrincipal();
0699   }
0700 
0701   void SubProcess::doBeginStream(unsigned int iID) {
0702     ServiceRegistry::Operate operate(serviceToken_);
0703     schedule_->beginStream(iID);
0704     for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doBeginStream(iID); });
0705   }
0706 
0707   void SubProcess::doEndStream(unsigned int iID) {
0708     ServiceRegistry::Operate operate(serviceToken_);
0709     schedule_->endStream(iID);
0710     for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doEndStream(iID); });
0711   }
0712 
0713   void SubProcess::doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int id, RunTransitionInfo const&) {
0714     using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0715 
0716     RunPrincipal& rp = *principalCache_.runPrincipalPtr();
0717 
0718     RunTransitionInfo transitionInfo(rp, esp_->eventSetupImpl());
0719     beginStreamTransitionAsync<Traits>(
0720         std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
0721   }
0722 
0723   void SubProcess::doStreamEndRunAsync(WaitingTaskHolder iHolder,
0724                                        unsigned int id,
0725                                        RunTransitionInfo const&,
0726                                        bool cleaningUpAfterException) {
0727     RunPrincipal& rp = *principalCache_.runPrincipalPtr();
0728     using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0729 
0730     RunTransitionInfo transitionInfo(rp, esp_->eventSetupImpl());
0731     endStreamTransitionAsync<Traits>(
0732         std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0733   }
0734 
0735   void SubProcess::doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder,
0736                                                      unsigned int id,
0737                                                      LumiTransitionInfo const& iTransitionInfo) {
0738     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0739 
0740     LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
0741     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0742     LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0743     beginStreamTransitionAsync<Traits>(
0744         std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
0745   }
0746 
0747   void SubProcess::doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
0748                                                    unsigned int id,
0749                                                    LumiTransitionInfo const& iTransitionInfo,
0750                                                    bool cleaningUpAfterException) {
0751     LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
0752     using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0753     std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0754     LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0755     endStreamTransitionAsync<Traits>(
0756         std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0757   }
0758 
0759   void SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const {
0760     SelectedProducts const& keptVector = keptProducts()[type];
0761     for (auto const& item : keptVector) {
0762       BranchDescription const& desc = *item.first;
0763       ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
0764       if (parentProductResolver != nullptr) {
0765         ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
0766         if (productResolver != nullptr) {
0767           //Propagate the per event(run)(lumi)(processBlock) data for this product to the subprocess.
0768           productResolver->connectTo(*parentProductResolver, &parentPrincipal);
0769         }
0770       }
0771     }
0772   }
0773 
0774   bool SubProcess::parentProducedProductIsKept(Principal const& parentPrincipal, Principal& principal) const {
0775     SelectedProducts const& keptVector = keptProducts()[InProcess];
0776     for (auto const& item : keptVector) {
0777       BranchDescription const& desc = *item.first;
0778       assert(desc.branchType() == InProcess);
0779       ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
0780       if (parentProductResolver != nullptr) {
0781         ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
0782         if (productResolver != nullptr) {
0783           if (parentProductResolver->branchDescription().produced()) {
0784             return true;
0785           }
0786         }
0787       }
0788     }
0789     return false;
0790   }
0791 
0792   void SubProcess::updateBranchIDListHelper(BranchIDLists const& branchIDLists) {
0793     branchIDListHelper_->updateFromParent(branchIDLists);
0794     for_all(subProcesses_,
0795             [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
0796   }
0797 
0798   // Call respondToOpenInputFile() on all Modules
0799   void SubProcess::respondToOpenInputFile(FileBlock const& fb) {
0800     ServiceRegistry::Operate operate(serviceToken_);
0801     schedule_->respondToOpenInputFile(fb);
0802     for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.respondToOpenInputFile(fb); });
0803   }
0804 
0805   // free function
0806   std::vector<ParameterSet> popSubProcessVParameterSet(ParameterSet& parameterSet) {
0807     std::vector<std::string> subProcesses =
0808         parameterSet.getUntrackedParameter<std::vector<std::string>>("@all_subprocesses");
0809     if (!subProcesses.empty()) {
0810       return parameterSet.popVParameterSet("subProcesses");
0811     }
0812     return {};
0813   }
0814 }  // namespace edm