File indexing completed on 2025-06-17 01:30:21
0001 #include "FWCore/Framework/interface/Schedule.h"
0002
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0011 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0012 #include "FWCore/Framework/interface/ConsumesCollector.h"
0013 #include "FWCore/Framework/interface/EDConsumerBase.h"
0014 #include "FWCore/Framework/src/OutputModuleDescription.h"
0015 #include "FWCore/Framework/interface/TriggerNamesService.h"
0016 #include "FWCore/Framework/src/TriggerReport.h"
0017 #include "FWCore/Framework/src/TriggerTimingReport.h"
0018 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0019 #include "FWCore/Framework/src/ModuleHolderFactory.h"
0020 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0021 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0022 #include "FWCore/Framework/interface/ModuleRegistry.h"
0023 #include "FWCore/Framework/src/TriggerResultInserter.h"
0024 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0025 #include "FWCore/Framework/src/PathStatusInserter.h"
0026 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0027 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0028 #include "FWCore/Framework/interface/ComponentDescription.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/chain_first.h"
0031 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0032 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0033 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0034 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0035 #include "FWCore/ServiceRegistry/interface/ModuleConsumesInfo.h"
0036 #include "FWCore/Utilities/interface/Algorithms.h"
0037 #include "FWCore/Utilities/interface/ConvertException.h"
0038 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0039 #include "FWCore/Utilities/interface/TypeID.h"
0040 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0041
0042 #include <array>
0043 #include <algorithm>
0044 #include <cassert>
0045 #include <cstdlib>
0046 #include <functional>
0047 #include <iomanip>
0048 #include <list>
0049 #include <map>
0050 #include <set>
0051 #include <exception>
0052 #include <sstream>
0053
0054 #include "make_shared_noexcept_false.h"
0055 #include "processEDAliases.h"
0056
0057 namespace edm {
0058
0059 class ModuleMakerBase;
0060
0061 namespace {
0062 using std::placeholders::_1;
0063
0064 bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0065 return std::binary_search(v.begin(), v.end(), s);
0066 }
0067
0068
0069
0070
0071 std::shared_ptr<TriggerResultInserter> makeInserter(
0072 ParameterSet& proc_pset,
0073 PreallocationConfiguration const& iPrealloc,
0074 SignallingProductRegistryFiller& preg,
0075 ExceptionToActionTable const& actions,
0076 std::shared_ptr<ActivityRegistry> areg,
0077 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0078 ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0079 trig_pset->registerIt();
0080
0081 WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0082 ModuleDescription md(trig_pset->id(),
0083 "TriggerResultInserter",
0084 "TriggerResults",
0085 processConfiguration.get(),
0086 ModuleDescription::getUniqueID());
0087
0088 areg->preModuleConstructionSignal_(md);
0089 bool postCalled = false;
0090 std::shared_ptr<TriggerResultInserter> returnValue;
0091
0092 CMS_SA_ALLOW try {
0093 auto module = make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams());
0094 maker::ModuleHolderT<TriggerResultInserter::ModuleType>::finishModuleInitialization(
0095 *module, md, iPrealloc, &preg);
0096 returnValue = module;
0097 postCalled = true;
0098
0099 areg->postModuleConstructionSignal_(md);
0100 } catch (...) {
0101 if (!postCalled) {
0102 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0103
0104 }
0105 }
0106 throw;
0107 }
0108 return returnValue;
0109 }
0110
0111 template <typename T>
0112 void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0113 std::vector<std::string> const& pathNames,
0114 PreallocationConfiguration const& iPrealloc,
0115 SignallingProductRegistryFiller& preg,
0116 std::shared_ptr<ActivityRegistry> areg,
0117 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0118 std::string const& moduleTypeName) {
0119 ParameterSet pset;
0120 pset.addParameter<std::string>("@module_type", moduleTypeName);
0121 pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0122 pset.registerIt();
0123
0124 pathStatusInserters.reserve(pathNames.size());
0125
0126 for (auto const& pathName : pathNames) {
0127 ModuleDescription md(
0128 pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0129
0130 areg->preModuleConstructionSignal_(md);
0131 bool postCalled = false;
0132
0133 CMS_SA_ALLOW try {
0134 auto module = make_shared_noexcept_false<T>(iPrealloc.numberOfStreams());
0135 maker::ModuleHolderT<typename T::ModuleType>::finishModuleInitialization(*module, md, iPrealloc, &preg);
0136 pathStatusInserters.emplace_back(module);
0137 postCalled = true;
0138
0139 areg->postModuleConstructionSignal_(md);
0140 } catch (...) {
0141 if (!postCalled) {
0142 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0143
0144 }
0145 }
0146 throw;
0147 }
0148 }
0149 }
0150
0151 typedef std::vector<std::string> vstring;
0152
0153 void processSwitchProducers(ParameterSet const& proc_pset,
0154 std::string const& processName,
0155 SignallingProductRegistryFiller& preg) {
0156
0157 struct BranchesCases {
0158 BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0159 std::vector<BranchKey> chosenBranches;
0160 std::vector<std::string> caseLabels;
0161 };
0162 std::map<std::string, BranchesCases> switchMap;
0163 for (auto& prod : preg.productListUpdator()) {
0164 if (prod.second.isSwitchAlias()) {
0165 auto it = switchMap.find(prod.second.moduleLabel());
0166 if (it == switchMap.end()) {
0167 auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0168 auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0169 switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0170 assert(inserted.second);
0171 it = inserted.first;
0172 }
0173
0174 bool found = false;
0175 for (auto const& productIter : preg.registry().productList()) {
0176 BranchKey const& branchKey = productIter.first;
0177
0178
0179
0180
0181 if (branchKey.processName() != processName) {
0182 continue;
0183 }
0184
0185 ProductDescription const& desc = productIter.second;
0186 if (desc.branchType() == prod.second.branchType() and
0187 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0188 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0189 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0190 prod.second.setSwitchAliasForBranch(desc);
0191 if (!prod.second.transient()) {
0192 it->second.chosenBranches.push_back(prod.first);
0193 }
0194 found = true;
0195 }
0196 }
0197 if (not found) {
0198 Exception ex(errors::LogicError);
0199 ex << "Trying to find a ProductDescription to be aliased-for by SwitchProducer with\n"
0200 << " friendly class name = " << prod.second.friendlyClassName() << "\n"
0201 << " module label = " << prod.second.moduleLabel() << "\n"
0202 << " product instance name = " << prod.second.productInstanceName() << "\n"
0203 << " process name = " << processName
0204 << "\n\nbut did not find any. Please contact a framework developer.";
0205 ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0206 throw ex;
0207 }
0208 }
0209 }
0210 if (switchMap.empty())
0211 return;
0212
0213 for (auto& elem : switchMap) {
0214 std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0215 }
0216
0217 auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0218 std::map<std::string, std::vector<BranchKey>> caseBranches;
0219 for (auto const& item : preg.registry().productList()) {
0220 if (item.first.processName() != processName)
0221 continue;
0222
0223 if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0224 found != caseLabels.end()) {
0225 caseBranches[*found].push_back(item.first);
0226 }
0227 }
0228
0229 for (auto const& caseLabel : caseLabels) {
0230 ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0231 auto& branches = caseBranches[caseLabel];
0232 std::sort(branches.begin(), branches.end());
0233 for (auto const& branch : branches) {
0234 ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0235 }
0236 }
0237 };
0238
0239
0240
0241 std::vector<bool> foundBranches;
0242 for (auto const& switchItem : switchMap) {
0243 auto const& switchLabel = switchItem.first;
0244 auto const& chosenBranches = switchItem.second.chosenBranches;
0245 auto const& caseLabels = switchItem.second.caseLabels;
0246 foundBranches.resize(chosenBranches.size());
0247 for (auto const& caseLabel : caseLabels) {
0248 std::fill(foundBranches.begin(), foundBranches.end(), false);
0249 for (auto& nonConstItem : preg.productListUpdator()) {
0250 auto const& item = nonConstItem;
0251 if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0252
0253
0254 if (!item.second.transient()) {
0255 auto range = std::equal_range(chosenBranches.begin(),
0256 chosenBranches.end(),
0257 BranchKey(item.first.friendlyClassName(),
0258 switchLabel,
0259 item.first.productInstanceName(),
0260 item.first.processName()));
0261 if (range.first == range.second) {
0262 Exception ex(errors::Configuration);
0263 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0264 << item.first << " that is not produced by the chosen case "
0265 << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0266 .getUntrackedParameter<std::string>("@chosen_case")
0267 << " and that product is not transient. "
0268 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0269 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0270 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0271 addProductsToException(caseLabels, ex);
0272 throw ex;
0273 }
0274 assert(std::distance(range.first, range.second) == 1);
0275 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0276 }
0277
0278
0279
0280
0281
0282
0283
0284
0285
0286 nonConstItem.second.setTransient(true);
0287
0288
0289 auto const& bd = item.second;
0290 if (not bd.branchAliases().empty()) {
0291 auto ex = Exception(errors::UnimplementedFeature)
0292 << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0293 "aliases for SwitchProducer with label "
0294 << switchLabel << " for case " << caseLabel << ":";
0295 for (auto const& branchAlias : bd.branchAliases()) {
0296 ex << " " << branchAlias;
0297 }
0298 throw ex;
0299 }
0300 }
0301 }
0302
0303 for (size_t i = 0; i < chosenBranches.size(); i++) {
0304 if (not foundBranches[i]) {
0305 auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0306 .getUntrackedParameter<std::string>("@chosen_case");
0307 Exception ex(errors::Configuration);
0308 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0309 << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0310 << chosenLabel << " and that product is not transient. "
0311 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0312 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0313 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0314 addProductsToException(caseLabels, ex);
0315 throw ex;
0316 }
0317 }
0318 }
0319 }
0320 }
0321
0322 void reduceParameterSet(ParameterSet& proc_pset,
0323 vstring const& end_path_name_list,
0324 vstring& modulesInConfig,
0325 std::set<std::string> const& usedModuleLabels,
0326 std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336
0337
0338 vstring outputModuleLabels;
0339 std::string edmType;
0340 std::string const moduleEdmType("@module_edm_type");
0341 std::string const outputModule("OutputModule");
0342 std::string const edAnalyzer("EDAnalyzer");
0343 std::string const edFilter("EDFilter");
0344 std::string const edProducer("EDProducer");
0345
0346 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0347
0348
0349
0350 vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0351 std::set<std::string> modulesOnPaths;
0352 {
0353 std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0354 for (auto const& endPath : end_path_name_list) {
0355 noEndPaths.erase(endPath);
0356 }
0357 {
0358 vstring labels;
0359 for (auto const& path : noEndPaths) {
0360 labels = proc_pset.getParameter<vstring>(path);
0361 modulesOnPaths.insert(labels.begin(), labels.end());
0362 }
0363 }
0364 }
0365
0366
0367 std::vector<std::string> labelsToBeDropped;
0368 labelsToBeDropped.reserve(modulesInConfigSet.size());
0369 std::set_difference(modulesInConfigSet.begin(),
0370 modulesInConfigSet.end(),
0371 usedModuleLabels.begin(),
0372 usedModuleLabels.end(),
0373 std::back_inserter(labelsToBeDropped));
0374
0375 const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0376 for (auto const& modLabel : usedModuleLabels) {
0377
0378
0379 if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0380 edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0381 if (edmType == outputModule) {
0382 outputModuleLabels.push_back(modLabel);
0383 labelsToBeDropped.push_back(modLabel);
0384 }
0385 if (edmType == edAnalyzer) {
0386 if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0387 labelsToBeDropped.push_back(modLabel);
0388 }
0389 }
0390 }
0391 }
0392
0393 std::inplace_merge(
0394 labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0395
0396
0397 for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0398
0399
0400 vstring::iterator endAfterRemove =
0401 std::remove_if(modulesInConfig.begin(),
0402 modulesInConfig.end(),
0403 std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0404 modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0405 proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0406
0407
0408 vstring endPathsToBeDropped;
0409 vstring labels;
0410 for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0411 iEndPath != endEndPath;
0412 ++iEndPath) {
0413 labels = proc_pset.getParameter<vstring>(*iEndPath);
0414 vstring::iterator iSave = labels.begin();
0415 vstring::iterator iBegin = labels.begin();
0416
0417 for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0418 if (binary_search_string(labelsToBeDropped, *iLabel)) {
0419 if (binary_search_string(outputModuleLabels, *iLabel)) {
0420 outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0421 }
0422 } else {
0423 if (iSave != iLabel) {
0424 iSave->swap(*iLabel);
0425 }
0426 ++iSave;
0427 }
0428 }
0429 labels.erase(iSave, labels.end());
0430 if (labels.empty()) {
0431
0432 proc_pset.eraseSimpleParameter(*iEndPath);
0433 endPathsToBeDropped.push_back(*iEndPath);
0434 } else {
0435 proc_pset.addParameter<vstring>(*iEndPath, labels);
0436 }
0437 }
0438 sort_all(endPathsToBeDropped);
0439
0440
0441 endAfterRemove = std::remove_if(scheduledPaths.begin(),
0442 scheduledPaths.end(),
0443 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0444 scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0445 proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0446
0447
0448 vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0449 endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0450 scheduledEndPaths.end(),
0451 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0452 scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0453 proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0454 }
0455
0456 class RngEDConsumer : public EDConsumerBase {
0457 public:
0458 explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0459 Service<RandomNumberGenerator> rng;
0460 if (rng.isAvailable()) {
0461 rng->consumes(consumesCollector());
0462 for (auto const& consumesInfo : this->moduleConsumesInfos()) {
0463 typesConsumed.emplace(consumesInfo.type());
0464 }
0465 }
0466 }
0467 };
0468
0469 template <typename F>
0470 auto doCleanup(F&& iF) {
0471 auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0472 CMS_SA_ALLOW try { f(); } catch (...) {
0473 }
0474 if (iPtr) {
0475 iTask.doneWaiting(*iPtr);
0476 }
0477 };
0478 return wrapped;
0479 }
0480 }
0481
0482
0483 typedef std::vector<std::string> vstring;
0484
0485
0486
0487 Schedule::Schedule(ParameterSet& proc_pset,
0488 service::TriggerNamesService const& tns,
0489 SignallingProductRegistryFiller& preg,
0490 ExceptionToActionTable const& actions,
0491 std::shared_ptr<ActivityRegistry> areg,
0492 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0493 PreallocationConfiguration const& prealloc,
0494 ProcessContext const* processContext,
0495 ModuleTypeResolverMaker const* resolverMaker)
0496 :
0497 resultsInserter_{tns.getTrigPaths().empty()
0498 ? std::shared_ptr<TriggerResultInserter>{}
0499 : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
0500 moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0501 all_output_communicators_(),
0502 preallocConfig_(prealloc),
0503 pathNames_(&tns.getTrigPaths()),
0504 endPathNames_(&tns.getEndPaths()),
0505 wantSummary_(tns.wantSummary()) {
0506 makePathStatusInserters(pathStatusInserters_,
0507 *pathNames_,
0508 prealloc,
0509 preg,
0510 areg,
0511 processConfiguration,
0512 std::string("PathStatusInserter"));
0513
0514 if (endPathNames_->size() > 1) {
0515 makePathStatusInserters(endPathStatusInserters_,
0516 *endPathNames_,
0517 prealloc,
0518 preg,
0519 areg,
0520 processConfiguration,
0521 std::string("EndPathStatusInserter"));
0522 }
0523
0524 assert(0 < prealloc.numberOfStreams());
0525 streamSchedules_.reserve(prealloc.numberOfStreams());
0526 for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0527 streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0528 pathStatusInserters_,
0529 endPathStatusInserters_,
0530 moduleRegistry(),
0531 proc_pset,
0532 tns,
0533 prealloc,
0534 preg,
0535 actions,
0536 areg,
0537 processConfiguration,
0538 StreamID{i},
0539 processContext));
0540 }
0541
0542
0543
0544 const std::string kTriggerResults("TriggerResults");
0545 std::vector<std::string> modulesToUse;
0546 modulesToUse.reserve(streamSchedules_[0]->allWorkersLumisAndEvents().size());
0547 for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0548 if (worker->description()->moduleLabel() != kTriggerResults) {
0549 modulesToUse.push_back(worker->description()->moduleLabel());
0550 }
0551 }
0552
0553 unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0554 if (nUnscheduledModules > 0) {
0555 std::vector<std::string> temp;
0556 temp.reserve(modulesToUse.size());
0557 auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0558 std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0559 std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0560 temp.swap(modulesToUse);
0561 }
0562
0563
0564 globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0565 pathStatusInserters_,
0566 endPathStatusInserters_,
0567 moduleRegistry(),
0568 modulesToUse,
0569 proc_pset,
0570 preg,
0571 prealloc,
0572 actions,
0573 areg,
0574 processConfiguration,
0575 processContext);
0576 }
0577
0578 void Schedule::finishSetup(ParameterSet& proc_pset,
0579 service::TriggerNamesService const& tns,
0580 SignallingProductRegistryFiller& preg,
0581 BranchIDListHelper& branchIDListHelper,
0582 ProcessBlockHelperBase& processBlockHelper,
0583 ThinnedAssociationsHelper& thinnedAssociationsHelper,
0584 std::shared_ptr<ActivityRegistry> areg,
0585 std::shared_ptr<ProcessConfiguration> processConfiguration,
0586 PreallocationConfiguration const& prealloc,
0587 ProcessContext const* processContext) {
0588
0589
0590 const std::string kTriggerResults("TriggerResults");
0591
0592 std::set<std::string> usedModuleLabels;
0593 for (auto const& worker : allWorkers()) {
0594 if (worker->description()->moduleLabel() != kTriggerResults) {
0595 usedModuleLabels.insert(worker->description()->moduleLabel());
0596 }
0597 }
0598 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0599 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0600 reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0601 {
0602 std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0603 detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0604 }
0605
0606
0607
0608 {
0609 auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
0610 if (not unsched.empty()) {
0611 std::set<std::string> unscheduledModules;
0612 std::transform(unsched.begin(),
0613 unsched.end(),
0614 std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0615 [](auto worker) { return worker->description()->moduleLabel(); });
0616 preg.setUnscheduledProducts(unscheduledModules);
0617 }
0618 }
0619
0620 processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0621 proc_pset.registerIt();
0622 processConfiguration->setParameterSetID(proc_pset.id());
0623 processConfiguration->setProcessConfigurationID();
0624
0625
0626
0627 size_t all_workers_count = allWorkers().size();
0628
0629 moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0630 auto comm = iHolder->createOutputModuleCommunicator();
0631 if (comm) {
0632 all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0633 }
0634 });
0635
0636 limitOutput(proc_pset, branchIDListHelper.branchIDLists());
0637
0638
0639
0640 assert(all_workers_count == allWorkers().size());
0641
0642 branchIDListHelper.updateFromRegistry(preg.registry());
0643
0644 for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0645 worker->registerThinnedAssociations(preg.registry(), thinnedAssociationsHelper);
0646 }
0647
0648 processBlockHelper.updateForNewProcess(preg.registry(), processConfiguration->processName());
0649
0650
0651
0652 for (auto& c : all_output_communicators_) {
0653 c->selectProducts(preg.registry(), thinnedAssociationsHelper, processBlockHelper);
0654 }
0655
0656 for (auto& product : preg.productListUpdator()) {
0657 setIsMergeable(product.second);
0658 }
0659
0660 {
0661
0662 std::set<TypeID> productTypesConsumed;
0663 std::set<TypeID> elementTypesConsumed;
0664
0665 for (auto const& worker : allWorkers()) {
0666 for (auto const& consumesInfo : worker->moduleConsumesInfos()) {
0667 if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0668 productTypesConsumed.emplace(consumesInfo.type());
0669 } else {
0670 elementTypesConsumed.emplace(consumesInfo.type());
0671 }
0672 }
0673 }
0674
0675 { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0676 preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0677 }
0678
0679 for (auto& c : all_output_communicators_) {
0680 c->setEventSelectionInfo(outputModulePathPositions, preg.registry().anyProductProduced());
0681 }
0682
0683 if (wantSummary_) {
0684 std::vector<const ModuleDescription*> modDesc;
0685 const auto& workers = allWorkers();
0686 modDesc.reserve(workers.size());
0687
0688 std::transform(workers.begin(),
0689 workers.end(),
0690 std::back_inserter(modDesc),
0691 [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0692
0693
0694 summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0695 auto timeKeeperPtr = summaryTimeKeeper_.get();
0696
0697 areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0698
0699 areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0700 areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0701 areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0702 areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0703 areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0704 areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0705
0706 areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0707 areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0708
0709 areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0710 areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0711
0712 areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0713 areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0714
0715
0716
0717 }
0718
0719 }
0720
0721 void Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
0722 std::string const output("output");
0723
0724 ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0725 int maxEventSpecs = 0;
0726 int maxEventsOut = -1;
0727 ParameterSet const* vMaxEventsOut = nullptr;
0728 std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0729 if (search_all(intNamesE, output)) {
0730 maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0731 ++maxEventSpecs;
0732 }
0733 std::vector<std::string> psetNamesE;
0734 maxEventsPSet.getParameterSetNames(psetNamesE, false);
0735 if (search_all(psetNamesE, output)) {
0736 vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0737 ++maxEventSpecs;
0738 }
0739
0740 if (maxEventSpecs > 1) {
0741 throw Exception(errors::Configuration)
0742 << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0743 }
0744
0745 for (auto& c : all_output_communicators_) {
0746 OutputModuleDescription desc(branchIDLists, maxEventsOut);
0747 if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0748 std::string const& moduleLabel = c->description().moduleLabel();
0749 try {
0750 desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0751 } catch (Exception const&) {
0752 throw Exception(errors::Configuration)
0753 << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0754 }
0755 }
0756 c->configure(desc);
0757 }
0758 }
0759
0760 bool Schedule::terminate() const {
0761 if (all_output_communicators_.empty()) {
0762 return false;
0763 }
0764 for (auto& c : all_output_communicators_) {
0765 if (!c->limitReached()) {
0766
0767 return false;
0768 }
0769 }
0770 LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0771 << "has reached its configured limit.\n";
0772 return true;
0773 }
0774
0775 void Schedule::endJob(ExceptionCollector& collector) {
0776 globalSchedule_->endJob(collector);
0777 if (collector.hasThrown()) {
0778 return;
0779 }
0780
0781 if (wantSummary_) {
0782 try {
0783 convertException::wrap([this]() { sendFwkSummaryToMessageLogger(); });
0784 } catch (cms::Exception const& ex) {
0785 collector.addException(ex);
0786 }
0787 }
0788 }
0789
0790 void Schedule::sendFwkSummaryToMessageLogger() const {
0791
0792
0793 auto logForEach = [](auto const& iContainer, auto iMessage) {
0794 auto logger = LogFwkVerbatim("FwkSummary");
0795 int count = 0;
0796 for (auto const& element : iContainer) {
0797 iMessage(logger, element);
0798 if (++count == 10) {
0799 logger = LogFwkVerbatim("FwkSummary");
0800 count = 0;
0801 } else {
0802 logger << "\n";
0803 }
0804 }
0805 };
0806
0807 {
0808 TriggerReport tr;
0809 getTriggerReport(tr);
0810
0811
0812
0813 LogFwkVerbatim("FwkSummary") << "";
0814 LogFwkVerbatim("FwkSummary") << "TrigReport "
0815 << "---------- Event Summary ------------";
0816 if (!tr.trigPathSummaries.empty()) {
0817 LogFwkVerbatim("FwkSummary") << "TrigReport"
0818 << " Events total = " << tr.eventSummary.totalEvents
0819 << " passed = " << tr.eventSummary.totalEventsPassed
0820 << " failed = " << tr.eventSummary.totalEventsFailed << "";
0821 } else {
0822 LogFwkVerbatim("FwkSummary") << "TrigReport"
0823 << " Events total = " << tr.eventSummary.totalEvents
0824 << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0825 }
0826
0827 LogFwkVerbatim("FwkSummary") << "";
0828 LogFwkVerbatim("FwkSummary") << "TrigReport "
0829 << "---------- Path Summary ------------";
0830 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0831 << " " << std::right << std::setw(10) << "Executed"
0832 << " " << std::right << std::setw(10) << "Passed"
0833 << " " << std::right << std::setw(10) << "Failed"
0834 << " " << std::right << std::setw(10) << "Error"
0835 << " "
0836 << "Name"
0837 << "";
0838 logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
0839 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
0840 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0841 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0842 << p.timesExcept << " " << p.name;
0843 });
0844
0845
0846
0847
0848
0849
0850
0851
0852
0853
0854
0855
0856
0857
0858
0859
0860
0861
0862 LogFwkVerbatim("FwkSummary") << "\n"
0863 << "TrigReport "
0864 << "-------End-Path Summary ------------\n"
0865 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0866 << " " << std::right << std::setw(10) << "Executed"
0867 << " " << std::right << std::setw(10) << "Passed"
0868 << " " << std::right << std::setw(10) << "Failed"
0869 << " " << std::right << std::setw(10) << "Error"
0870 << " "
0871 << "Name";
0872 logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
0873 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
0874 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0875 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0876 << p.timesExcept << " " << p.name;
0877 });
0878
0879 for (auto const& p : tr.trigPathSummaries) {
0880 LogFwkVerbatim("FwkSummary") << "\n"
0881 << "TrigReport "
0882 << "---------- Modules in Path: " << p.name << " ------------\n"
0883 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0884 << " " << std::right << std::setw(10) << "Visited"
0885 << " " << std::right << std::setw(10) << "Passed"
0886 << " " << std::right << std::setw(10) << "Failed"
0887 << " " << std::right << std::setw(10) << "Error"
0888 << " "
0889 << "Name";
0890
0891 logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
0892 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
0893 << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0894 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0895 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0896 });
0897 }
0898
0899 for (auto const& p : tr.endPathSummaries) {
0900 LogFwkVerbatim("FwkSummary") << "\n"
0901 << "TrigReport "
0902 << "------ Modules in End-Path: " << p.name << " ------------\n"
0903 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0904 << " " << std::right << std::setw(10) << "Visited"
0905 << " " << std::right << std::setw(10) << "Passed"
0906 << " " << std::right << std::setw(10) << "Failed"
0907 << " " << std::right << std::setw(10) << "Error"
0908 << " "
0909 << "Name";
0910
0911 unsigned int bitpos = 0;
0912 logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
0913 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
0914 << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0915 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0916 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0917 ++bitpos;
0918 });
0919 }
0920
0921 LogFwkVerbatim("FwkSummary") << "\n"
0922 << "TrigReport "
0923 << "---------- Module Summary ------------\n"
0924 << "TrigReport " << std::right << std::setw(10) << "Visited"
0925 << " " << std::right << std::setw(10) << "Executed"
0926 << " " << std::right << std::setw(10) << "Passed"
0927 << " " << std::right << std::setw(10) << "Failed"
0928 << " " << std::right << std::setw(10) << "Error"
0929 << " "
0930 << "Name";
0931
0932 logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
0933 logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
0934 << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
0935 << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
0936 << worker.timesExcept << " " << worker.moduleLabel;
0937 });
0938 LogFwkVerbatim("FwkSummary") << "";
0939 }
0940
0941 TriggerTimingReport tr;
0942 getTriggerTimingReport(tr);
0943
0944 const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0945
0946 LogFwkVerbatim("FwkSummary") << "TimeReport "
0947 << "---------- Event Summary ---[sec]----";
0948 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0949 << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0950 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0951 << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0952 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0953 << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0954 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0955 << " efficiency CPU/Real/thread = "
0956 << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0957 preallocConfig_.numberOfThreads();
0958
0959 constexpr int kColumn1Size = 10;
0960 constexpr int kColumn2Size = 12;
0961 constexpr int kColumn3Size = 12;
0962 LogFwkVerbatim("FwkSummary") << "";
0963 LogFwkVerbatim("FwkSummary") << "TimeReport "
0964 << "---------- Path Summary ---[Real sec]----";
0965 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0966 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0967 << " Name";
0968 logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
0969 const int timesRun = std::max(1, p.timesRun);
0970 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0971 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0972 << " " << p.name;
0973 });
0974 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0975 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0976 << " Name";
0977
0978 LogFwkVerbatim("FwkSummary") << "\n"
0979 << "TimeReport "
0980 << "-------End-Path Summary ---[Real sec]----\n"
0981 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0982 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0983 << " Name";
0984 logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
0985 const int timesRun = std::max(1, p.timesRun);
0986
0987 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0988 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0989 << " " << p.name;
0990 });
0991 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0992 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0993 << " Name";
0994
0995 for (auto const& p : tr.trigPathSummaries) {
0996 LogFwkVerbatim("FwkSummary") << "\n"
0997 << "TimeReport "
0998 << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
0999 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1000 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1001 << " Name";
1002 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1003 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1004 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1005 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1006 });
1007 }
1008 if (not tr.trigPathSummaries.empty()) {
1009 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1010 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1011 << " Name";
1012 }
1013 for (auto const& p : tr.endPathSummaries) {
1014 LogFwkVerbatim("FwkSummary") << "\n"
1015 << "TimeReport "
1016 << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
1017 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1018 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1019 << " Name";
1020 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1021 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1022 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1023 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1024 });
1025 }
1026 if (not tr.endPathSummaries.empty()) {
1027 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1028 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1029 << " Name";
1030 }
1031 LogFwkVerbatim("FwkSummary") << "\n"
1032 << "TimeReport "
1033 << "---------- Module Summary ---[Real sec]----\n"
1034 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1035 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1036 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1037 << " Name";
1038 logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
1039 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1040 << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1041 << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
1042 << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel;
1043 });
1044 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1045 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1046 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1047 << " Name";
1048
1049 LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
1050 }
1051
1052 void Schedule::closeOutputFiles() {
1053 using std::placeholders::_1;
1054 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1055 for (auto& worker : allWorkers()) {
1056 worker->respondToCloseOutputFile();
1057 }
1058 }
1059
1060 void Schedule::openOutputFiles(FileBlock& fb) {
1061 using std::placeholders::_1;
1062 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1063 }
1064
1065 void Schedule::writeRunAsync(WaitingTaskHolder task,
1066 RunPrincipal const& rp,
1067 ProcessContext const* processContext,
1068 ActivityRegistry* activityRegistry,
1069 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1070 auto token = ServiceRegistry::instance().presentToken();
1071 GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1072 LuminosityBlockID(rp.run(), 0),
1073 rp.index(),
1074 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1075 rp.endTime(),
1076 processContext);
1077
1078 using namespace edm::waiting_task;
1079 chain::first([&](auto nextTask) {
1080
1081 ServiceRegistry::Operate op(token);
1082
1083
1084 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1085 }
1086 for (auto& c : all_output_communicators_) {
1087 c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1088 }
1089 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1090
1091 ServiceRegistry::Operate op(token);
1092
1093 activityRegistry->postGlobalWriteRunSignal_(globalContext);
1094 })) |
1095 chain::runLast(task);
1096 }
1097
1098 void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1099 ProcessBlockPrincipal const& pbp,
1100 ProcessContext const* processContext,
1101 ActivityRegistry* activityRegistry) {
1102 auto token = ServiceRegistry::instance().presentToken();
1103 GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1104 LuminosityBlockID(),
1105 RunIndex::invalidRunIndex(),
1106 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1107 Timestamp::invalidTimestamp(),
1108 processContext);
1109
1110 using namespace edm::waiting_task;
1111 chain::first([&](auto nextTask) {
1112
1113 ServiceRegistry::Operate op(token);
1114 CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1115 }
1116 for (auto& c : all_output_communicators_) {
1117 c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1118 }
1119 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1120
1121 ServiceRegistry::Operate op(token);
1122
1123 activityRegistry->postWriteProcessBlockSignal_(globalContext);
1124 })) |
1125 chain::runLast(std::move(task));
1126 }
1127
1128 void Schedule::writeLumiAsync(WaitingTaskHolder task,
1129 LuminosityBlockPrincipal const& lbp,
1130 ProcessContext const* processContext,
1131 ActivityRegistry* activityRegistry) {
1132 auto token = ServiceRegistry::instance().presentToken();
1133 GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1134 lbp.id(),
1135 lbp.runPrincipal().index(),
1136 lbp.index(),
1137 lbp.beginTime(),
1138 processContext);
1139
1140 using namespace edm::waiting_task;
1141 chain::first([&](auto nextTask) {
1142 ServiceRegistry::Operate op(token);
1143 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1144 }
1145 for (auto& c : all_output_communicators_) {
1146 c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1147 }
1148 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1149
1150 ServiceRegistry::Operate op(token);
1151
1152 activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1153 })) |
1154 chain::runLast(task);
1155 }
1156
1157 bool Schedule::shouldWeCloseOutput() const {
1158 using std::placeholders::_1;
1159
1160 return (std::find_if(all_output_communicators_.begin(),
1161 all_output_communicators_.end(),
1162 std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1163 all_output_communicators_.end());
1164 }
1165
1166 void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1167 using std::placeholders::_1;
1168 for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1169 }
1170
1171 void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1172 using std::placeholders::_1;
1173 for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1174 }
1175
1176 void Schedule::beginJob(ProductRegistry const& iRegistry,
1177 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
1178 ProcessBlockHelperBase const& processBlockHelperBase,
1179 ProcessContext const& processContext) {
1180 globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase, processContext);
1181 }
1182
1183 void Schedule::beginStream(unsigned int streamID) {
1184 assert(streamID < streamSchedules_.size());
1185 streamSchedules_[streamID]->beginStream();
1186 }
1187
1188 void Schedule::endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1189 assert(streamID < streamSchedules_.size());
1190 streamSchedules_[streamID]->endStream(collector, collectorMutex);
1191 }
1192
1193 void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1194 unsigned int iStreamID,
1195 EventTransitionInfo& info,
1196 ServiceToken const& token) {
1197 assert(iStreamID < streamSchedules_.size());
1198 streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1199 }
1200
1201 bool Schedule::changeModule(std::string const& iLabel,
1202 ParameterSet const& iPSet,
1203 const SignallingProductRegistryFiller& iRegistry,
1204 eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
1205 Worker* found = nullptr;
1206 for (auto const& worker : allWorkers()) {
1207 if (worker->description()->moduleLabel() == iLabel) {
1208 found = worker;
1209 break;
1210 }
1211 }
1212 if (nullptr == found) {
1213 return false;
1214 }
1215
1216 auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1217
1218 globalSchedule_->replaceModule(newMod, iLabel);
1219
1220 for (auto& s : streamSchedules_) {
1221 s->replaceModule(newMod, iLabel);
1222 }
1223
1224 {
1225
1226 auto const processBlockLookup = iRegistry.registry().productLookup(InProcess);
1227 auto const runLookup = iRegistry.registry().productLookup(InRun);
1228 auto const lumiLookup = iRegistry.registry().productLookup(InLumi);
1229 auto const eventLookup = iRegistry.registry().productLookup(InEvent);
1230 found->updateLookup(InProcess, *runLookup);
1231 found->updateLookup(InRun, *runLookup);
1232 found->updateLookup(InLumi, *lumiLookup);
1233 found->updateLookup(InEvent, *eventLookup);
1234 found->updateLookup(iIndices);
1235
1236 auto const& processName = newMod->moduleDescription().processName();
1237 auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1238 auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1239 auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1240 auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1241 found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1242 found->resolvePutIndicies(InRun, runModuleToIndicies);
1243 found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1244 found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1245 }
1246
1247 return true;
1248 }
1249
1250 void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1251 globalSchedule_->deleteModule(iLabel);
1252 for (auto& stream : streamSchedules_) {
1253 stream->deleteModule(iLabel);
1254 }
1255 moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1256 }
1257
1258 void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1259 std::multimap<std::string, std::string> const& referencesToBranches,
1260 std::vector<std::string> const& modulesToSkip,
1261 edm::ProductRegistry const& preg) {
1262 for (auto& stream : streamSchedules_) {
1263 stream->initializeEarlyDelete(
1264 *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1265 }
1266 }
1267
1268 std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1269 std::vector<ModuleDescription const*> result;
1270 result.reserve(allWorkers().size());
1271
1272 for (auto const& worker : allWorkers()) {
1273 ModuleDescription const* p = worker->description();
1274 result.push_back(p);
1275 }
1276 return result;
1277 }
1278
1279 Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1280
1281 void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1282 for (auto const& worker : allWorkers()) {
1283 worker->convertCurrentProcessAlias(processName);
1284 }
1285 }
1286
1287 void Schedule::releaseMemoryPostLookupSignal() { globalSchedule_->releaseMemoryPostLookupSignal(); }
1288
1289 void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1290 streamSchedules_[0]->availablePaths(oLabelsToFill);
1291 }
1292
1293 void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1294
1295 void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1296
1297 void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1298 streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1299 }
1300
1301 void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1302 std::vector<ModuleDescription const*>& descriptions,
1303 unsigned int hint) const {
1304 streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1305 }
1306
1307 void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1308 std::vector<ModuleDescription const*>& descriptions,
1309 unsigned int hint) const {
1310 streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1311 }
1312
1313 void Schedule::getTriggerReport(TriggerReport& rep) const {
1314 rep.eventSummary.totalEvents = 0;
1315 rep.eventSummary.totalEventsPassed = 0;
1316 rep.eventSummary.totalEventsFailed = 0;
1317 for (auto& s : streamSchedules_) {
1318 s->getTriggerReport(rep);
1319 }
1320 sort_all(rep.workerSummaries);
1321 }
1322
1323 void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1324 rep.eventSummary.totalEvents = 0;
1325 rep.eventSummary.cpuTime = 0.;
1326 rep.eventSummary.realTime = 0.;
1327 summaryTimeKeeper_->fillTriggerTimingReport(rep);
1328 }
1329
1330 int Schedule::totalEvents() const {
1331 int returnValue = 0;
1332 for (auto& s : streamSchedules_) {
1333 returnValue += s->totalEvents();
1334 }
1335 return returnValue;
1336 }
1337
1338 int Schedule::totalEventsPassed() const {
1339 int returnValue = 0;
1340 for (auto& s : streamSchedules_) {
1341 returnValue += s->totalEventsPassed();
1342 }
1343 return returnValue;
1344 }
1345
1346 int Schedule::totalEventsFailed() const {
1347 int returnValue = 0;
1348 for (auto& s : streamSchedules_) {
1349 returnValue += s->totalEventsFailed();
1350 }
1351 return returnValue;
1352 }
1353
1354 void Schedule::clearCounters() {
1355 for (auto& s : streamSchedules_) {
1356 s->clearCounters();
1357 }
1358 }
1359 }