File indexing completed on 2025-06-29 22:58:04
0001 #include "FWCore/Framework/interface/Schedule.h"
0002
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0011 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0012 #include "FWCore/Framework/interface/ConsumesCollector.h"
0013 #include "FWCore/Framework/interface/EDConsumerBase.h"
0014 #include "FWCore/Framework/src/OutputModuleDescription.h"
0015 #include "FWCore/Framework/interface/TriggerNamesService.h"
0016 #include "FWCore/Framework/src/TriggerReport.h"
0017 #include "FWCore/Framework/src/TriggerTimingReport.h"
0018 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0019 #include "FWCore/Framework/src/ModuleHolderFactory.h"
0020 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0021 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0022 #include "FWCore/Framework/interface/ModuleRegistry.h"
0023 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0024 #include "FWCore/Framework/src/TriggerResultInserter.h"
0025 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0026 #include "FWCore/Framework/src/PathStatusInserter.h"
0027 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0028 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0029 #include "FWCore/Framework/interface/ComponentDescription.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0031 #include "FWCore/Concurrency/interface/chain_first.h"
0032 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0033 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0034 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0035 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0036 #include "FWCore/ServiceRegistry/interface/ModuleConsumesInfo.h"
0037 #include "FWCore/Utilities/interface/Algorithms.h"
0038 #include "FWCore/Utilities/interface/ConvertException.h"
0039 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0040 #include "FWCore/Utilities/interface/TypeID.h"
0041 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0042
0043 #include <array>
0044 #include <algorithm>
0045 #include <cassert>
0046 #include <cstdlib>
0047 #include <functional>
0048 #include <iomanip>
0049 #include <list>
0050 #include <map>
0051 #include <set>
0052 #include <exception>
0053 #include <sstream>
0054
0055 #include "make_shared_noexcept_false.h"
0056 #include "processEDAliases.h"
0057
0058 namespace edm {
0059
0060 class ModuleMakerBase;
0061
0062 namespace {
0063 using std::placeholders::_1;
0064
0065 bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0066 return std::binary_search(v.begin(), v.end(), s);
0067 }
0068
0069
0070
0071
0072 std::shared_ptr<TriggerResultInserter> makeInserter(ParameterSet& proc_pset,
0073 PreallocationConfiguration const& iPrealloc,
0074 SignallingProductRegistryFiller& preg,
0075 ExceptionToActionTable const& actions,
0076 std::shared_ptr<ActivityRegistry> areg,
0077 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0078 ModuleRegistry& moduleRegistry) {
0079 ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0080 trig_pset->registerIt();
0081
0082 WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0083 ModuleDescription md(trig_pset->id(),
0084 "TriggerResultInserter",
0085 "TriggerResults",
0086 processConfiguration.get(),
0087 ModuleDescription::getUniqueID());
0088
0089 return moduleRegistry.makeExplicitModule<TriggerResultInserter>(md,
0090 iPrealloc,
0091 &preg,
0092 areg->preModuleConstructionSignal_,
0093 areg->postModuleConstructionSignal_,
0094 *trig_pset,
0095 iPrealloc.numberOfStreams());
0096 }
0097
0098 template <typename T>
0099 void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0100 std::vector<std::string> const& pathNames,
0101 PreallocationConfiguration const& iPrealloc,
0102 SignallingProductRegistryFiller& preg,
0103 std::shared_ptr<ActivityRegistry> areg,
0104 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0105 ModuleRegistry& moduleRegistry,
0106 std::string const& moduleTypeName) {
0107 ParameterSet pset;
0108 pset.addParameter<std::string>("@module_type", moduleTypeName);
0109 pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0110 pset.registerIt();
0111
0112 pathStatusInserters.reserve(pathNames.size());
0113
0114 for (auto const& pathName : pathNames) {
0115 ModuleDescription md(
0116 pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0117 auto module = moduleRegistry.makeExplicitModule<T>(md,
0118 iPrealloc,
0119 &preg,
0120 areg->preModuleConstructionSignal_,
0121 areg->postModuleConstructionSignal_,
0122 iPrealloc.numberOfStreams());
0123 pathStatusInserters.emplace_back(std::move(module));
0124 }
0125 }
0126
0127 typedef std::vector<std::string> vstring;
0128
0129 void processSwitchProducers(ParameterSet const& proc_pset,
0130 std::string const& processName,
0131 SignallingProductRegistryFiller& preg) {
0132
0133 struct BranchesCases {
0134 BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0135 std::vector<BranchKey> chosenBranches;
0136 std::vector<std::string> caseLabels;
0137 };
0138 std::map<std::string, BranchesCases> switchMap;
0139 for (auto& prod : preg.productListUpdator()) {
0140 if (prod.second.isSwitchAlias()) {
0141 auto it = switchMap.find(prod.second.moduleLabel());
0142 if (it == switchMap.end()) {
0143 auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0144 auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0145 switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0146 assert(inserted.second);
0147 it = inserted.first;
0148 }
0149
0150 bool found = false;
0151 for (auto const& productIter : preg.registry().productList()) {
0152 BranchKey const& branchKey = productIter.first;
0153
0154
0155
0156
0157 if (branchKey.processName() != processName) {
0158 continue;
0159 }
0160
0161 ProductDescription const& desc = productIter.second;
0162 if (desc.branchType() == prod.second.branchType() and
0163 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0164 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0165 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0166 prod.second.setSwitchAliasForBranch(desc);
0167 if (!prod.second.transient()) {
0168 it->second.chosenBranches.push_back(prod.first);
0169 }
0170 found = true;
0171 }
0172 }
0173 if (not found) {
0174 Exception ex(errors::LogicError);
0175 ex << "Trying to find a ProductDescription to be aliased-for by SwitchProducer with\n"
0176 << " friendly class name = " << prod.second.friendlyClassName() << "\n"
0177 << " module label = " << prod.second.moduleLabel() << "\n"
0178 << " product instance name = " << prod.second.productInstanceName() << "\n"
0179 << " process name = " << processName
0180 << "\n\nbut did not find any. Please contact a framework developer.";
0181 ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0182 throw ex;
0183 }
0184 }
0185 }
0186 if (switchMap.empty())
0187 return;
0188
0189 for (auto& elem : switchMap) {
0190 std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0191 }
0192
0193 auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0194 std::map<std::string, std::vector<BranchKey>> caseBranches;
0195 for (auto const& item : preg.registry().productList()) {
0196 if (item.first.processName() != processName)
0197 continue;
0198
0199 if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0200 found != caseLabels.end()) {
0201 caseBranches[*found].push_back(item.first);
0202 }
0203 }
0204
0205 for (auto const& caseLabel : caseLabels) {
0206 ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0207 auto& branches = caseBranches[caseLabel];
0208 std::sort(branches.begin(), branches.end());
0209 for (auto const& branch : branches) {
0210 ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0211 }
0212 }
0213 };
0214
0215
0216
0217 std::vector<bool> foundBranches;
0218 for (auto const& switchItem : switchMap) {
0219 auto const& switchLabel = switchItem.first;
0220 auto const& chosenBranches = switchItem.second.chosenBranches;
0221 auto const& caseLabels = switchItem.second.caseLabels;
0222 foundBranches.resize(chosenBranches.size());
0223 for (auto const& caseLabel : caseLabels) {
0224 std::fill(foundBranches.begin(), foundBranches.end(), false);
0225 for (auto& nonConstItem : preg.productListUpdator()) {
0226 auto const& item = nonConstItem;
0227 if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0228
0229
0230 if (!item.second.transient()) {
0231 auto range = std::equal_range(chosenBranches.begin(),
0232 chosenBranches.end(),
0233 BranchKey(item.first.friendlyClassName(),
0234 switchLabel,
0235 item.first.productInstanceName(),
0236 item.first.processName()));
0237 if (range.first == range.second) {
0238 Exception ex(errors::Configuration);
0239 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0240 << item.first << " that is not produced by the chosen case "
0241 << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0242 .getUntrackedParameter<std::string>("@chosen_case")
0243 << " and that product is not transient. "
0244 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0245 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0246 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0247 addProductsToException(caseLabels, ex);
0248 throw ex;
0249 }
0250 assert(std::distance(range.first, range.second) == 1);
0251 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0252 }
0253
0254
0255
0256
0257
0258
0259
0260
0261
0262 nonConstItem.second.setTransient(true);
0263
0264
0265 auto const& bd = item.second;
0266 if (not bd.branchAliases().empty()) {
0267 auto ex = Exception(errors::UnimplementedFeature)
0268 << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0269 "aliases for SwitchProducer with label "
0270 << switchLabel << " for case " << caseLabel << ":";
0271 for (auto const& branchAlias : bd.branchAliases()) {
0272 ex << " " << branchAlias;
0273 }
0274 throw ex;
0275 }
0276 }
0277 }
0278
0279 for (size_t i = 0; i < chosenBranches.size(); i++) {
0280 if (not foundBranches[i]) {
0281 auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0282 .getUntrackedParameter<std::string>("@chosen_case");
0283 Exception ex(errors::Configuration);
0284 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0285 << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0286 << chosenLabel << " and that product is not transient. "
0287 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0288 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0289 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0290 addProductsToException(caseLabels, ex);
0291 throw ex;
0292 }
0293 }
0294 }
0295 }
0296 }
0297
0298 void reduceParameterSet(ParameterSet& proc_pset,
0299 vstring const& end_path_name_list,
0300 vstring& modulesInConfig,
0301 std::set<std::string> const& usedModuleLabels,
0302 std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0303
0304
0305
0306
0307
0308
0309
0310
0311
0312
0313
0314 vstring outputModuleLabels;
0315 std::string edmType;
0316 std::string const moduleEdmType("@module_edm_type");
0317 std::string const outputModule("OutputModule");
0318 std::string const edAnalyzer("EDAnalyzer");
0319 std::string const edFilter("EDFilter");
0320 std::string const edProducer("EDProducer");
0321
0322 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0323
0324
0325
0326 vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0327 std::set<std::string> modulesOnPaths;
0328 {
0329 std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0330 for (auto const& endPath : end_path_name_list) {
0331 noEndPaths.erase(endPath);
0332 }
0333 {
0334 vstring labels;
0335 for (auto const& path : noEndPaths) {
0336 labels = proc_pset.getParameter<vstring>(path);
0337 modulesOnPaths.insert(labels.begin(), labels.end());
0338 }
0339 }
0340 }
0341
0342
0343 std::vector<std::string> labelsToBeDropped;
0344 labelsToBeDropped.reserve(modulesInConfigSet.size());
0345 std::set_difference(modulesInConfigSet.begin(),
0346 modulesInConfigSet.end(),
0347 usedModuleLabels.begin(),
0348 usedModuleLabels.end(),
0349 std::back_inserter(labelsToBeDropped));
0350
0351 const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0352 for (auto const& modLabel : usedModuleLabels) {
0353
0354
0355 if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0356 edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0357 if (edmType == outputModule) {
0358 outputModuleLabels.push_back(modLabel);
0359 labelsToBeDropped.push_back(modLabel);
0360 }
0361 if (edmType == edAnalyzer) {
0362 if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0363 labelsToBeDropped.push_back(modLabel);
0364 }
0365 }
0366 }
0367 }
0368
0369 std::inplace_merge(
0370 labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0371
0372
0373 for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0374
0375
0376 vstring::iterator endAfterRemove =
0377 std::remove_if(modulesInConfig.begin(),
0378 modulesInConfig.end(),
0379 std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0380 modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0381 proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0382
0383
0384 vstring endPathsToBeDropped;
0385 vstring labels;
0386 for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0387 iEndPath != endEndPath;
0388 ++iEndPath) {
0389 labels = proc_pset.getParameter<vstring>(*iEndPath);
0390 vstring::iterator iSave = labels.begin();
0391 vstring::iterator iBegin = labels.begin();
0392
0393 for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0394 if (binary_search_string(labelsToBeDropped, *iLabel)) {
0395 if (binary_search_string(outputModuleLabels, *iLabel)) {
0396 outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0397 }
0398 } else {
0399 if (iSave != iLabel) {
0400 iSave->swap(*iLabel);
0401 }
0402 ++iSave;
0403 }
0404 }
0405 labels.erase(iSave, labels.end());
0406 if (labels.empty()) {
0407
0408 proc_pset.eraseSimpleParameter(*iEndPath);
0409 endPathsToBeDropped.push_back(*iEndPath);
0410 } else {
0411 proc_pset.addParameter<vstring>(*iEndPath, labels);
0412 }
0413 }
0414 sort_all(endPathsToBeDropped);
0415
0416
0417 endAfterRemove = std::remove_if(scheduledPaths.begin(),
0418 scheduledPaths.end(),
0419 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0420 scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0421 proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0422
0423
0424 vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0425 endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0426 scheduledEndPaths.end(),
0427 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0428 scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0429 proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0430 }
0431
0432 class RngEDConsumer : public EDConsumerBase {
0433 public:
0434 explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0435 Service<RandomNumberGenerator> rng;
0436 if (rng.isAvailable()) {
0437 rng->consumes(consumesCollector());
0438 for (auto const& consumesInfo : this->moduleConsumesInfos()) {
0439 typesConsumed.emplace(consumesInfo.type());
0440 }
0441 }
0442 }
0443 };
0444
0445 template <typename F>
0446 auto doCleanup(F&& iF) {
0447 auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0448 CMS_SA_ALLOW try { f(); } catch (...) {
0449 }
0450 if (iPtr) {
0451 iTask.doneWaiting(*iPtr);
0452 }
0453 };
0454 return wrapped;
0455 }
0456 }
0457
0458
0459 typedef std::vector<std::string> vstring;
0460
0461
0462
0463 Schedule::Schedule(ParameterSet& proc_pset,
0464 service::TriggerNamesService const& tns,
0465 SignallingProductRegistryFiller& preg,
0466 ExceptionToActionTable const& actions,
0467 std::shared_ptr<ActivityRegistry> areg,
0468 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0469 PreallocationConfiguration const& prealloc,
0470 ProcessContext const* processContext,
0471 ModuleTypeResolverMaker const* resolverMaker)
0472 :
0473 moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0474 resultsInserter_{
0475 tns.getTrigPaths().empty()
0476 ? std::shared_ptr<TriggerResultInserter>{}
0477 : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration, *moduleRegistry_)},
0478 all_output_communicators_(),
0479 preallocConfig_(prealloc),
0480 pathNames_(&tns.getTrigPaths()),
0481 endPathNames_(&tns.getEndPaths()),
0482 wantSummary_(tns.wantSummary()) {
0483 makePathStatusInserters(pathStatusInserters_,
0484 *pathNames_,
0485 prealloc,
0486 preg,
0487 areg,
0488 processConfiguration,
0489 *moduleRegistry_,
0490 std::string("PathStatusInserter"));
0491
0492 if (endPathNames_->size() > 1) {
0493 makePathStatusInserters(endPathStatusInserters_,
0494 *endPathNames_,
0495 prealloc,
0496 preg,
0497 areg,
0498 processConfiguration,
0499 *moduleRegistry_,
0500 std::string("EndPathStatusInserter"));
0501 }
0502
0503 assert(0 < prealloc.numberOfStreams());
0504 streamSchedules_.reserve(prealloc.numberOfStreams());
0505 for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0506 streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0507 pathStatusInserters_,
0508 endPathStatusInserters_,
0509 moduleRegistry(),
0510 proc_pset,
0511 tns,
0512 prealloc,
0513 preg,
0514 actions,
0515 areg,
0516 processConfiguration,
0517 StreamID{i},
0518 processContext));
0519 }
0520
0521 const std::string kTriggerResults("TriggerResults");
0522 std::vector<std::string> modulesToUse;
0523 modulesToUse.reserve(streamSchedules_[0]->allWorkersLumisAndEvents().size());
0524 for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0525 if (worker->description()->moduleLabel() != kTriggerResults) {
0526 modulesToUse.push_back(worker->description()->moduleLabel());
0527 }
0528 }
0529
0530 unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0531 if (nUnscheduledModules > 0) {
0532 std::vector<std::string> temp;
0533 temp.reserve(modulesToUse.size());
0534 auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0535 std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0536 std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0537 temp.swap(modulesToUse);
0538 }
0539
0540
0541 globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0542 pathStatusInserters_,
0543 endPathStatusInserters_,
0544 moduleRegistry(),
0545 modulesToUse,
0546 proc_pset,
0547 preg,
0548 prealloc,
0549 actions,
0550 areg,
0551 processConfiguration,
0552 processContext);
0553 }
0554
0555 void Schedule::finishSetup(ParameterSet& proc_pset,
0556 service::TriggerNamesService const& tns,
0557 SignallingProductRegistryFiller& preg,
0558 BranchIDListHelper& branchIDListHelper,
0559 ProcessBlockHelperBase& processBlockHelper,
0560 ThinnedAssociationsHelper& thinnedAssociationsHelper,
0561 std::shared_ptr<ActivityRegistry> areg,
0562 std::shared_ptr<ProcessConfiguration> processConfiguration,
0563 PreallocationConfiguration const& prealloc,
0564 ProcessContext const* processContext) {
0565
0566
0567 const std::string kTriggerResults("TriggerResults");
0568
0569 std::set<std::string> usedModuleLabels;
0570 for (auto const& worker : allWorkers()) {
0571 if (worker->description()->moduleLabel() != kTriggerResults) {
0572 usedModuleLabels.insert(worker->description()->moduleLabel());
0573 }
0574 }
0575 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0576 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0577 reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0578 {
0579 std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0580 detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0581 }
0582
0583
0584
0585 {
0586 auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
0587 if (not unsched.empty()) {
0588 std::set<std::string> unscheduledModules;
0589 std::transform(unsched.begin(),
0590 unsched.end(),
0591 std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0592 [](auto worker) { return worker->description()->moduleLabel(); });
0593 preg.setUnscheduledProducts(unscheduledModules);
0594 }
0595 }
0596
0597 processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0598 proc_pset.registerIt();
0599 processConfiguration->setParameterSetID(proc_pset.id());
0600 processConfiguration->setProcessConfigurationID();
0601
0602
0603
0604 size_t all_workers_count = allWorkers().size();
0605
0606 moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0607 auto comm = iHolder->createOutputModuleCommunicator();
0608 if (comm) {
0609 all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0610 }
0611 });
0612
0613 limitOutput(proc_pset, branchIDListHelper.branchIDLists());
0614
0615
0616
0617 assert(all_workers_count == allWorkers().size());
0618
0619 branchIDListHelper.updateFromRegistry(preg.registry());
0620
0621 moduleRegistry_->forAllModuleHolders([&preg, &thinnedAssociationsHelper](auto& iHolder) {
0622 iHolder->registerThinnedAssociations(preg.registry(), thinnedAssociationsHelper);
0623 });
0624
0625 processBlockHelper.updateForNewProcess(preg.registry(), processConfiguration->processName());
0626
0627
0628
0629 for (auto& c : all_output_communicators_) {
0630 c->selectProducts(preg.registry(), thinnedAssociationsHelper, processBlockHelper);
0631 }
0632
0633 for (auto& product : preg.productListUpdator()) {
0634 setIsMergeable(product.second);
0635 }
0636
0637 {
0638
0639 std::set<TypeID> productTypesConsumed;
0640 std::set<TypeID> elementTypesConsumed;
0641
0642 for (auto const& worker : allWorkers()) {
0643 for (auto const& consumesInfo : worker->moduleConsumesInfos()) {
0644 if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0645 productTypesConsumed.emplace(consumesInfo.type());
0646 } else {
0647 elementTypesConsumed.emplace(consumesInfo.type());
0648 }
0649 }
0650 }
0651
0652 { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0653 preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0654 }
0655
0656 for (auto& c : all_output_communicators_) {
0657 c->setEventSelectionInfo(outputModulePathPositions, preg.registry().anyProductProduced());
0658 }
0659
0660 if (wantSummary_) {
0661 std::vector<const ModuleDescription*> modDesc;
0662 const auto& workers = allWorkers();
0663 modDesc.reserve(workers.size());
0664
0665 std::transform(workers.begin(),
0666 workers.end(),
0667 std::back_inserter(modDesc),
0668 [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0669
0670
0671 summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0672 auto timeKeeperPtr = summaryTimeKeeper_.get();
0673
0674 areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0675
0676 areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0677 areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0678 areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0679 areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0680 areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0681 areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0682
0683 areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0684 areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0685
0686 areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0687 areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0688
0689 areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0690 areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0691
0692
0693
0694 }
0695
0696 }
0697
0698 void Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
0699 std::string const output("output");
0700
0701 ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0702 int maxEventSpecs = 0;
0703 int maxEventsOut = -1;
0704 ParameterSet const* vMaxEventsOut = nullptr;
0705 std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0706 if (search_all(intNamesE, output)) {
0707 maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0708 ++maxEventSpecs;
0709 }
0710 std::vector<std::string> psetNamesE;
0711 maxEventsPSet.getParameterSetNames(psetNamesE, false);
0712 if (search_all(psetNamesE, output)) {
0713 vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0714 ++maxEventSpecs;
0715 }
0716
0717 if (maxEventSpecs > 1) {
0718 throw Exception(errors::Configuration)
0719 << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0720 }
0721
0722 for (auto& c : all_output_communicators_) {
0723 OutputModuleDescription desc(branchIDLists, maxEventsOut);
0724 if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0725 std::string const& moduleLabel = c->description().moduleLabel();
0726 try {
0727 desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0728 } catch (Exception const&) {
0729 throw Exception(errors::Configuration)
0730 << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0731 }
0732 }
0733 c->configure(desc);
0734 }
0735 }
0736
0737 bool Schedule::terminate() const {
0738 if (all_output_communicators_.empty()) {
0739 return false;
0740 }
0741 for (auto& c : all_output_communicators_) {
0742 if (!c->limitReached()) {
0743
0744 return false;
0745 }
0746 }
0747 LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0748 << "has reached its configured limit.\n";
0749 return true;
0750 }
0751
0752 void Schedule::endJob(ExceptionCollector& collector) {
0753 globalSchedule_->endJob(collector, *moduleRegistry_);
0754 if (collector.hasThrown()) {
0755 return;
0756 }
0757
0758 if (wantSummary_) {
0759 try {
0760 convertException::wrap([this]() { sendFwkSummaryToMessageLogger(); });
0761 } catch (cms::Exception const& ex) {
0762 collector.addException(ex);
0763 }
0764 }
0765 }
0766
0767 void Schedule::sendFwkSummaryToMessageLogger() const {
0768
0769
0770 auto logForEach = [](auto const& iContainer, auto iMessage) {
0771 auto logger = LogFwkVerbatim("FwkSummary");
0772 int count = 0;
0773 for (auto const& element : iContainer) {
0774 iMessage(logger, element);
0775 if (++count == 10) {
0776 logger = LogFwkVerbatim("FwkSummary");
0777 count = 0;
0778 } else {
0779 logger << "\n";
0780 }
0781 }
0782 };
0783
0784 {
0785 TriggerReport tr;
0786 getTriggerReport(tr);
0787
0788
0789
0790 LogFwkVerbatim("FwkSummary") << "";
0791 LogFwkVerbatim("FwkSummary") << "TrigReport "
0792 << "---------- Event Summary ------------";
0793 if (!tr.trigPathSummaries.empty()) {
0794 LogFwkVerbatim("FwkSummary") << "TrigReport"
0795 << " Events total = " << tr.eventSummary.totalEvents
0796 << " passed = " << tr.eventSummary.totalEventsPassed
0797 << " failed = " << tr.eventSummary.totalEventsFailed << "";
0798 } else {
0799 LogFwkVerbatim("FwkSummary") << "TrigReport"
0800 << " Events total = " << tr.eventSummary.totalEvents
0801 << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0802 }
0803
0804 LogFwkVerbatim("FwkSummary") << "";
0805 LogFwkVerbatim("FwkSummary") << "TrigReport "
0806 << "---------- Path Summary ------------";
0807 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0808 << " " << std::right << std::setw(10) << "Executed"
0809 << " " << std::right << std::setw(10) << "Passed"
0810 << " " << std::right << std::setw(10) << "Failed"
0811 << " " << std::right << std::setw(10) << "Error"
0812 << " "
0813 << "Name"
0814 << "";
0815 logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
0816 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
0817 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0818 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0819 << p.timesExcept << " " << p.name;
0820 });
0821
0822
0823
0824
0825
0826
0827
0828
0829
0830
0831
0832
0833
0834
0835
0836
0837
0838
0839 LogFwkVerbatim("FwkSummary") << "\n"
0840 << "TrigReport "
0841 << "-------End-Path Summary ------------\n"
0842 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0843 << " " << std::right << std::setw(10) << "Executed"
0844 << " " << std::right << std::setw(10) << "Passed"
0845 << " " << std::right << std::setw(10) << "Failed"
0846 << " " << std::right << std::setw(10) << "Error"
0847 << " "
0848 << "Name";
0849 logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
0850 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
0851 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0852 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0853 << p.timesExcept << " " << p.name;
0854 });
0855
0856 for (auto const& p : tr.trigPathSummaries) {
0857 LogFwkVerbatim("FwkSummary") << "\n"
0858 << "TrigReport "
0859 << "---------- Modules in Path: " << p.name << " ------------\n"
0860 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0861 << " " << std::right << std::setw(10) << "Visited"
0862 << " " << std::right << std::setw(10) << "Passed"
0863 << " " << std::right << std::setw(10) << "Failed"
0864 << " " << std::right << std::setw(10) << "Error"
0865 << " "
0866 << "Name";
0867
0868 logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
0869 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
0870 << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0871 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0872 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0873 });
0874 }
0875
0876 for (auto const& p : tr.endPathSummaries) {
0877 LogFwkVerbatim("FwkSummary") << "\n"
0878 << "TrigReport "
0879 << "------ Modules in End-Path: " << p.name << " ------------\n"
0880 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0881 << " " << std::right << std::setw(10) << "Visited"
0882 << " " << std::right << std::setw(10) << "Passed"
0883 << " " << std::right << std::setw(10) << "Failed"
0884 << " " << std::right << std::setw(10) << "Error"
0885 << " "
0886 << "Name";
0887
0888 unsigned int bitpos = 0;
0889 logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
0890 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
0891 << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0892 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0893 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0894 ++bitpos;
0895 });
0896 }
0897
0898 LogFwkVerbatim("FwkSummary") << "\n"
0899 << "TrigReport "
0900 << "---------- Module Summary ------------\n"
0901 << "TrigReport " << std::right << std::setw(10) << "Visited"
0902 << " " << std::right << std::setw(10) << "Executed"
0903 << " " << std::right << std::setw(10) << "Passed"
0904 << " " << std::right << std::setw(10) << "Failed"
0905 << " " << std::right << std::setw(10) << "Error"
0906 << " "
0907 << "Name";
0908
0909 logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
0910 logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
0911 << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
0912 << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
0913 << worker.timesExcept << " " << worker.moduleLabel;
0914 });
0915 LogFwkVerbatim("FwkSummary") << "";
0916 }
0917
0918 TriggerTimingReport tr;
0919 getTriggerTimingReport(tr);
0920
0921 const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0922
0923 LogFwkVerbatim("FwkSummary") << "TimeReport "
0924 << "---------- Event Summary ---[sec]----";
0925 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0926 << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0927 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0928 << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0929 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0930 << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0931 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0932 << " efficiency CPU/Real/thread = "
0933 << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0934 preallocConfig_.numberOfThreads();
0935
0936 constexpr int kColumn1Size = 10;
0937 constexpr int kColumn2Size = 12;
0938 constexpr int kColumn3Size = 12;
0939 LogFwkVerbatim("FwkSummary") << "";
0940 LogFwkVerbatim("FwkSummary") << "TimeReport "
0941 << "---------- Path Summary ---[Real sec]----";
0942 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0943 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0944 << " Name";
0945 logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
0946 const int timesRun = std::max(1, p.timesRun);
0947 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0948 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0949 << " " << p.name;
0950 });
0951 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0952 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0953 << " Name";
0954
0955 LogFwkVerbatim("FwkSummary") << "\n"
0956 << "TimeReport "
0957 << "-------End-Path Summary ---[Real sec]----\n"
0958 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0959 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0960 << " Name";
0961 logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
0962 const int timesRun = std::max(1, p.timesRun);
0963
0964 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0965 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0966 << " " << p.name;
0967 });
0968 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0969 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0970 << " Name";
0971
0972 for (auto const& p : tr.trigPathSummaries) {
0973 LogFwkVerbatim("FwkSummary") << "\n"
0974 << "TimeReport "
0975 << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
0976 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0977 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0978 << " Name";
0979 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
0980 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0981 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
0982 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
0983 });
0984 }
0985 if (not tr.trigPathSummaries.empty()) {
0986 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0987 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0988 << " Name";
0989 }
0990 for (auto const& p : tr.endPathSummaries) {
0991 LogFwkVerbatim("FwkSummary") << "\n"
0992 << "TimeReport "
0993 << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
0994 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0995 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0996 << " Name";
0997 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
0998 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0999 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1000 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1001 });
1002 }
1003 if (not tr.endPathSummaries.empty()) {
1004 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1005 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1006 << " Name";
1007 }
1008 LogFwkVerbatim("FwkSummary") << "\n"
1009 << "TimeReport "
1010 << "---------- Module Summary ---[Real sec]----\n"
1011 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1012 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1013 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1014 << " Name";
1015 logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
1016 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1017 << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1018 << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
1019 << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel;
1020 });
1021 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1022 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1023 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1024 << " Name";
1025
1026 LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
1027 }
1028
1029 void Schedule::closeOutputFiles() {
1030 using std::placeholders::_1;
1031 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1032 for (auto& worker : allWorkers()) {
1033 worker->respondToCloseOutputFile();
1034 }
1035 }
1036
1037 void Schedule::openOutputFiles(FileBlock& fb) {
1038 using std::placeholders::_1;
1039 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1040 }
1041
1042 void Schedule::writeRunAsync(WaitingTaskHolder task,
1043 RunPrincipal const& rp,
1044 ProcessContext const* processContext,
1045 ActivityRegistry* activityRegistry,
1046 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1047 auto token = ServiceRegistry::instance().presentToken();
1048 GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1049 LuminosityBlockID(rp.run(), 0),
1050 rp.index(),
1051 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1052 rp.endTime(),
1053 processContext);
1054
1055 using namespace edm::waiting_task;
1056 chain::first([&](auto nextTask) {
1057
1058 ServiceRegistry::Operate op(token);
1059
1060
1061 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1062 }
1063 for (auto& c : all_output_communicators_) {
1064 c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1065 }
1066 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1067
1068 ServiceRegistry::Operate op(token);
1069
1070 activityRegistry->postGlobalWriteRunSignal_(globalContext);
1071 })) |
1072 chain::runLast(task);
1073 }
1074
1075 void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1076 ProcessBlockPrincipal const& pbp,
1077 ProcessContext const* processContext,
1078 ActivityRegistry* activityRegistry) {
1079 auto token = ServiceRegistry::instance().presentToken();
1080 GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1081 LuminosityBlockID(),
1082 RunIndex::invalidRunIndex(),
1083 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1084 Timestamp::invalidTimestamp(),
1085 processContext);
1086
1087 using namespace edm::waiting_task;
1088 chain::first([&](auto nextTask) {
1089
1090 ServiceRegistry::Operate op(token);
1091 CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1092 }
1093 for (auto& c : all_output_communicators_) {
1094 c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1095 }
1096 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1097
1098 ServiceRegistry::Operate op(token);
1099
1100 activityRegistry->postWriteProcessBlockSignal_(globalContext);
1101 })) |
1102 chain::runLast(std::move(task));
1103 }
1104
1105 void Schedule::writeLumiAsync(WaitingTaskHolder task,
1106 LuminosityBlockPrincipal const& lbp,
1107 ProcessContext const* processContext,
1108 ActivityRegistry* activityRegistry) {
1109 auto token = ServiceRegistry::instance().presentToken();
1110 GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1111 lbp.id(),
1112 lbp.runPrincipal().index(),
1113 lbp.index(),
1114 lbp.beginTime(),
1115 processContext);
1116
1117 using namespace edm::waiting_task;
1118 chain::first([&](auto nextTask) {
1119 ServiceRegistry::Operate op(token);
1120 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1121 }
1122 for (auto& c : all_output_communicators_) {
1123 c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1124 }
1125 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1126
1127 ServiceRegistry::Operate op(token);
1128
1129 activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1130 })) |
1131 chain::runLast(task);
1132 }
1133
1134 bool Schedule::shouldWeCloseOutput() const {
1135 using std::placeholders::_1;
1136
1137 return (std::find_if(all_output_communicators_.begin(),
1138 all_output_communicators_.end(),
1139 std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1140 all_output_communicators_.end());
1141 }
1142
1143 void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1144 using std::placeholders::_1;
1145 for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1146 }
1147
1148 void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1149 using std::placeholders::_1;
1150 for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1151 }
1152
1153 void Schedule::beginJob(ProductRegistry const& iRegistry,
1154 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
1155 ProcessBlockHelperBase const& processBlockHelperBase,
1156 std::string const& iProcessName) {
1157 finishModulesInitialization(*moduleRegistry_, iRegistry, iESIndices, processBlockHelperBase, iProcessName);
1158 globalSchedule_->beginJob(*moduleRegistry_);
1159 }
1160
1161 void Schedule::beginStream(unsigned int streamID) {
1162 assert(streamID < streamSchedules_.size());
1163 streamSchedules_[streamID]->beginStream(*moduleRegistry_);
1164 }
1165
1166 void Schedule::endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1167 assert(streamID < streamSchedules_.size());
1168 streamSchedules_[streamID]->endStream(*moduleRegistry_, collector, collectorMutex);
1169 }
1170
1171 void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1172 unsigned int iStreamID,
1173 EventTransitionInfo& info,
1174 ServiceToken const& token) {
1175 assert(iStreamID < streamSchedules_.size());
1176 streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1177 }
1178
1179 bool Schedule::changeModule(std::string const& iLabel,
1180 ParameterSet const& iPSet,
1181 const SignallingProductRegistryFiller& iRegistry,
1182 eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
1183 Worker* found = nullptr;
1184 for (auto const& worker : allWorkers()) {
1185 if (worker->description()->moduleLabel() == iLabel) {
1186 found = worker;
1187 break;
1188 }
1189 }
1190 if (nullptr == found) {
1191 return false;
1192 }
1193
1194 auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1195
1196 globalSchedule_->replaceModule(newMod, iLabel);
1197
1198 for (auto& s : streamSchedules_) {
1199 s->replaceModule(newMod, iLabel);
1200 }
1201
1202 {
1203
1204 auto const processBlockLookup = iRegistry.registry().productLookup(InProcess);
1205 auto const runLookup = iRegistry.registry().productLookup(InRun);
1206 auto const lumiLookup = iRegistry.registry().productLookup(InLumi);
1207 auto const eventLookup = iRegistry.registry().productLookup(InEvent);
1208 newMod->updateLookup(InProcess, *runLookup);
1209 newMod->updateLookup(InRun, *runLookup);
1210 newMod->updateLookup(InLumi, *lumiLookup);
1211 newMod->updateLookup(InEvent, *eventLookup);
1212 newMod->updateLookup(iIndices);
1213
1214 auto const& processName = newMod->moduleDescription().processName();
1215 auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1216 auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1217 auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1218 auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1219 newMod->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1220 newMod->resolvePutIndicies(InRun, runModuleToIndicies);
1221 newMod->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1222 newMod->resolvePutIndicies(InEvent, eventModuleToIndicies);
1223 }
1224
1225 return true;
1226 }
1227
1228 void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1229 globalSchedule_->deleteModule(iLabel);
1230 for (auto& stream : streamSchedules_) {
1231 stream->deleteModule(iLabel);
1232 }
1233 moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1234 }
1235
1236 void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1237 std::multimap<std::string, std::string> const& referencesToBranches,
1238 std::vector<std::string> const& modulesToSkip,
1239 edm::ProductRegistry const& preg) {
1240 for (auto& stream : streamSchedules_) {
1241 stream->initializeEarlyDelete(
1242 *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1243 }
1244 }
1245
1246 std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1247 std::vector<ModuleDescription const*> result;
1248 result.reserve(allWorkers().size());
1249
1250 for (auto const& worker : allWorkers()) {
1251 ModuleDescription const* p = worker->description();
1252 result.push_back(p);
1253 }
1254 return result;
1255 }
1256
1257 Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1258
1259 void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1260 moduleRegistry_->forAllModuleHolders([&](auto& iHolder) { iHolder->convertCurrentProcessAlias(processName); });
1261 }
1262
1263 void Schedule::releaseMemoryPostLookupSignal() {
1264 moduleRegistry_->forAllModuleHolders([&](auto& iHolder) { iHolder->releaseMemoryPostLookupSignal(); });
1265 }
1266
1267 void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1268 streamSchedules_[0]->availablePaths(oLabelsToFill);
1269 }
1270
1271 void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1272
1273 void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1274
1275 void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1276 streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1277 }
1278
1279 void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1280 std::vector<ModuleDescription const*>& descriptions,
1281 unsigned int hint) const {
1282 streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1283 }
1284
1285 void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1286 std::vector<ModuleDescription const*>& descriptions,
1287 unsigned int hint) const {
1288 streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1289 }
1290
1291 void Schedule::getTriggerReport(TriggerReport& rep) const {
1292 rep.eventSummary.totalEvents = 0;
1293 rep.eventSummary.totalEventsPassed = 0;
1294 rep.eventSummary.totalEventsFailed = 0;
1295 for (auto& s : streamSchedules_) {
1296 s->getTriggerReport(rep);
1297 }
1298 sort_all(rep.workerSummaries);
1299 }
1300
1301 void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1302 rep.eventSummary.totalEvents = 0;
1303 rep.eventSummary.cpuTime = 0.;
1304 rep.eventSummary.realTime = 0.;
1305 summaryTimeKeeper_->fillTriggerTimingReport(rep);
1306 }
1307
1308 int Schedule::totalEvents() const {
1309 int returnValue = 0;
1310 for (auto& s : streamSchedules_) {
1311 returnValue += s->totalEvents();
1312 }
1313 return returnValue;
1314 }
1315
1316 int Schedule::totalEventsPassed() const {
1317 int returnValue = 0;
1318 for (auto& s : streamSchedules_) {
1319 returnValue += s->totalEventsPassed();
1320 }
1321 return returnValue;
1322 }
1323
1324 int Schedule::totalEventsFailed() const {
1325 int returnValue = 0;
1326 for (auto& s : streamSchedules_) {
1327 returnValue += s->totalEventsFailed();
1328 }
1329 return returnValue;
1330 }
1331
1332 void Schedule::clearCounters() {
1333 for (auto& s : streamSchedules_) {
1334 s->clearCounters();
1335 }
1336 }
1337 }