File indexing completed on 2024-04-06 12:12:15
0001 #include "FWCore/Framework/interface/SubProcess.h"
0002
0003 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0004 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0005 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0007 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0008 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0009 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0010 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0011 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0012 #include "DataFormats/Provenance/interface/SubProcessParentageHelper.h"
0013 #include "FWCore/Common/interface/SubProcessBlockHelper.h"
0014 #include "FWCore/Framework/interface/EventForOutput.h"
0015 #include "FWCore/Framework/interface/EventPrincipal.h"
0016 #include "FWCore/Framework/interface/FileBlock.h"
0017 #include "FWCore/Framework/interface/ProductResolverBase.h"
0018 #include "FWCore/Framework/interface/HistoryAppender.h"
0019 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0020 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0021 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0022 #include "FWCore/Framework/src/OutputModuleDescription.h"
0023 #include "FWCore/Framework/interface/RunPrincipal.h"
0024 #include "FWCore/Framework/interface/getAllTriggerNames.h"
0025 #include "FWCore/Framework/interface/TriggerNamesService.h"
0026 #include "FWCore/Framework/interface/ScheduleItems.h"
0027 #include "FWCore/Framework/interface/EventSetupsController.h"
0028 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0029 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0030 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0031 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0032 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0033 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0034 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
0035 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0036 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0037 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0038 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0039 #include "FWCore/Concurrency/interface/WaitingTask.h"
0040 #include "FWCore/Concurrency/interface/chain_first.h"
0041 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0042
0043 #include "boost/range/adaptor/reversed.hpp"
0044
0045 #include <cassert>
0046 #include <exception>
0047 #include <string>
0048
0049 namespace edm {
0050
0051 SubProcess::SubProcess(ParameterSet& parameterSet,
0052 ParameterSet const& topLevelParameterSet,
0053 std::shared_ptr<ProductRegistry const> parentProductRegistry,
0054 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
0055 ProcessBlockHelperBase const& parentProcessBlockHelper,
0056 ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
0057 SubProcessParentageHelper const& parentSubProcessParentageHelper,
0058 eventsetup::EventSetupsController& esController,
0059 ActivityRegistry& parentActReg,
0060 ServiceToken const& token,
0061 serviceregistry::ServiceLegacy iLegacy,
0062 PreallocationConfiguration const& preallocConfig,
0063 ProcessContext const* parentProcessContext,
0064 ModuleTypeResolverMaker const* typeResolverMaker)
0065 : EDConsumerBase(),
0066 serviceToken_(),
0067 parentPreg_(parentProductRegistry),
0068 preg_(),
0069 branchIDListHelper_(),
0070 act_table_(),
0071 processConfiguration_(),
0072 historyLumiOffset_(preallocConfig.numberOfStreams()),
0073 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
0074 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
0075 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
0076 principalCache_(),
0077 esp_(),
0078 schedule_(),
0079 subProcesses_(),
0080 processParameterSet_(),
0081 productSelectorRules_(parameterSet, "outputCommands", "OutputModule"),
0082 productSelector_(),
0083 wantAllEvents_(true) {
0084
0085 Service<service::TriggerNamesService> tns;
0086
0087 ParameterSet selectevents = parameterSet.getUntrackedParameterSet("SelectEvents", ParameterSet());
0088
0089 selectevents.registerIt();
0090 wantAllEvents_ = detail::configureEventSelector(
0091 selectevents, tns->getProcessName(), getAllTriggerNames(), selectors_, consumesCollector());
0092 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0093 selector_config_id_ = detail::registerProperSelectionInfo(
0094 selectevents, "", outputModulePathPositions, parentProductRegistry->anyProductProduced());
0095
0096 std::map<BranchID, bool> keepAssociation;
0097 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
0098
0099 std::string const maxEvents("maxEvents");
0100 std::string const maxLumis("maxLuminosityBlocks");
0101
0102
0103 processParameterSet_ =
0104 std::unique_ptr<ParameterSet>(parameterSet.popParameterSet(std::string("process")).release());
0105
0106
0107 if (processParameterSet_->exists(maxEvents)) {
0108 processParameterSet_->popParameterSet(maxEvents);
0109 }
0110 if (processParameterSet_->exists(maxLumis)) {
0111 processParameterSet_->popParameterSet(maxLumis);
0112 }
0113
0114
0115 if (topLevelParameterSet.exists(maxEvents)) {
0116 processParameterSet_->addUntrackedParameter<ParameterSet>(
0117 maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
0118 }
0119 if (topLevelParameterSet.exists(maxLumis)) {
0120 processParameterSet_->addUntrackedParameter<ParameterSet>(
0121 maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
0122 }
0123
0124
0125 auto subProcessVParameterSet = popSubProcessVParameterSet(*processParameterSet_);
0126 bool hasSubProcesses = !subProcessVParameterSet.empty();
0127
0128
0129
0130
0131 validateTopLevelParameterSets(processParameterSet_.get());
0132
0133 processBlockHelper_ = std::make_shared<SubProcessBlockHelper>();
0134
0135 ScheduleItems items(*parentProductRegistry, *this, *processBlockHelper_, parentProcessBlockHelper);
0136 actReg_ = items.actReg_;
0137
0138
0139 ServiceToken iToken;
0140
0141
0142 auto serviceSets = processParameterSet_->popVParameterSet(std::string("services"));
0143
0144 ServiceToken newToken = items.initServices(serviceSets, *processParameterSet_, token, iLegacy, false);
0145 parentActReg.connectToSubProcess(*items.actReg_);
0146 serviceToken_ = items.addCPRandTNS(*processParameterSet_, newToken);
0147
0148
0149 ServiceRegistry::Operate operate(serviceToken_);
0150
0151
0152 items.initMisc(*processParameterSet_);
0153
0154
0155 esp_ = esController.makeProvider(*processParameterSet_, actReg_.get());
0156
0157 branchIDListHelper_ = items.branchIDListHelper();
0158 updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());
0159
0160 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0161 thinnedAssociationsHelper_->updateFromParentProcess(
0162 parentThinnedAssociationsHelper, keepAssociation, droppedBranchIDToKeptBranchID_);
0163
0164
0165 schedule_ = items.initSchedule(*processParameterSet_,
0166 hasSubProcesses,
0167 preallocConfig,
0168 &processContext_,
0169 typeResolverMaker,
0170 *processBlockHelper_);
0171
0172
0173 act_table_ = std::move(items.act_table_);
0174 preg_ = items.preg();
0175
0176 subProcessParentageHelper_ = items.subProcessParentageHelper();
0177 subProcessParentageHelper_->update(parentSubProcessParentageHelper, *parentProductRegistry);
0178
0179 processConfiguration_ = items.processConfiguration();
0180 processContext_.setProcessConfiguration(processConfiguration_.get());
0181 processContext_.setParentProcessContext(parentProcessContext);
0182
0183 principalCache_.setNumberOfConcurrentPrincipals(preallocConfig);
0184 for (unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) {
0185 auto ep = std::make_shared<EventPrincipal>(preg_,
0186 branchIDListHelper(),
0187 thinnedAssociationsHelper(),
0188 *processConfiguration_,
0189 &(historyAppenders_[index]),
0190 index,
0191 false ,
0192 &*processBlockHelper_);
0193 principalCache_.insert(ep);
0194 }
0195
0196 for (unsigned int index = 0; index < preallocConfig.numberOfRuns(); ++index) {
0197 auto rpp = std::make_unique<RunPrincipal>(
0198 preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_ + index]), index, false);
0199 principalCache_.insert(std::move(rpp));
0200 }
0201
0202 for (unsigned int index = 0; index < preallocConfig.numberOfLuminosityBlocks(); ++index) {
0203 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
0204 preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_ + index]), index, false);
0205 principalCache_.insert(std::move(lbpp));
0206 }
0207
0208 {
0209 auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
0210 principalCache_.insert(std::move(pb));
0211
0212 auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
0213 principalCache_.insertForInput(std::move(pbForInput));
0214 }
0215
0216 inUseRunPrincipals_.resize(preallocConfig.numberOfRuns());
0217 inUseLumiPrincipals_.resize(preallocConfig.numberOfLuminosityBlocks());
0218
0219 subProcesses_.reserve(subProcessVParameterSet.size());
0220 for (auto& subProcessPSet : subProcessVParameterSet) {
0221 subProcesses_.emplace_back(subProcessPSet,
0222 topLevelParameterSet,
0223 preg_,
0224 branchIDListHelper(),
0225 *processBlockHelper_,
0226 *thinnedAssociationsHelper_,
0227 *subProcessParentageHelper_,
0228 esController,
0229 *items.actReg_,
0230 newToken,
0231 iLegacy,
0232 preallocConfig,
0233 &processContext_,
0234 typeResolverMaker);
0235 }
0236 }
0237
0238 SubProcess::~SubProcess() {}
0239
0240 std::vector<ModuleProcessName> SubProcess::keepOnlyConsumedUnscheduledModules(bool deleteModules) {
0241 schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0242 pathsAndConsumesOfModules_.initialize(schedule_.get(), preg_);
0243
0244
0245 checkForModuleDependencyCorrectness(pathsAndConsumesOfModules_, false);
0246
0247
0248 std::vector<ModuleProcessName> consumedByChildren;
0249 for_all(subProcesses_, [&consumedByChildren, deleteModules](auto& subProcess) {
0250 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
0251 if (consumedByChildren.empty()) {
0252 std::swap(consumedByChildren, c);
0253 } else if (not c.empty()) {
0254 std::vector<ModuleProcessName> tmp;
0255 tmp.reserve(consumedByChildren.size() + c.size());
0256 std::merge(consumedByChildren.begin(), consumedByChildren.end(), c.begin(), c.end(), std::back_inserter(tmp));
0257 std::swap(consumedByChildren, tmp);
0258 }
0259 });
0260
0261
0262 if (deleteModules) {
0263 if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedByChildren);
0264 not unusedModules.empty()) {
0265 pathsAndConsumesOfModules_.removeModules(unusedModules);
0266
0267 edm::LogInfo("DeleteModules").log([&unusedModules, this](auto& l) {
0268 l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
0269 "and "
0270 "therefore they are deleted from SubProcess "
0271 << processConfiguration_->processName() << " before beginJob transition.";
0272 for (auto const& description : unusedModules) {
0273 l << "\n " << description->moduleLabel();
0274 }
0275 });
0276 for (auto const& description : unusedModules) {
0277 schedule_->deleteModule(description->moduleLabel(), actReg_.get());
0278 }
0279 }
0280 }
0281
0282
0283 for (auto const& description : pathsAndConsumesOfModules_.allModules()) {
0284 for (auto const& dep :
0285 pathsAndConsumesOfModules_.modulesInPreviousProcessesWhoseProductsAreConsumedBy(description->id())) {
0286 auto it = std::lower_bound(consumedByChildren.begin(),
0287 consumedByChildren.end(),
0288 ModuleProcessName{dep.moduleLabel(), dep.processName()});
0289 consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
0290 }
0291 }
0292 return consumedByChildren;
0293 }
0294
0295 void SubProcess::doBeginJob() { this->beginJob(); }
0296
0297 void SubProcess::doEndJob() { endJob(); }
0298
0299 void SubProcess::beginJob() {
0300
0301
0302
0303
0304 updateLookup(InEvent, *parentPreg_->productLookup(InEvent), false);
0305
0306 if (!droppedBranchIDToKeptBranchID().empty()) {
0307 fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID());
0308 }
0309 ServiceRegistry::Operate operate(serviceToken_);
0310 actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
0311 schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
0312 for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
0313 }
0314
0315 void SubProcess::endJob() {
0316 ServiceRegistry::Operate operate(serviceToken_);
0317 ExceptionCollector c(
0318 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
0319 schedule_->endJob(c);
0320 for (auto& subProcess : subProcesses_) {
0321 c.call([&subProcess]() { subProcess.doEndJob(); });
0322 }
0323 if (c.hasThrown()) {
0324 c.rethrow();
0325 }
0326 }
0327
0328 void SubProcess::selectProducts(ProductRegistry const& preg,
0329 ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
0330 std::map<BranchID, bool>& keepAssociation) {
0331 if (productSelector_.initialized())
0332 return;
0333 productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions());
0334
0335
0336
0337
0338
0339 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
0340 std::vector<BranchDescription const*> associationDescriptions;
0341 std::set<BranchID> keptProductsInEvent;
0342
0343 for (auto const& it : preg.productList()) {
0344 BranchDescription const& desc = it.second;
0345 if (desc.transient()) {
0346
0347 } else if (!desc.present() && !desc.produced()) {
0348
0349
0350 } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
0351 associationDescriptions.push_back(&desc);
0352 } else if (productSelector_.selected(desc)) {
0353 keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0354 }
0355 }
0356
0357 parentThinnedAssociationsHelper.selectAssociationProducts(
0358 associationDescriptions, keptProductsInEvent, keepAssociation);
0359
0360 for (auto association : associationDescriptions) {
0361 if (keepAssociation[association->branchID()]) {
0362 keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
0363 }
0364 }
0365
0366
0367 ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
0368 }
0369
0370 void SubProcess::keepThisBranch(BranchDescription const& desc,
0371 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
0372 std::set<BranchID>& keptProductsInEvent) {
0373 ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
0374
0375 if (desc.branchType() == InEvent) {
0376 if (desc.produced()) {
0377 keptProductsInEvent.insert(desc.originalBranchID());
0378 } else {
0379 keptProductsInEvent.insert(desc.branchID());
0380 }
0381 }
0382 EDGetToken token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
0383 InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
0384
0385
0386 keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
0387 }
0388
0389 void SubProcess::fixBranchIDListsForEDAliases(
0390 std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID) {
0391
0392
0393 for (BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
0394 for (BranchID::value_type& branchID : branchIDList) {
0395 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
0396 droppedBranchIDToKeptBranchID.find(branchID);
0397 if (iter != droppedBranchIDToKeptBranchID.end()) {
0398 branchID = iter->second;
0399 }
0400 }
0401 }
0402 for_all(subProcesses_, [&droppedBranchIDToKeptBranchID](auto& subProcess) {
0403 subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
0404 });
0405 }
0406
0407 void SubProcess::doEventAsync(WaitingTaskHolder iHolder,
0408 EventPrincipal const& ep,
0409 std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
0410 ServiceRegistry::Operate operate(serviceToken_);
0411
0412 if (!wantAllEvents_) {
0413 EventForOutput e(ep, ModuleDescription(), nullptr);
0414 e.setConsumer(this);
0415 if (!selectors_.wantEvent(e)) {
0416 return;
0417 }
0418 }
0419 processAsync(std::move(iHolder), ep, iEventSetupImpls);
0420
0421 }
0422
0423 void SubProcess::processAsync(WaitingTaskHolder iHolder,
0424 EventPrincipal const& principal,
0425 std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
0426 EventAuxiliary aux(principal.aux());
0427 aux.setProcessHistoryID(principal.processHistoryID());
0428
0429 EventSelectionIDVector esids{principal.eventSelectionIDs()};
0430 if (principal.productRegistry().anyProductProduced() || !wantAllEvents_) {
0431 esids.push_back(selector_config_id_);
0432 }
0433
0434 EventPrincipal& ep = principalCache_.eventPrincipal(principal.streamID().value());
0435 auto& processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
0436 processHistoryRegistry.registerProcessHistory(principal.processHistory());
0437 BranchListIndexes bli(principal.branchListIndexes());
0438 branchIDListHelper_->fixBranchListIndexes(bli);
0439 bool deepCopyRetriever = false;
0440 ep.fillEventPrincipal(
0441 aux,
0442 &principal.processHistory(),
0443 std::move(esids),
0444 std::move(bli),
0445 principal.eventToProcessBlockIndexes(),
0446 *(principal.productProvenanceRetrieverPtr()),
0447 principal.reader(),
0448 deepCopyRetriever);
0449 ep.setLuminosityBlockPrincipal(inUseLumiPrincipals_[principal.luminosityBlockPrincipal().index()].get());
0450 propagateProducts(InEvent, principal, ep);
0451
0452 using namespace edm::waiting_task;
0453 chain::first([&](auto nextTask) {
0454 EventTransitionInfo info(ep, *((*iEventSetupImpls)[esp_->subProcessIndex()]));
0455 schedule_->processOneEventAsync(std::move(nextTask), ep.streamID().value(), info, serviceToken_);
0456 }) | chain::ifThen(not subProcesses_.empty(), [this, &ep, iEventSetupImpls](auto nextTask) {
0457 for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
0458 subProcess.doEventAsync(nextTask, ep, iEventSetupImpls);
0459 }
0460 }) | chain::then([&ep](std::exception_ptr const* iPtr, auto nextTask) {
0461 ep.clearEventPrincipal();
0462 if (iPtr) {
0463 nextTask.doneWaiting(*iPtr);
0464 }
0465 }) | chain::runLast(std::move(iHolder));
0466 }
0467
0468 template <>
0469 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
0470 WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool cleaningUpAfterException) {
0471 ServiceRegistry::Operate operate(serviceToken_);
0472
0473 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0474 ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0475 processBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
0476 propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0477
0478 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0479 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0480 beginGlobalTransitionAsync<Traits>(
0481 std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0482 }
0483
0484 template <>
0485 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
0486 WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool) {
0487 ServiceRegistry::Operate operate(serviceToken_);
0488
0489 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0490 ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0491 processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0492 propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0493
0494 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0495 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0496 beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0497 }
0498
0499 void SubProcess::doEndProcessBlockAsync(WaitingTaskHolder iHolder,
0500 ProcessBlockTransitionInfo const& iTransitionInfo,
0501 bool cleaningUpAfterException) {
0502 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0503 ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0504 propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
0505
0506 ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0507
0508 if (parentProducedProductIsKept(parentPrincipal, processBlockPrincipal)) {
0509 ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0510 inputProcessBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
0511 propagateProducts(InProcess, parentPrincipal, inputProcessBlockPrincipal);
0512 ProcessBlockTransitionInfo inputTransitionInfo(inputProcessBlockPrincipal);
0513
0514 using namespace edm::waiting_task;
0515 chain::first([&](const std::exception_ptr*, auto nextTask) {
0516 using TraitsInput = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0517 beginGlobalTransitionAsync<TraitsInput>(std::move(nextTask),
0518 *schedule_,
0519 inputTransitionInfo,
0520 serviceToken_,
0521 subProcesses_,
0522 cleaningUpAfterException);
0523 }) | chain::then([this](auto nextTask) { writeProcessBlockAsync(nextTask, ProcessBlockType::Input); }) |
0524 chain::then([this, info = transitionInfo, cleaningUpAfterException](std::exception_ptr const* iPtr,
0525 auto nextTask) mutable {
0526 ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0527 inputProcessBlockPrincipal.clearPrincipal();
0528 for (auto& s : subProcesses_) {
0529 s.clearProcessBlockPrincipal(ProcessBlockType::Input);
0530 }
0531 if (iPtr) {
0532 nextTask.doneWaiting(*iPtr);
0533 } else {
0534 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0535 endGlobalTransitionAsync<Traits>(
0536 std::move(nextTask), *schedule_, info, serviceToken_, subProcesses_, cleaningUpAfterException);
0537 }
0538 }) |
0539 chain::runLast(std::move(iHolder));
0540 } else {
0541 using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0542 endGlobalTransitionAsync<Traits>(
0543 std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0544 }
0545 }
0546
0547 void SubProcess::doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const& iTransitionInfo) {
0548 ServiceRegistry::Operate operate(serviceToken_);
0549
0550 RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0551 auto aux = parentPrincipal.aux();
0552 aux.setProcessHistoryID(parentPrincipal.processHistoryID());
0553 auto rpp = principalCache_.getAvailableRunPrincipalPtr();
0554 rpp->setAux(aux);
0555 auto& processHistoryRegistry = processHistoryRegistries_[historyRunOffset_ + parentPrincipal.index()];
0556 inUseRunPrincipals_[parentPrincipal.index()] = rpp;
0557 processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
0558 rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.reader());
0559
0560 RunPrincipal& rp = *rpp;
0561 propagateProducts(InRun, parentPrincipal, rp);
0562
0563 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0564 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0565 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0566 beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0567 }
0568
0569 void SubProcess::doEndRunAsync(WaitingTaskHolder iHolder,
0570 RunTransitionInfo const& iTransitionInfo,
0571 bool cleaningUpAfterException) {
0572 RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0573 RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
0574 propagateProducts(InRun, parentPrincipal, rp);
0575
0576 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0577 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0578 using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0579 endGlobalTransitionAsync<Traits>(
0580 std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0581 }
0582
0583 void SubProcess::writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType processBlockType) {
0584 using namespace edm::waiting_task;
0585 chain::first([&](std::exception_ptr const*, auto nextTask) {
0586 ServiceRegistry::Operate operate(serviceToken_);
0587 schedule_->writeProcessBlockAsync(
0588 nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
0589 }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
0590 ServiceRegistry::Operate operate(serviceToken_);
0591 for (auto& s : subProcesses_) {
0592 s.writeProcessBlockAsync(nextTask, processBlockType);
0593 }
0594 }) | chain::runLast(std::move(task));
0595 }
0596
0597 void SubProcess::writeRunAsync(edm::WaitingTaskHolder task,
0598 RunPrincipal const& principal,
0599 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
0600 using namespace edm::waiting_task;
0601
0602 auto rp = inUseRunPrincipals_[principal.index()];
0603 chain::first([&](std::exception_ptr const*, auto nextTask) {
0604 ServiceRegistry::Operate operate(serviceToken_);
0605 schedule_->writeRunAsync(nextTask, *rp, &processContext_, actReg_.get(), mergeableRunProductMetadata);
0606 }) | chain::ifThen(not subProcesses_.empty(), [this, rp, mergeableRunProductMetadata](auto nextTask) {
0607 ServiceRegistry::Operate operateWriteRun(serviceToken_);
0608 for (auto& s : subProcesses_) {
0609 s.writeRunAsync(nextTask, *rp, mergeableRunProductMetadata);
0610 }
0611 }) | chain::runLast(std::move(task));
0612 }
0613
0614 void SubProcess::clearRunPrincipal(RunPrincipal& parentPrincipal) {
0615
0616 auto rp = std::move(inUseRunPrincipals_[parentPrincipal.index()]);
0617 for (auto& s : subProcesses_) {
0618 s.clearRunPrincipal(*rp);
0619 }
0620 rp->clearPrincipal();
0621 }
0622
0623 void SubProcess::clearProcessBlockPrincipal(ProcessBlockType processBlockType) {
0624 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal(processBlockType);
0625 processBlockPrincipal.clearPrincipal();
0626 for (auto& s : subProcesses_) {
0627 s.clearProcessBlockPrincipal(processBlockType);
0628 }
0629 }
0630
0631 void SubProcess::doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const& iTransitionInfo) {
0632 ServiceRegistry::Operate operate(serviceToken_);
0633
0634 LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0635 auto aux = parentPrincipal.aux();
0636 aux.setProcessHistoryID(parentPrincipal.processHistoryID());
0637 auto lbpp = principalCache_.getAvailableLumiPrincipalPtr();
0638 lbpp->setAux(aux);
0639 auto& processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_ + lbpp->index()];
0640 inUseLumiPrincipals_[parentPrincipal.index()] = lbpp;
0641 processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
0642 lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.processHistory(), parentPrincipal.reader());
0643 lbpp->setRunPrincipal(inUseRunPrincipals_[parentPrincipal.runPrincipal().index()]);
0644 LuminosityBlockPrincipal& lbp = *lbpp;
0645 propagateProducts(InLumi, parentPrincipal, lbp);
0646
0647 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0648 LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0649 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0650 beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
0651 }
0652
0653 void SubProcess::doEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
0654 LumiTransitionInfo const& iTransitionInfo,
0655 bool cleaningUpAfterException) {
0656 LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
0657 LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[parentPrincipal.index()];
0658 propagateProducts(InLumi, parentPrincipal, lbp);
0659
0660 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0661 LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0662 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0663 endGlobalTransitionAsync<Traits>(
0664 std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0665 }
0666
0667 void SubProcess::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& principal) {
0668 using namespace edm::waiting_task;
0669
0670 auto l = inUseLumiPrincipals_[principal.index()];
0671 chain::first([&](std::exception_ptr const*, auto nextTask) {
0672 ServiceRegistry::Operate operate(serviceToken_);
0673 schedule_->writeLumiAsync(nextTask, *l, &processContext_, actReg_.get());
0674 }) | chain::ifThen(not subProcesses_.empty(), [this, l](auto nextTask) {
0675 ServiceRegistry::Operate operateWriteLumi(serviceToken_);
0676 for (auto& s : subProcesses_) {
0677 s.writeLumiAsync(nextTask, *l);
0678 }
0679 }) | chain::runLast(std::move(task));
0680 }
0681
0682 void SubProcess::clearLumiPrincipal(LuminosityBlockPrincipal& principal) {
0683
0684 auto lb = std::move(inUseLumiPrincipals_[principal.index()]);
0685 for (auto& s : subProcesses_) {
0686 s.clearLumiPrincipal(*lb);
0687 }
0688 lb->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0689 lb->clearPrincipal();
0690 }
0691
0692 void SubProcess::doBeginStream(unsigned int iID) {
0693 ServiceRegistry::Operate operate(serviceToken_);
0694 schedule_->beginStream(iID);
0695 for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doBeginStream(iID); });
0696 }
0697
0698 void SubProcess::doEndStream(unsigned int iID) {
0699 ServiceRegistry::Operate operate(serviceToken_);
0700 schedule_->endStream(iID);
0701 for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doEndStream(iID); });
0702 }
0703
0704 void SubProcess::doStreamBeginRunAsync(WaitingTaskHolder iHolder,
0705 unsigned int id,
0706 RunTransitionInfo const& iTransitionInfo) {
0707 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0708
0709 RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0710 RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
0711
0712 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0713 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0714 beginStreamTransitionAsync<Traits>(
0715 std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
0716 }
0717
0718 void SubProcess::doStreamEndRunAsync(WaitingTaskHolder iHolder,
0719 unsigned int id,
0720 RunTransitionInfo const& iTransitionInfo,
0721 bool cleaningUpAfterException) {
0722 using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0723
0724 RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
0725 RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
0726
0727 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0728 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0729 endStreamTransitionAsync<Traits>(
0730 std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0731 }
0732
0733 void SubProcess::doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder,
0734 unsigned int id,
0735 LumiTransitionInfo const& iTransitionInfo) {
0736 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0737
0738 LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
0739 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0740 LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0741 beginStreamTransitionAsync<Traits>(
0742 std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
0743 }
0744
0745 void SubProcess::doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
0746 unsigned int id,
0747 LumiTransitionInfo const& iTransitionInfo,
0748 bool cleaningUpAfterException) {
0749 LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
0750 using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0751 std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
0752 LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
0753 endStreamTransitionAsync<Traits>(
0754 std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
0755 }
0756
0757 void SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const {
0758 SelectedProducts const& keptVector = keptProducts()[type];
0759 for (auto const& item : keptVector) {
0760 BranchDescription const& desc = *item.first;
0761 ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
0762 if (parentProductResolver != nullptr) {
0763 ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
0764 if (productResolver != nullptr) {
0765
0766 productResolver->connectTo(*parentProductResolver, &parentPrincipal);
0767 }
0768 }
0769 }
0770 }
0771
0772 bool SubProcess::parentProducedProductIsKept(Principal const& parentPrincipal, Principal& principal) const {
0773 SelectedProducts const& keptVector = keptProducts()[InProcess];
0774 for (auto const& item : keptVector) {
0775 BranchDescription const& desc = *item.first;
0776 assert(desc.branchType() == InProcess);
0777 ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
0778 if (parentProductResolver != nullptr) {
0779 ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
0780 if (productResolver != nullptr) {
0781 if (parentProductResolver->branchDescription().produced()) {
0782 return true;
0783 }
0784 }
0785 }
0786 }
0787 return false;
0788 }
0789
0790 void SubProcess::updateBranchIDListHelper(BranchIDLists const& branchIDLists) {
0791 branchIDListHelper_->updateFromParent(branchIDLists);
0792 for_all(subProcesses_,
0793 [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
0794 }
0795
0796
0797 void SubProcess::respondToOpenInputFile(FileBlock const& fb) {
0798 ServiceRegistry::Operate operate(serviceToken_);
0799 schedule_->respondToOpenInputFile(fb);
0800 for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.respondToOpenInputFile(fb); });
0801 }
0802
0803
0804 std::vector<ParameterSet> popSubProcessVParameterSet(ParameterSet& parameterSet) {
0805 std::vector<std::string> subProcesses =
0806 parameterSet.getUntrackedParameter<std::vector<std::string>>("@all_subprocesses");
0807 if (!subProcesses.empty()) {
0808 return parameterSet.popVParameterSet("subProcesses");
0809 }
0810 return {};
0811 }
0812 }