File indexing completed on 2025-09-12 10:00:30
0001 #include "FWCore/Framework/interface/Schedule.h"
0002 #include "FWCore/Framework/src/ScheduleBuilder.h"
0003
0004 #include "DataFormats/Common/interface/setIsMergeable.h"
0005 #include "DataFormats/Common/interface/TriggerResults.h"
0006 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0007 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0008 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0009 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0010 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0011 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0012 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0013 #include "FWCore/Framework/interface/ConsumesCollector.h"
0014 #include "FWCore/Framework/interface/EDConsumerBase.h"
0015 #include "FWCore/Framework/src/OutputModuleDescription.h"
0016 #include "FWCore/Framework/interface/TriggerNamesService.h"
0017 #include "FWCore/Framework/src/TriggerReport.h"
0018 #include "FWCore/Framework/src/TriggerTimingReport.h"
0019 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0020 #include "FWCore/Framework/src/ModuleHolderFactory.h"
0021 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0022 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0023 #include "FWCore/Framework/interface/ModuleRegistry.h"
0024 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0025 #include "FWCore/Framework/src/TriggerResultInserter.h"
0026 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0027 #include "FWCore/Framework/src/PathStatusInserter.h"
0028 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0029 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0030 #include "FWCore/Framework/interface/ComponentDescription.h"
0031 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0032 #include "FWCore/Concurrency/interface/chain_first.h"
0033 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0034 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0035 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0036 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0037 #include "FWCore/ServiceRegistry/interface/ModuleConsumesInfo.h"
0038 #include "FWCore/Utilities/interface/Algorithms.h"
0039 #include "FWCore/Utilities/interface/ConvertException.h"
0040 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0041 #include "FWCore/Utilities/interface/TypeID.h"
0042 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0043
0044 #include <array>
0045 #include <algorithm>
0046 #include <cassert>
0047 #include <cstdlib>
0048 #include <functional>
0049 #include <iomanip>
0050 #include <list>
0051 #include <map>
0052 #include <set>
0053 #include <exception>
0054 #include <sstream>
0055
0056 #include "make_shared_noexcept_false.h"
0057 #include "processEDAliases.h"
0058
0059 namespace edm {
0060
0061 class ModuleMakerBase;
0062
0063 namespace {
0064 using std::placeholders::_1;
0065
0066 bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0067 return std::binary_search(v.begin(), v.end(), s);
0068 }
0069
0070 typedef std::vector<std::string> vstring;
0071
0072 void processSwitchProducers(ParameterSet const& proc_pset,
0073 std::string const& processName,
0074 SignallingProductRegistryFiller& preg) {
0075
0076 struct BranchesCases {
0077 BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0078 std::vector<BranchKey> chosenBranches;
0079 std::vector<std::string> caseLabels;
0080 };
0081 std::map<std::string, BranchesCases> switchMap;
0082 for (auto& prod : preg.productListUpdator()) {
0083 if (prod.second.isSwitchAlias()) {
0084 auto it = switchMap.find(prod.second.moduleLabel());
0085 if (it == switchMap.end()) {
0086 auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0087 auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0088 switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0089 assert(inserted.second);
0090 it = inserted.first;
0091 }
0092
0093 bool found = false;
0094 for (auto const& productIter : preg.registry().productList()) {
0095 BranchKey const& branchKey = productIter.first;
0096
0097
0098
0099
0100 if (branchKey.processName() != processName) {
0101 continue;
0102 }
0103
0104 ProductDescription const& desc = productIter.second;
0105 if (desc.branchType() == prod.second.branchType() and
0106 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0107 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0108 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0109 prod.second.setSwitchAliasForBranch(desc);
0110 if (!prod.second.transient()) {
0111 it->second.chosenBranches.push_back(prod.first);
0112 }
0113 found = true;
0114 }
0115 }
0116 if (not found) {
0117 Exception ex(errors::LogicError);
0118 ex << "Trying to find a ProductDescription to be aliased-for by SwitchProducer with\n"
0119 << " friendly class name = " << prod.second.friendlyClassName() << "\n"
0120 << " module label = " << prod.second.moduleLabel() << "\n"
0121 << " product instance name = " << prod.second.productInstanceName() << "\n"
0122 << " process name = " << processName
0123 << "\n\nbut did not find any. Please contact a framework developer.";
0124 ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0125 throw ex;
0126 }
0127 }
0128 }
0129 if (switchMap.empty())
0130 return;
0131
0132 for (auto& elem : switchMap) {
0133 std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0134 }
0135
0136 auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0137 std::map<std::string, std::vector<BranchKey>> caseBranches;
0138 for (auto const& item : preg.registry().productList()) {
0139 if (item.first.processName() != processName)
0140 continue;
0141
0142 if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0143 found != caseLabels.end()) {
0144 caseBranches[*found].push_back(item.first);
0145 }
0146 }
0147
0148 for (auto const& caseLabel : caseLabels) {
0149 ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0150 auto& branches = caseBranches[caseLabel];
0151 std::sort(branches.begin(), branches.end());
0152 for (auto const& branch : branches) {
0153 ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0154 }
0155 }
0156 };
0157
0158
0159
0160 std::vector<bool> foundBranches;
0161 for (auto const& switchItem : switchMap) {
0162 auto const& switchLabel = switchItem.first;
0163 auto const& chosenBranches = switchItem.second.chosenBranches;
0164 auto const& caseLabels = switchItem.second.caseLabels;
0165 foundBranches.resize(chosenBranches.size());
0166 for (auto const& caseLabel : caseLabels) {
0167 std::fill(foundBranches.begin(), foundBranches.end(), false);
0168 for (auto& nonConstItem : preg.productListUpdator()) {
0169 auto const& item = nonConstItem;
0170 if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0171
0172
0173 if (!item.second.transient()) {
0174 auto range = std::equal_range(chosenBranches.begin(),
0175 chosenBranches.end(),
0176 BranchKey(item.first.friendlyClassName(),
0177 switchLabel,
0178 item.first.productInstanceName(),
0179 item.first.processName()));
0180 if (range.first == range.second) {
0181 Exception ex(errors::Configuration);
0182 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0183 << item.first << " that is not produced by the chosen case "
0184 << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0185 .getUntrackedParameter<std::string>("@chosen_case")
0186 << " and that product is not transient. "
0187 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0188 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0189 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0190 addProductsToException(caseLabels, ex);
0191 throw ex;
0192 }
0193 assert(std::distance(range.first, range.second) == 1);
0194 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0195 }
0196
0197
0198
0199
0200
0201
0202
0203
0204
0205 nonConstItem.second.setTransient(true);
0206
0207
0208 auto const& bd = item.second;
0209 if (not bd.branchAliases().empty()) {
0210 auto ex = Exception(errors::UnimplementedFeature)
0211 << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0212 "aliases for SwitchProducer with label "
0213 << switchLabel << " for case " << caseLabel << ":";
0214 for (auto const& branchAlias : bd.branchAliases()) {
0215 ex << " " << branchAlias;
0216 }
0217 throw ex;
0218 }
0219 }
0220 }
0221
0222 for (size_t i = 0; i < chosenBranches.size(); i++) {
0223 if (not foundBranches[i]) {
0224 auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0225 .getUntrackedParameter<std::string>("@chosen_case");
0226 Exception ex(errors::Configuration);
0227 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0228 << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0229 << chosenLabel << " and that product is not transient. "
0230 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0231 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0232 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0233 addProductsToException(caseLabels, ex);
0234 throw ex;
0235 }
0236 }
0237 }
0238 }
0239 }
0240
0241 void reduceParameterSet(ParameterSet& proc_pset,
0242 vstring const& end_path_name_list,
0243 vstring& modulesInConfig,
0244 std::set<std::string> const& usedModuleLabels,
0245 std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0246
0247
0248
0249
0250
0251
0252
0253
0254
0255
0256
0257 vstring outputModuleLabels;
0258 std::string edmType;
0259 std::string const moduleEdmType("@module_edm_type");
0260 std::string const outputModule("OutputModule");
0261 std::string const edAnalyzer("EDAnalyzer");
0262 std::string const edFilter("EDFilter");
0263 std::string const edProducer("EDProducer");
0264
0265 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0266
0267
0268
0269 vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0270 std::set<std::string> modulesOnPaths;
0271 {
0272 std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0273 for (auto const& endPath : end_path_name_list) {
0274 noEndPaths.erase(endPath);
0275 }
0276 {
0277 vstring labels;
0278 for (auto const& path : noEndPaths) {
0279 labels = proc_pset.getParameter<vstring>(path);
0280 modulesOnPaths.insert(labels.begin(), labels.end());
0281 }
0282 }
0283 }
0284
0285
0286 std::vector<std::string> labelsToBeDropped;
0287 labelsToBeDropped.reserve(modulesInConfigSet.size());
0288 std::set_difference(modulesInConfigSet.begin(),
0289 modulesInConfigSet.end(),
0290 usedModuleLabels.begin(),
0291 usedModuleLabels.end(),
0292 std::back_inserter(labelsToBeDropped));
0293
0294 const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0295 for (auto const& modLabel : usedModuleLabels) {
0296
0297
0298 if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0299 edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0300 if (edmType == outputModule) {
0301 outputModuleLabels.push_back(modLabel);
0302 labelsToBeDropped.push_back(modLabel);
0303 }
0304 if (edmType == edAnalyzer) {
0305 if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0306 labelsToBeDropped.push_back(modLabel);
0307 }
0308 }
0309 }
0310 }
0311
0312 std::inplace_merge(
0313 labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0314
0315
0316 for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0317
0318
0319 vstring::iterator endAfterRemove =
0320 std::remove_if(modulesInConfig.begin(),
0321 modulesInConfig.end(),
0322 std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0323 modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0324 proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0325
0326
0327 vstring endPathsToBeDropped;
0328 vstring labels;
0329 for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0330 iEndPath != endEndPath;
0331 ++iEndPath) {
0332 labels = proc_pset.getParameter<vstring>(*iEndPath);
0333 vstring::iterator iSave = labels.begin();
0334 vstring::iterator iBegin = labels.begin();
0335
0336 for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0337 if (binary_search_string(labelsToBeDropped, *iLabel)) {
0338 if (binary_search_string(outputModuleLabels, *iLabel)) {
0339 outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0340 }
0341 } else {
0342 if (iSave != iLabel) {
0343 iSave->swap(*iLabel);
0344 }
0345 ++iSave;
0346 }
0347 }
0348 labels.erase(iSave, labels.end());
0349 if (labels.empty()) {
0350
0351 proc_pset.eraseSimpleParameter(*iEndPath);
0352 endPathsToBeDropped.push_back(*iEndPath);
0353 } else {
0354 proc_pset.addParameter<vstring>(*iEndPath, labels);
0355 }
0356 }
0357 sort_all(endPathsToBeDropped);
0358
0359
0360 endAfterRemove = std::remove_if(scheduledPaths.begin(),
0361 scheduledPaths.end(),
0362 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0363 scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0364 proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0365
0366
0367 vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0368 endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0369 scheduledEndPaths.end(),
0370 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0371 scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0372 proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0373 }
0374
0375 template <typename F>
0376 auto doCleanup(F&& iF) {
0377 auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0378 CMS_SA_ALLOW try { f(); } catch (...) {
0379 }
0380 if (iPtr) {
0381 iTask.doneWaiting(*iPtr);
0382 }
0383 };
0384 return wrapped;
0385 }
0386 }
0387
0388
0389 typedef std::vector<std::string> vstring;
0390
0391
0392
0393 Schedule::Schedule(ParameterSet& proc_pset,
0394 service::TriggerNamesService const& tns,
0395 SignallingProductRegistryFiller& preg,
0396 ExceptionToActionTable const& actions,
0397 std::shared_ptr<ActivityRegistry> areg,
0398 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0399 PreallocationConfiguration const& prealloc,
0400 ProcessContext const* processContext,
0401 ModuleTypeResolverMaker const* resolverMaker)
0402 :
0403 moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0404 all_output_communicators_(),
0405 preallocConfig_(prealloc),
0406 pathNames_(&tns.getTrigPaths()),
0407 endPathNames_(&tns.getEndPaths()),
0408 wantSummary_(tns.wantSummary()) {
0409 ScheduleBuilder builder(
0410 *moduleRegistry_, proc_pset, *pathNames_, *endPathNames_, prealloc, preg, *areg, processConfiguration);
0411 resultsInserter_ = std::move(builder.resultsInserter_);
0412 assert(builder.pathStatusInserters_.size() == builder.pathNameAndModules_.size());
0413 pathStatusInserters_ = std::move(builder.pathStatusInserters_);
0414 endPathStatusInserters_ = std::move(builder.endPathStatusInserters_);
0415
0416 std::vector<StreamSchedule::PathInfo> paths;
0417 paths.reserve(builder.pathNameAndModules_.size());
0418 unsigned int index = 0;
0419 for (auto& path : builder.pathNameAndModules_) {
0420 paths.emplace_back(path.first, std::move(path.second), get_underlying_safe(pathStatusInserters_[index]));
0421 ++index;
0422 }
0423 std::vector<StreamSchedule::EndPathInfo> endpaths;
0424 endpaths.reserve(builder.endpathNameAndModules_.size());
0425 if (builder.endpathNameAndModules_.size() == 1) {
0426 assert(endPathStatusInserters_.empty());
0427 auto& path = builder.endpathNameAndModules_.front();
0428 endpaths.emplace_back(path.first, std::move(path.second), std::shared_ptr<EndPathStatusInserter>());
0429 } else {
0430 assert(endPathStatusInserters_.size() == builder.endpathNameAndModules_.size());
0431 index = 0;
0432 for (auto& path : builder.endpathNameAndModules_) {
0433 endpaths.emplace_back(path.first, std::move(path.second), get_underlying_safe(endPathStatusInserters_[index]));
0434 ++index;
0435 }
0436 }
0437
0438 assert(0 < prealloc.numberOfStreams());
0439 streamSchedules_.reserve(prealloc.numberOfStreams());
0440 for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0441 streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(paths,
0442 endpaths,
0443 builder.unscheduledModules_,
0444 resultsInserter(),
0445 moduleRegistrySharedPtr(),
0446 actions,
0447 areg,
0448 StreamID{i},
0449 processContext));
0450 }
0451
0452 globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0453 pathStatusInserters_,
0454 endPathStatusInserters_,
0455 moduleRegistrySharedPtr(),
0456 builder.allNeededModules_,
0457 prealloc,
0458 actions,
0459 areg,
0460 processContext);
0461 }
0462
0463 void Schedule::finishSetup(ParameterSet& proc_pset,
0464 service::TriggerNamesService const& tns,
0465 SignallingProductRegistryFiller& preg,
0466 BranchIDListHelper& branchIDListHelper,
0467 ProcessBlockHelperBase& processBlockHelper,
0468 ThinnedAssociationsHelper& thinnedAssociationsHelper,
0469 std::shared_ptr<ActivityRegistry> areg,
0470 std::shared_ptr<ProcessConfiguration> processConfiguration,
0471 PreallocationConfiguration const& prealloc,
0472 ProcessContext const* processContext) {
0473
0474
0475 const std::string kTriggerResults("TriggerResults");
0476
0477 std::set<std::string> usedModuleLabels;
0478 moduleRegistry_->forAllModuleHolders([&usedModuleLabels, &kTriggerResults](maker::ModuleHolder* iHolder) {
0479 if (iHolder->moduleDescription().moduleLabel() != kTriggerResults) {
0480 usedModuleLabels.insert(iHolder->moduleDescription().moduleLabel());
0481 }
0482 });
0483 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0484 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0485 reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0486 {
0487 std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0488 detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0489 }
0490
0491
0492
0493 {
0494 auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
0495 if (not unsched.empty()) {
0496 std::set<std::string> unscheduledModules;
0497 std::transform(unsched.begin(),
0498 unsched.end(),
0499 std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0500 [](auto worker) { return worker->description()->moduleLabel(); });
0501 preg.setUnscheduledProducts(unscheduledModules);
0502 }
0503 }
0504
0505 processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0506 proc_pset.registerIt();
0507 processConfiguration->setParameterSetID(proc_pset.id());
0508 processConfiguration->setProcessConfigurationID();
0509
0510
0511
0512 size_t all_workers_count = moduleRegistry_->maxModuleID();
0513
0514 moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0515 auto comm = iHolder->createOutputModuleCommunicator();
0516 if (comm) {
0517 all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0518 }
0519 });
0520
0521 limitOutput(proc_pset, branchIDListHelper.branchIDLists());
0522
0523
0524
0525 assert(all_workers_count == moduleRegistry_->maxModuleID());
0526
0527 branchIDListHelper.updateFromRegistry(preg.registry());
0528
0529 moduleRegistry_->forAllModuleHolders([&preg, &thinnedAssociationsHelper](auto& iHolder) {
0530 iHolder->registerThinnedAssociations(preg.registry(), thinnedAssociationsHelper);
0531 });
0532
0533 processBlockHelper.updateForNewProcess(preg.registry(), processConfiguration->processName());
0534
0535
0536
0537 for (auto& c : all_output_communicators_) {
0538 c->selectProducts(preg.registry(), thinnedAssociationsHelper, processBlockHelper);
0539 }
0540
0541 for (auto& product : preg.productListUpdator()) {
0542 setIsMergeable(product.second);
0543 }
0544
0545 {
0546
0547 std::set<TypeID> productTypesConsumed;
0548 std::set<TypeID> elementTypesConsumed;
0549
0550 moduleRegistry_->forAllModuleHolders(
0551 [&productTypesConsumed, &elementTypesConsumed](maker::ModuleHolder* iHolder) {
0552 for (auto const& consumesInfo : iHolder->moduleConsumesInfos()) {
0553 if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0554 productTypesConsumed.emplace(consumesInfo.type());
0555 } else {
0556 elementTypesConsumed.emplace(consumesInfo.type());
0557 }
0558 }
0559 });
0560
0561 {
0562 Service<RandomNumberGenerator> rng;
0563 if (rng.isAvailable()) {
0564
0565 auto consumer = rng->consumer();
0566 if (consumer) {
0567 for (auto const& consumesInfo : consumer->moduleConsumesInfos()) {
0568 productTypesConsumed.emplace(consumesInfo.type());
0569 }
0570 }
0571 }
0572 }
0573 preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0574 }
0575
0576 for (auto& c : all_output_communicators_) {
0577 c->setEventSelectionInfo(outputModulePathPositions, preg.registry().anyProductProduced());
0578 }
0579
0580 if (wantSummary_) {
0581 std::vector<const ModuleDescription*> modDesc;
0582 const auto& workers = allWorkers();
0583 modDesc.reserve(workers.size());
0584
0585 std::transform(workers.begin(),
0586 workers.end(),
0587 std::back_inserter(modDesc),
0588 [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0589
0590
0591 summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0592 auto timeKeeperPtr = summaryTimeKeeper_.get();
0593
0594 areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0595
0596 areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0597 areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0598 areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0599 areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0600 areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0601 areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0602
0603 areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0604 areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0605
0606 areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0607 areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0608
0609 areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0610 areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0611
0612
0613
0614 }
0615
0616 }
0617
0618 void Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
0619 std::string const output("output");
0620
0621 ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0622 int maxEventSpecs = 0;
0623 int maxEventsOut = -1;
0624 ParameterSet const* vMaxEventsOut = nullptr;
0625 std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0626 if (search_all(intNamesE, output)) {
0627 maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0628 ++maxEventSpecs;
0629 }
0630 std::vector<std::string> psetNamesE;
0631 maxEventsPSet.getParameterSetNames(psetNamesE, false);
0632 if (search_all(psetNamesE, output)) {
0633 vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0634 ++maxEventSpecs;
0635 }
0636
0637 if (maxEventSpecs > 1) {
0638 throw Exception(errors::Configuration)
0639 << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0640 }
0641
0642 for (auto& c : all_output_communicators_) {
0643 OutputModuleDescription desc(branchIDLists, maxEventsOut);
0644 if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0645 std::string const& moduleLabel = c->description().moduleLabel();
0646 try {
0647 desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0648 } catch (Exception const&) {
0649 throw Exception(errors::Configuration)
0650 << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0651 }
0652 }
0653 c->configure(desc);
0654 }
0655 }
0656
0657 bool Schedule::terminate() const {
0658 if (all_output_communicators_.empty()) {
0659 return false;
0660 }
0661 for (auto& c : all_output_communicators_) {
0662 if (!c->limitReached()) {
0663
0664 return false;
0665 }
0666 }
0667 LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0668 << "has reached its configured limit.\n";
0669 return true;
0670 }
0671
0672 void Schedule::endJob(ExceptionCollector& collector) {
0673 globalSchedule_->endJob(collector, *moduleRegistry_);
0674 if (collector.hasThrown()) {
0675 return;
0676 }
0677
0678 if (wantSummary_) {
0679 try {
0680 convertException::wrap([this]() { sendFwkSummaryToMessageLogger(); });
0681 } catch (cms::Exception const& ex) {
0682 collector.addException(ex);
0683 }
0684 }
0685 }
0686
0687 void Schedule::sendFwkSummaryToMessageLogger() const {
0688
0689
0690 auto logForEach = [](auto const& iContainer, auto iMessage) {
0691 auto logger = LogFwkVerbatim("FwkSummary");
0692 int count = 0;
0693 for (auto const& element : iContainer) {
0694 iMessage(logger, element);
0695 if (++count == 10) {
0696 logger = LogFwkVerbatim("FwkSummary");
0697 count = 0;
0698 } else {
0699 logger << "\n";
0700 }
0701 }
0702 };
0703
0704 {
0705 TriggerReport tr;
0706 getTriggerReport(tr);
0707
0708
0709
0710 LogFwkVerbatim("FwkSummary") << "";
0711 LogFwkVerbatim("FwkSummary") << "TrigReport "
0712 << "---------- Event Summary ------------";
0713 if (!tr.trigPathSummaries.empty()) {
0714 LogFwkVerbatim("FwkSummary") << "TrigReport"
0715 << " Events total = " << tr.eventSummary.totalEvents
0716 << " passed = " << tr.eventSummary.totalEventsPassed
0717 << " failed = " << tr.eventSummary.totalEventsFailed << "";
0718 } else {
0719 LogFwkVerbatim("FwkSummary") << "TrigReport"
0720 << " Events total = " << tr.eventSummary.totalEvents
0721 << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0722 }
0723
0724 LogFwkVerbatim("FwkSummary") << "";
0725 LogFwkVerbatim("FwkSummary") << "TrigReport "
0726 << "---------- Path Summary ------------";
0727 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0728 << " " << std::right << std::setw(10) << "Executed"
0729 << " " << std::right << std::setw(10) << "Passed"
0730 << " " << std::right << std::setw(10) << "Failed"
0731 << " " << std::right << std::setw(10) << "Error"
0732 << " "
0733 << "Name"
0734 << "";
0735 logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
0736 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
0737 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0738 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0739 << p.timesExcept << " " << p.name;
0740 });
0741
0742
0743
0744
0745
0746
0747
0748
0749
0750
0751
0752
0753
0754
0755
0756
0757
0758
0759 LogFwkVerbatim("FwkSummary") << "\n"
0760 << "TrigReport "
0761 << "-------End-Path Summary ------------\n"
0762 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0763 << " " << std::right << std::setw(10) << "Executed"
0764 << " " << std::right << std::setw(10) << "Passed"
0765 << " " << std::right << std::setw(10) << "Failed"
0766 << " " << std::right << std::setw(10) << "Error"
0767 << " "
0768 << "Name";
0769 logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
0770 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
0771 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0772 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0773 << p.timesExcept << " " << p.name;
0774 });
0775
0776 for (auto const& p : tr.trigPathSummaries) {
0777 LogFwkVerbatim("FwkSummary") << "\n"
0778 << "TrigReport "
0779 << "---------- Modules in Path: " << p.name << " ------------\n"
0780 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0781 << " " << std::right << std::setw(10) << "Visited"
0782 << " " << std::right << std::setw(10) << "Passed"
0783 << " " << std::right << std::setw(10) << "Failed"
0784 << " " << std::right << std::setw(10) << "Error"
0785 << " "
0786 << "Name";
0787
0788 logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
0789 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
0790 << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0791 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0792 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0793 });
0794 }
0795
0796 for (auto const& p : tr.endPathSummaries) {
0797 LogFwkVerbatim("FwkSummary") << "\n"
0798 << "TrigReport "
0799 << "------ Modules in End-Path: " << p.name << " ------------\n"
0800 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0801 << " " << std::right << std::setw(10) << "Visited"
0802 << " " << std::right << std::setw(10) << "Passed"
0803 << " " << std::right << std::setw(10) << "Failed"
0804 << " " << std::right << std::setw(10) << "Error"
0805 << " "
0806 << "Name";
0807
0808 unsigned int bitpos = 0;
0809 logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
0810 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
0811 << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0812 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0813 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0814 ++bitpos;
0815 });
0816 }
0817
0818 LogFwkVerbatim("FwkSummary") << "\n"
0819 << "TrigReport "
0820 << "---------- Module Summary ------------\n"
0821 << "TrigReport " << std::right << std::setw(10) << "Visited"
0822 << " " << std::right << std::setw(10) << "Executed"
0823 << " " << std::right << std::setw(10) << "Passed"
0824 << " " << std::right << std::setw(10) << "Failed"
0825 << " " << std::right << std::setw(10) << "Error"
0826 << " "
0827 << "Name";
0828
0829 logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
0830 logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
0831 << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
0832 << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
0833 << worker.timesExcept << " " << worker.moduleLabel;
0834 });
0835 LogFwkVerbatim("FwkSummary") << "";
0836 }
0837
0838 TriggerTimingReport tr;
0839 getTriggerTimingReport(tr);
0840
0841 const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0842
0843 LogFwkVerbatim("FwkSummary") << "TimeReport "
0844 << "---------- Event Summary ---[sec]----";
0845 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0846 << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0847 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0848 << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0849 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0850 << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0851 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0852 << " efficiency CPU/Real/thread = "
0853 << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0854 preallocConfig_.numberOfThreads();
0855
0856 constexpr int kColumn1Size = 10;
0857 constexpr int kColumn2Size = 12;
0858 constexpr int kColumn3Size = 12;
0859 LogFwkVerbatim("FwkSummary") << "";
0860 LogFwkVerbatim("FwkSummary") << "TimeReport "
0861 << "---------- Path Summary ---[Real sec]----";
0862 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0863 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0864 << " Name";
0865 logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
0866 const int timesRun = std::max(1, p.timesRun);
0867 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0868 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0869 << " " << p.name;
0870 });
0871 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0872 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0873 << " Name";
0874
0875 LogFwkVerbatim("FwkSummary") << "\n"
0876 << "TimeReport "
0877 << "-------End-Path Summary ---[Real sec]----\n"
0878 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0879 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0880 << " Name";
0881 logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
0882 const int timesRun = std::max(1, p.timesRun);
0883
0884 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0885 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0886 << " " << p.name;
0887 });
0888 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0889 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0890 << " Name";
0891
0892 for (auto const& p : tr.trigPathSummaries) {
0893 LogFwkVerbatim("FwkSummary") << "\n"
0894 << "TimeReport "
0895 << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
0896 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0897 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0898 << " Name";
0899 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
0900 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0901 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
0902 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
0903 });
0904 }
0905 if (not tr.trigPathSummaries.empty()) {
0906 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0907 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0908 << " Name";
0909 }
0910 for (auto const& p : tr.endPathSummaries) {
0911 LogFwkVerbatim("FwkSummary") << "\n"
0912 << "TimeReport "
0913 << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
0914 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0915 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0916 << " Name";
0917 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
0918 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0919 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
0920 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
0921 });
0922 }
0923 if (not tr.endPathSummaries.empty()) {
0924 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0925 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0926 << " Name";
0927 }
0928 LogFwkVerbatim("FwkSummary") << "\n"
0929 << "TimeReport "
0930 << "---------- Module Summary ---[Real sec]----\n"
0931 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0932 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0933 << " " << std::right << std::setw(kColumn3Size) << "per visit"
0934 << " Name";
0935 logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
0936 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0937 << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
0938 << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
0939 << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel;
0940 });
0941 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0942 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0943 << " " << std::right << std::setw(kColumn3Size) << "per visit"
0944 << " Name";
0945
0946 LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
0947 }
0948
0949 void Schedule::closeOutputFiles() {
0950 using std::placeholders::_1;
0951 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
0952 moduleRegistry_->forAllModuleHolders([](maker::ModuleHolder* iHolder) { iHolder->respondToCloseOutputFile(); });
0953 }
0954
0955 void Schedule::openOutputFiles(FileBlock& fb) {
0956 using std::placeholders::_1;
0957 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
0958 }
0959
0960 void Schedule::writeRunAsync(WaitingTaskHolder task,
0961 RunPrincipal const& rp,
0962 ProcessContext const* processContext,
0963 ActivityRegistry* activityRegistry,
0964 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
0965 auto token = ServiceRegistry::instance().presentToken();
0966 GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
0967 LuminosityBlockID(rp.run(), 0),
0968 rp.index(),
0969 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
0970 rp.endTime(),
0971 processContext);
0972
0973 using namespace edm::waiting_task;
0974 chain::first([&](auto nextTask) {
0975
0976 ServiceRegistry::Operate op(token);
0977
0978
0979 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
0980 }
0981 for (auto& c : all_output_communicators_) {
0982 c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
0983 }
0984 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
0985
0986 ServiceRegistry::Operate op(token);
0987
0988 activityRegistry->postGlobalWriteRunSignal_(globalContext);
0989 })) |
0990 chain::runLast(task);
0991 }
0992
0993 void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
0994 ProcessBlockPrincipal const& pbp,
0995 ProcessContext const* processContext,
0996 ActivityRegistry* activityRegistry) {
0997 auto token = ServiceRegistry::instance().presentToken();
0998 GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
0999 LuminosityBlockID(),
1000 RunIndex::invalidRunIndex(),
1001 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1002 Timestamp::invalidTimestamp(),
1003 processContext);
1004
1005 using namespace edm::waiting_task;
1006 chain::first([&](auto nextTask) {
1007
1008 ServiceRegistry::Operate op(token);
1009 CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1010 }
1011 for (auto& c : all_output_communicators_) {
1012 c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1013 }
1014 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1015
1016 ServiceRegistry::Operate op(token);
1017
1018 activityRegistry->postWriteProcessBlockSignal_(globalContext);
1019 })) |
1020 chain::runLast(std::move(task));
1021 }
1022
1023 void Schedule::writeLumiAsync(WaitingTaskHolder task,
1024 LuminosityBlockPrincipal const& lbp,
1025 ProcessContext const* processContext,
1026 ActivityRegistry* activityRegistry) {
1027 auto token = ServiceRegistry::instance().presentToken();
1028 GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1029 lbp.id(),
1030 lbp.runPrincipal().index(),
1031 lbp.index(),
1032 lbp.beginTime(),
1033 processContext);
1034
1035 using namespace edm::waiting_task;
1036 chain::first([&](auto nextTask) {
1037 ServiceRegistry::Operate op(token);
1038 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1039 }
1040 for (auto& c : all_output_communicators_) {
1041 c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1042 }
1043 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1044
1045 ServiceRegistry::Operate op(token);
1046
1047 activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1048 })) |
1049 chain::runLast(task);
1050 }
1051
1052 bool Schedule::shouldWeCloseOutput() const {
1053 using std::placeholders::_1;
1054
1055 return (std::find_if(all_output_communicators_.begin(),
1056 all_output_communicators_.end(),
1057 std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1058 all_output_communicators_.end());
1059 }
1060
1061 void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1062 moduleRegistry_->forAllModuleHolders([&fb](maker::ModuleHolder* iHolder) { iHolder->respondToOpenInputFile(fb); });
1063 }
1064
1065 void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1066 moduleRegistry_->forAllModuleHolders([&fb](maker::ModuleHolder* iHolder) { iHolder->respondToCloseInputFile(fb); });
1067 }
1068
1069 void Schedule::beginJob(ProductRegistry const& iRegistry,
1070 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
1071 ProcessBlockHelperBase const& processBlockHelperBase,
1072 std::string const& iProcessName) {
1073 finishModulesInitialization(*moduleRegistry_, iRegistry, iESIndices, processBlockHelperBase, iProcessName);
1074 globalSchedule_->beginJob(*moduleRegistry_);
1075 }
1076
1077 void Schedule::beginStream(unsigned int streamID) {
1078 assert(streamID < streamSchedules_.size());
1079 streamSchedules_[streamID]->beginStream(*moduleRegistry_);
1080 }
1081
1082 void Schedule::endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1083 assert(streamID < streamSchedules_.size());
1084 streamSchedules_[streamID]->endStream(*moduleRegistry_, collector, collectorMutex);
1085 }
1086
1087 void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1088 unsigned int iStreamID,
1089 EventTransitionInfo& info,
1090 ServiceToken const& token) {
1091 assert(iStreamID < streamSchedules_.size());
1092 streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1093 }
1094
1095 bool Schedule::changeModule(std::string const& iLabel,
1096 ParameterSet const& iPSet,
1097 const SignallingProductRegistryFiller& iRegistry,
1098 eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
1099 Worker* found = nullptr;
1100 for (auto const& worker : allWorkers()) {
1101 if (worker->description()->moduleLabel() == iLabel) {
1102 found = worker;
1103 break;
1104 }
1105 }
1106 if (nullptr == found) {
1107 return false;
1108 }
1109
1110 auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1111
1112 globalSchedule_->replaceModule(newMod, iLabel);
1113
1114 for (auto& s : streamSchedules_) {
1115 s->replaceModule(newMod, iLabel);
1116 }
1117
1118 {
1119
1120 auto const processBlockLookup = iRegistry.registry().productLookup(InProcess);
1121 auto const runLookup = iRegistry.registry().productLookup(InRun);
1122 auto const lumiLookup = iRegistry.registry().productLookup(InLumi);
1123 auto const eventLookup = iRegistry.registry().productLookup(InEvent);
1124 newMod->updateLookup(InProcess, *runLookup);
1125 newMod->updateLookup(InRun, *runLookup);
1126 newMod->updateLookup(InLumi, *lumiLookup);
1127 newMod->updateLookup(InEvent, *eventLookup);
1128 newMod->updateLookup(iIndices);
1129
1130 auto const& processName = newMod->moduleDescription().processName();
1131 auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1132 auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1133 auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1134 auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1135 newMod->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1136 newMod->resolvePutIndicies(InRun, runModuleToIndicies);
1137 newMod->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1138 newMod->resolvePutIndicies(InEvent, eventModuleToIndicies);
1139 }
1140
1141 return true;
1142 }
1143
1144 void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1145 globalSchedule_->deleteModule(iLabel);
1146 for (auto& stream : streamSchedules_) {
1147 stream->deleteModule(iLabel);
1148 }
1149 moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1150 }
1151
1152 void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1153 std::multimap<std::string, std::string> const& referencesToBranches,
1154 std::vector<std::string> const& modulesToSkip,
1155 edm::ProductRegistry const& preg) {
1156 for (auto& stream : streamSchedules_) {
1157 stream->initializeEarlyDelete(*moduleRegistry_, branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1158 }
1159 }
1160
1161 std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1162 std::vector<ModuleDescription const*> result;
1163 result.reserve(moduleRegistry_->maxModuleID() + 1);
1164
1165 moduleRegistry_->forAllModuleHolders([&](auto const* iHolder) { result.push_back(&iHolder->moduleDescription()); });
1166 return result;
1167 }
1168 Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1169
1170 void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1171 moduleRegistry_->forAllModuleHolders([&](auto& iHolder) { iHolder->convertCurrentProcessAlias(processName); });
1172 }
1173
1174 void Schedule::releaseMemoryPostLookupSignal() {
1175 moduleRegistry_->forAllModuleHolders([&](auto& iHolder) { iHolder->releaseMemoryPostLookupSignal(); });
1176 }
1177
1178 void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1179 streamSchedules_[0]->availablePaths(oLabelsToFill);
1180 }
1181
1182 void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1183
1184 void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1185
1186 void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1187 streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1188 }
1189
1190 void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1191 std::vector<ModuleDescription const*>& descriptions,
1192 unsigned int hint) const {
1193 streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1194 }
1195
1196 void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1197 std::vector<ModuleDescription const*>& descriptions,
1198 unsigned int hint) const {
1199 streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1200 }
1201
1202 void Schedule::getTriggerReport(TriggerReport& rep) const {
1203 rep.eventSummary.totalEvents = 0;
1204 rep.eventSummary.totalEventsPassed = 0;
1205 rep.eventSummary.totalEventsFailed = 0;
1206 for (auto& s : streamSchedules_) {
1207 s->getTriggerReport(rep);
1208 }
1209 sort_all(rep.workerSummaries);
1210 }
1211
1212 void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1213 rep.eventSummary.totalEvents = 0;
1214 rep.eventSummary.cpuTime = 0.;
1215 rep.eventSummary.realTime = 0.;
1216 summaryTimeKeeper_->fillTriggerTimingReport(rep);
1217 }
1218
1219 int Schedule::totalEvents() const {
1220 int returnValue = 0;
1221 for (auto& s : streamSchedules_) {
1222 returnValue += s->totalEvents();
1223 }
1224 return returnValue;
1225 }
1226
1227 int Schedule::totalEventsPassed() const {
1228 int returnValue = 0;
1229 for (auto& s : streamSchedules_) {
1230 returnValue += s->totalEventsPassed();
1231 }
1232 return returnValue;
1233 }
1234
1235 int Schedule::totalEventsFailed() const {
1236 int returnValue = 0;
1237 for (auto& s : streamSchedules_) {
1238 returnValue += s->totalEventsFailed();
1239 }
1240 return returnValue;
1241 }
1242
1243 void Schedule::clearCounters() {
1244 for (auto& s : streamSchedules_) {
1245 s->clearCounters();
1246 }
1247 }
1248 }