File indexing completed on 2022-06-03 00:59:03
0001 #include "FWCore/Framework/interface/Schedule.h"
0002
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0011 #include "FWCore/Framework/interface/EDConsumerBase.h"
0012 #include "FWCore/Framework/src/OutputModuleDescription.h"
0013 #include "FWCore/Framework/interface/SubProcess.h"
0014 #include "FWCore/Framework/interface/TriggerNamesService.h"
0015 #include "FWCore/Framework/src/TriggerReport.h"
0016 #include "FWCore/Framework/src/TriggerTimingReport.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/src/Factory.h"
0019 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0020 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/Framework/src/TriggerResultInserter.h"
0023 #include "FWCore/Framework/src/PathStatusInserter.h"
0024 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0025 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0026 #include "FWCore/Concurrency/interface/chain_first.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0031 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0032 #include "FWCore/Utilities/interface/Algorithms.h"
0033 #include "FWCore/Utilities/interface/ConvertException.h"
0034 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0035 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0036 #include "FWCore/Utilities/interface/TypeID.h"
0037 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0038
0039 #include <array>
0040 #include <algorithm>
0041 #include <cassert>
0042 #include <cstdlib>
0043 #include <functional>
0044 #include <iomanip>
0045 #include <list>
0046 #include <map>
0047 #include <set>
0048 #include <exception>
0049 #include <sstream>
0050
0051 #include "make_shared_noexcept_false.h"
0052 #include "processEDAliases.h"
0053
0054 namespace edm {
0055
0056 class Maker;
0057
0058 namespace {
0059 using std::placeholders::_1;
0060
// Reports whether `s` occurs in `v`.
// `v` must already be sorted in ascending order; the lookup is a binary search.
bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
  auto const candidate = std::lower_bound(v.begin(), v.end(), s);
  // lower_bound yields the first element that is not less than `s`;
  // it is a match only when `s` is not less than that element either.
  return candidate != v.end() and not(s < *candidate);
}
0064
0065
0066
0067
0068 std::shared_ptr<TriggerResultInserter> makeInserter(ParameterSet& proc_pset,
0069 PreallocationConfiguration const& iPrealloc,
0070 ProductRegistry& preg,
0071 ExceptionToActionTable const& actions,
0072 std::shared_ptr<ActivityRegistry> areg,
0073 std::shared_ptr<ProcessConfiguration> processConfiguration) {
0074 ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0075 trig_pset->registerIt();
0076
0077 WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0078 ModuleDescription md(trig_pset->id(),
0079 "TriggerResultInserter",
0080 "TriggerResults",
0081 processConfiguration.get(),
0082 ModuleDescription::getUniqueID());
0083
0084 areg->preModuleConstructionSignal_(md);
0085 bool postCalled = false;
0086 std::shared_ptr<TriggerResultInserter> returnValue;
0087
0088 CMS_SA_ALLOW try {
0089 maker::ModuleHolderT<TriggerResultInserter> holder(
0090 make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
0091 static_cast<Maker const*>(nullptr));
0092 holder.setModuleDescription(md);
0093 holder.registerProductsAndCallbacks(&preg);
0094 returnValue = holder.module();
0095 postCalled = true;
0096
0097 areg->postModuleConstructionSignal_(md);
0098 } catch (...) {
0099 if (!postCalled) {
0100 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0101
0102 }
0103 }
0104 throw;
0105 }
0106 return returnValue;
0107 }
0108
0109 template <typename T>
0110 void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0111 std::vector<std::string> const& pathNames,
0112 PreallocationConfiguration const& iPrealloc,
0113 ProductRegistry& preg,
0114 std::shared_ptr<ActivityRegistry> areg,
0115 std::shared_ptr<ProcessConfiguration> processConfiguration,
0116 std::string const& moduleTypeName) {
0117 ParameterSet pset;
0118 pset.addParameter<std::string>("@module_type", moduleTypeName);
0119 pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0120 pset.registerIt();
0121
0122 pathStatusInserters.reserve(pathNames.size());
0123
0124 for (auto const& pathName : pathNames) {
0125 ModuleDescription md(
0126 pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0127
0128 areg->preModuleConstructionSignal_(md);
0129 bool postCalled = false;
0130
0131 CMS_SA_ALLOW try {
0132 maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
0133 static_cast<Maker const*>(nullptr));
0134 holder.setModuleDescription(md);
0135 holder.registerProductsAndCallbacks(&preg);
0136 pathStatusInserters.emplace_back(holder.module());
0137 postCalled = true;
0138
0139 areg->postModuleConstructionSignal_(md);
0140 } catch (...) {
0141 if (!postCalled) {
0142 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0143
0144 }
0145 }
0146 throw;
0147 }
0148 }
0149 }
0150
0151 typedef std::vector<std::string> vstring;
0152
// Resolves and validates the SwitchProducer aliases found in `preg`.
//
// Phase 1: every product flagged isSwitchAlias() is matched against the products of
// `processName`. For each match setSwitchAliasForBranch() is called on the alias and
// the alias' BranchKey is recorded under its SwitchProducer label. A LogicError
// exception is thrown when no match is found.
//
// Phase 2: for every SwitchProducer and every label in its "@all_cases" list, each
// product of that case in `processName` is marked transient and checked against the
// recorded alias branches. A Configuration exception is thrown when a case produces
// something that has no alias branch, or when an alias branch is not produced by a
// case. An UnimplementedFeature exception is thrown when a case product carries
// ROOT branch aliases.
void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
  // Per-SwitchProducer bookkeeping: the alias branches recorded in phase 1 and the
  // case labels read from the "@all_cases" parameter.
  struct BranchesCases {
    BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
    std::vector<BranchKey> chosenBranches;
    std::vector<std::string> caseLabels;
  };
  // Keyed by the SwitchProducer's module label.
  std::map<std::string, BranchesCases> switchMap;
  for (auto& prod : preg.productListUpdator()) {
    if (prod.second.isSwitchAlias()) {
      // Create the map entry the first time this SwitchProducer label is seen.
      auto it = switchMap.find(prod.second.moduleLabel());
      if (it == switchMap.end()) {
        auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
        auto inserted = switchMap.emplace(prod.second.moduleLabel(),
                                          switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
        assert(inserted.second);
        it = inserted.first;
      }

      bool found = false;
      for (auto const& productIter : preg.productList()) {
        BranchKey const& branchKey = productIter.first;
        // Only products of the given process are candidates.
        if (branchKey.processName() != processName) {
          continue;
        }

        // A candidate matches when branch type, unwrapped type and product instance name
        // agree and its module label equals the alias' switchAliasModuleLabel().
        BranchDescription const& desc = productIter.second;
        if (desc.branchType() == prod.second.branchType() and
            desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
            branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
            branchKey.productInstanceName() == prod.second.productInstanceName()) {
          prod.second.setSwitchAliasForBranch(desc);
          it->second.chosenBranches.push_back(prod.first);
          found = true;
        }
      }
      if (not found) {
        Exception ex(errors::LogicError);
        ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
           << " friendly class name = " << prod.second.friendlyClassName() << "\n"
           << " module label = " << prod.second.moduleLabel() << "\n"
           << " product instance name = " << prod.second.productInstanceName() << "\n"
           << " process name = " << processName
           << "\n\nbut did not find any. Please contact a framework developer.";
        ex.addContext("Calling Schedule.cc:processSwitchProducers()");
        throw ex;
      }
    }
  }
  // Nothing to validate when the registry holds no switch aliases.
  if (switchMap.empty())
    return;

  // Sort so that phase 2 can look branches up with std::equal_range.
  for (auto& elem : switchMap) {
    std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
  }

  // Appends to `ex`, for each case label, the sorted list of products of this process
  // whose module label is that case.
  auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
    std::map<std::string, std::vector<BranchKey>> caseBranches;
    for (auto const& item : preg.productList()) {
      if (item.first.processName() != processName)
        continue;

      if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
          found != caseLabels.end()) {
        caseBranches[*found].push_back(item.first);
      }
    }

    for (auto const& caseLabel : caseLabels) {
      ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
      auto& branches = caseBranches[caseLabel];
      std::sort(branches.begin(), branches.end());
      for (auto const& branch : branches) {
        ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
      }
    }
  };

  // foundBranches[i] records whether chosenBranches[i] was matched by the case being
  // checked; the vector is reused across switches and reset for every case.
  std::vector<bool> foundBranches;
  for (auto const& switchItem : switchMap) {
    auto const& switchLabel = switchItem.first;
    auto const& chosenBranches = switchItem.second.chosenBranches;
    auto const& caseLabels = switchItem.second.caseLabels;
    foundBranches.resize(chosenBranches.size());
    for (auto const& caseLabel : caseLabels) {
      std::fill(foundBranches.begin(), foundBranches.end(), false);
      for (auto& nonConstItem : preg.productListUpdator()) {
        auto const& item = nonConstItem;
        if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
          // Every product of a case module in this process is marked transient.
          nonConstItem.second.setTransient(true);

          // Look for the alias branch with the same class, instance and process names
          // but carrying the SwitchProducer's label.
          auto range = std::equal_range(chosenBranches.begin(),
                                        chosenBranches.end(),
                                        BranchKey(item.first.friendlyClassName(),
                                                  switchLabel,
                                                  item.first.productInstanceName(),
                                                  item.first.processName()));
          if (range.first == range.second) {
            Exception ex(errors::Configuration);
            ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
               << item.first << " that is not produced by the chosen case "
               << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
                      .getUntrackedParameter<std::string>("@chosen_case")
               << ". If the intention is to produce only a subset of the products listed below, each case with "
                  "more products needs to be replaced with an EDAlias to only the necessary products, and the "
                  "EDProducer itself needs to be moved to a Task.\n\n";
            addProductsToException(caseLabels, ex);
            throw ex;
          }
          assert(std::distance(range.first, range.second) == 1);
          foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;

          // Case products with ROOT branch aliases are rejected.
          auto const& bd = item.second;
          if (not bd.branchAliases().empty()) {
            auto ex = Exception(errors::UnimplementedFeature)
                      << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
                         "aliases for SwitchProducer with label "
                      << switchLabel << " for case " << caseLabel << ":";
            for (auto const& branchAlias : bd.branchAliases()) {
              ex << " " << branchAlias;
            }
            throw ex;
          }
        }
      }

      // Every recorded alias branch must have been matched by this case.
      for (size_t i = 0; i < chosenBranches.size(); i++) {
        if (not foundBranches[i]) {
          auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
                                 .getUntrackedParameter<std::string>("@chosen_case");
          Exception ex(errors::Configuration);
          ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
             << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
             << chosenLabel
             << ". If the intention is to produce only a subset of the products listed below, each case with more "
                "products needs to be replaced with an EDAlias to only the necessary products, and the "
                "EDProducer itself needs to be moved to a Task.\n\n";
          addProductsToException(caseLabels, ex);
          throw ex;
        }
      }
    }
  }
}
0312
0313 void reduceParameterSet(ParameterSet& proc_pset,
0314 vstring const& end_path_name_list,
0315 vstring& modulesInConfig,
0316 std::set<std::string> const& usedModuleLabels,
0317 std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0318
0319
0320
0321
0322
0323
0324
0325
0326
0327
0328
0329 vstring outputModuleLabels;
0330 std::string edmType;
0331 std::string const moduleEdmType("@module_edm_type");
0332 std::string const outputModule("OutputModule");
0333 std::string const edAnalyzer("EDAnalyzer");
0334 std::string const edFilter("EDFilter");
0335 std::string const edProducer("EDProducer");
0336
0337 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0338
0339
0340
0341 vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0342 std::set<std::string> modulesOnPaths;
0343 {
0344 std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0345 for (auto const& endPath : end_path_name_list) {
0346 noEndPaths.erase(endPath);
0347 }
0348 {
0349 vstring labels;
0350 for (auto const& path : noEndPaths) {
0351 labels = proc_pset.getParameter<vstring>(path);
0352 modulesOnPaths.insert(labels.begin(), labels.end());
0353 }
0354 }
0355 }
0356
0357
0358 std::vector<std::string> labelsToBeDropped;
0359 labelsToBeDropped.reserve(modulesInConfigSet.size());
0360 std::set_difference(modulesInConfigSet.begin(),
0361 modulesInConfigSet.end(),
0362 usedModuleLabels.begin(),
0363 usedModuleLabels.end(),
0364 std::back_inserter(labelsToBeDropped));
0365
0366 const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0367 for (auto const& modLabel : usedModuleLabels) {
0368
0369
0370 if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0371 edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0372 if (edmType == outputModule) {
0373 outputModuleLabels.push_back(modLabel);
0374 labelsToBeDropped.push_back(modLabel);
0375 }
0376 if (edmType == edAnalyzer) {
0377 if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0378 labelsToBeDropped.push_back(modLabel);
0379 }
0380 }
0381 }
0382 }
0383
0384 std::inplace_merge(
0385 labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0386
0387
0388 for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0389
0390
0391 vstring::iterator endAfterRemove =
0392 std::remove_if(modulesInConfig.begin(),
0393 modulesInConfig.end(),
0394 std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0395 modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0396 proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0397
0398
0399 vstring endPathsToBeDropped;
0400 vstring labels;
0401 for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0402 iEndPath != endEndPath;
0403 ++iEndPath) {
0404 labels = proc_pset.getParameter<vstring>(*iEndPath);
0405 vstring::iterator iSave = labels.begin();
0406 vstring::iterator iBegin = labels.begin();
0407
0408 for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0409 if (binary_search_string(labelsToBeDropped, *iLabel)) {
0410 if (binary_search_string(outputModuleLabels, *iLabel)) {
0411 outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0412 }
0413 } else {
0414 if (iSave != iLabel) {
0415 iSave->swap(*iLabel);
0416 }
0417 ++iSave;
0418 }
0419 }
0420 labels.erase(iSave, labels.end());
0421 if (labels.empty()) {
0422
0423 proc_pset.eraseSimpleParameter(*iEndPath);
0424 endPathsToBeDropped.push_back(*iEndPath);
0425 } else {
0426 proc_pset.addParameter<vstring>(*iEndPath, labels);
0427 }
0428 }
0429 sort_all(endPathsToBeDropped);
0430
0431
0432 endAfterRemove = std::remove_if(scheduledPaths.begin(),
0433 scheduledPaths.end(),
0434 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0435 scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0436 proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0437
0438
0439 vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0440 endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0441 scheduledEndPaths.end(),
0442 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0443 scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0444 proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0445 }
0446
0447 class RngEDConsumer : public EDConsumerBase {
0448 public:
0449 explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0450 Service<RandomNumberGenerator> rng;
0451 if (rng.isAvailable()) {
0452 rng->consumes(consumesCollector());
0453 for (auto const& consumesInfo : this->consumesInfo()) {
0454 typesConsumed.emplace(consumesInfo.type());
0455 }
0456 }
0457 }
0458 };
0459
0460 template <typename F>
0461 auto doCleanup(F&& iF) {
0462 auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0463 CMS_SA_ALLOW try { f(); } catch (...) {
0464 }
0465 if (iPtr) {
0466 iTask.doneWaiting(*iPtr);
0467 }
0468 };
0469 return wrapped;
0470 }
0471 }
0472
0473
0474 typedef std::vector<std::string> vstring;
0475
0476
0477
// Builds the complete schedule for one process: the optional TriggerResults inserter,
// the path and end-path status inserters, one StreamSchedule per stream and the
// GlobalSchedule. It then prunes `proc_pset` down to the modules actually used,
// processes EDAliases and SwitchProducers, fixes the process configuration IDs,
// configures the output modules, freezes the product registry and, when a summary
// was requested, installs a SystemTimeKeeper on the activity registry.
Schedule::Schedule(ParameterSet& proc_pset,
                   service::TriggerNamesService const& tns,
                   ProductRegistry& preg,
                   BranchIDListHelper& branchIDListHelper,
                   ProcessBlockHelperBase& processBlockHelper,
                   ThinnedAssociationsHelper& thinnedAssociationsHelper,
                   SubProcessParentageHelper const* subProcessParentageHelper,
                   ExceptionToActionTable const& actions,
                   std::shared_ptr<ActivityRegistry> areg,
                   std::shared_ptr<ProcessConfiguration> processConfiguration,
                   bool hasSubprocesses,
                   PreallocationConfiguration const& prealloc,
                   ProcessContext const* processContext)
    :  // The TriggerResults inserter is only created when there is at least one trigger path.
      resultsInserter_{tns.getTrigPaths().empty()
                           ? std::shared_ptr<TriggerResultInserter>{}
                           : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
      moduleRegistry_(new ModuleRegistry()),
      all_output_communicators_(),
      preallocConfig_(prealloc),
      pathNames_(&tns.getTrigPaths()),
      endPathNames_(&tns.getEndPaths()),
      wantSummary_(tns.wantSummary()) {
  // One status inserter per trigger path ...
  makePathStatusInserters(pathStatusInserters_,
                          *pathNames_,
                          prealloc,
                          preg,
                          areg,
                          processConfiguration,
                          std::string("PathStatusInserter"));

  // ... and one per end path.
  makePathStatusInserters(endPathStatusInserters_,
                          *endPathNames_,
                          prealloc,
                          preg,
                          areg,
                          processConfiguration,
                          std::string("EndPathStatusInserter"));

  // One StreamSchedule per configured stream; at least one stream is required.
  assert(0 < prealloc.numberOfStreams());
  streamSchedules_.reserve(prealloc.numberOfStreams());
  for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
    streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
                                                                             pathStatusInserters_,
                                                                             endPathStatusInserters_,
                                                                             moduleRegistry(),
                                                                             proc_pset,
                                                                             tns,
                                                                             prealloc,
                                                                             preg,
                                                                             branchIDListHelper,
                                                                             actions,
                                                                             areg,
                                                                             processConfiguration,
                                                                             StreamID{i},
                                                                             processContext));
  }

  // Labels of all workers of the first stream schedule, excluding "TriggerResults".
  const std::string kTriggerResults("TriggerResults");
  std::vector<std::string> modulesToUse;
  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
    if (worker->description()->moduleLabel() != kTriggerResults) {
      modulesToUse.push_back(worker->description()->moduleLabel());
    }
  }
  // Move the last nUnscheduledModules labels to the front of modulesToUse,
  // keeping the relative order within both groups.
  unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
  if (nUnscheduledModules > 0) {
    std::vector<std::string> temp;
    temp.reserve(modulesToUse.size());
    auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
    std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
    std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
    temp.swap(modulesToUse);
  }

  // The global schedule is given the label list built above.
  globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
                                                     pathStatusInserters_,
                                                     endPathStatusInserters_,
                                                     moduleRegistry(),
                                                     modulesToUse,
                                                     proc_pset,
                                                     preg,
                                                     prealloc,
                                                     actions,
                                                     areg,
                                                     processConfiguration,
                                                     processContext);

  // Prune proc_pset down to the modules that have a worker, then process the EDAliases.
  std::set<std::string> usedModuleLabels;
  for (auto const& worker : allWorkers()) {
    if (worker->description()->moduleLabel() != kTriggerResults) {
      usedModuleLabels.insert(worker->description()->moduleLabel());
    }
  }
  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
  {
    std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
    detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
  }

  // After the reordering above, the unscheduled modules are the first
  // nUnscheduledModules entries of modulesToUse.
  if (nUnscheduledModules > 0) {
    std::set<std::string> unscheduledModules(modulesToUse.begin(), modulesToUse.begin() + nUnscheduledModules);
    preg.setUnscheduledProducts(unscheduledModules);
  }

  // proc_pset is registered only after all of the modifications above, and the
  // process configuration IDs are derived from the registered parameter set.
  processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
  proc_pset.registerIt();
  processConfiguration->setParameterSetID(proc_pset.id());
  processConfiguration->setProcessConfigurationID();

  // Remember the worker count to check below that configuring the output modules
  // does not change it.
  size_t all_workers_count = allWorkers().size();

  // Collect a communicator from every module holder that provides one.
  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
    auto comm = iHolder->createOutputModuleCommunicator();
    if (comm) {
      all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
    }
  });
  // Configure the output modules, including their event limits.
  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);

  assert(all_workers_count == allWorkers().size());

  branchIDListHelper.updateFromRegistry(preg);

  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
    worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
  }

  processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());

  // Let each output module select its products from the registry.
  for (auto& c : all_output_communicators_) {
    c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
  }

  for (auto& product : preg.productListUpdator()) {
    setIsMergeable(product.second);
  }

  {
    // Gather the consumed types, split by kind, and pass them to setFrozen().
    std::set<TypeID> productTypesConsumed;
    std::set<TypeID> elementTypesConsumed;
    for (auto const& worker : allWorkers()) {
      for (auto const& consumesInfo : worker->consumesInfo()) {
        if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
          productTypesConsumed.emplace(consumesInfo.type());
        } else {
          elementTypesConsumed.emplace(consumesInfo.type());
        }
      }
    }
    // With subprocesses present, TriggerResults is always counted as consumed.
    if (hasSubprocesses) {
      productTypesConsumed.emplace(typeid(TriggerResults));
    }
    // The temporary consumer adds the types consumed by the RandomNumberGenerator
    // service, if that service is available.
    { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
    preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
  }

  for (auto& c : all_output_communicators_) {
    c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
  }

  if (wantSummary_) {
    // Collect the description of every worker for the time keeper.
    std::vector<const ModuleDescription*> modDesc;
    const auto& workers = allWorkers();
    modDesc.reserve(workers.size());

    std::transform(workers.begin(),
                   workers.end(),
                   std::back_inserter(modDesc),
                   [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });

    // The time keeper is owned by this Schedule; the activity registry callbacks
    // below are registered with a raw pointer to it.
    summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
    auto timeKeeperPtr = summaryTimeKeeper_.get();

    areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);

    areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
    areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
    areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
    areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
    areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
    areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);

    areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
    areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);

    areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
    areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);

    areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
    areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
  }
}
0698
0699 void Schedule::limitOutput(ParameterSet const& proc_pset,
0700 BranchIDLists const& branchIDLists,
0701 SubProcessParentageHelper const* subProcessParentageHelper) {
0702 std::string const output("output");
0703
0704 ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0705 int maxEventSpecs = 0;
0706 int maxEventsOut = -1;
0707 ParameterSet const* vMaxEventsOut = nullptr;
0708 std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0709 if (search_all(intNamesE, output)) {
0710 maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0711 ++maxEventSpecs;
0712 }
0713 std::vector<std::string> psetNamesE;
0714 maxEventsPSet.getParameterSetNames(psetNamesE, false);
0715 if (search_all(psetNamesE, output)) {
0716 vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0717 ++maxEventSpecs;
0718 }
0719
0720 if (maxEventSpecs > 1) {
0721 throw Exception(errors::Configuration)
0722 << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0723 }
0724
0725 for (auto& c : all_output_communicators_) {
0726 OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
0727 if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0728 std::string const& moduleLabel = c->description().moduleLabel();
0729 try {
0730 desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0731 } catch (Exception const&) {
0732 throw Exception(errors::Configuration)
0733 << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0734 }
0735 }
0736 c->configure(desc);
0737 }
0738 }
0739
0740 bool Schedule::terminate() const {
0741 if (all_output_communicators_.empty()) {
0742 return false;
0743 }
0744 for (auto& c : all_output_communicators_) {
0745 if (!c->limitReached()) {
0746
0747 return false;
0748 }
0749 }
0750 LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0751 << "has reached its configured limit.\n";
0752 return true;
0753 }
0754
0755 void Schedule::endJob(ExceptionCollector& collector) {
0756 globalSchedule_->endJob(collector);
0757 if (collector.hasThrown()) {
0758 return;
0759 }
0760
0761 if (wantSummary_ == false)
0762 return;
0763 {
0764 TriggerReport tr;
0765 getTriggerReport(tr);
0766
0767
0768
0769 LogFwkVerbatim("FwkSummary") << "";
0770 if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
0771 LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
0772 << streamSchedules_[0]->context().processContext()->processName();
0773 }
0774 LogFwkVerbatim("FwkSummary") << "TrigReport "
0775 << "---------- Event Summary ------------";
0776 if (!tr.trigPathSummaries.empty()) {
0777 LogFwkVerbatim("FwkSummary") << "TrigReport"
0778 << " Events total = " << tr.eventSummary.totalEvents
0779 << " passed = " << tr.eventSummary.totalEventsPassed
0780 << " failed = " << tr.eventSummary.totalEventsFailed << "";
0781 } else {
0782 LogFwkVerbatim("FwkSummary") << "TrigReport"
0783 << " Events total = " << tr.eventSummary.totalEvents
0784 << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0785 }
0786
0787 LogFwkVerbatim("FwkSummary") << "";
0788 LogFwkVerbatim("FwkSummary") << "TrigReport "
0789 << "---------- Path Summary ------------";
0790 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0791 << " " << std::right << std::setw(10) << "Executed"
0792 << " " << std::right << std::setw(10) << "Passed"
0793 << " " << std::right << std::setw(10) << "Failed"
0794 << " " << std::right << std::setw(10) << "Error"
0795 << " "
0796 << "Name"
0797 << "";
0798 for (auto const& p : tr.trigPathSummaries) {
0799 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
0800 << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
0801 << std::right << std::setw(10) << p.timesPassed << " " << std::right
0802 << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0803 << p.timesExcept << " " << p.name << "";
0804 }
0805
0806
0807
0808
0809
0810
0811
0812
0813
0814
0815
0816
0817
0818
0819
0820
0821
0822
0823 LogFwkVerbatim("FwkSummary") << "";
0824 LogFwkVerbatim("FwkSummary") << "TrigReport "
0825 << "-------End-Path Summary ------------";
0826 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0827 << " " << std::right << std::setw(10) << "Executed"
0828 << " " << std::right << std::setw(10) << "Passed"
0829 << " " << std::right << std::setw(10) << "Failed"
0830 << " " << std::right << std::setw(10) << "Error"
0831 << " "
0832 << "Name"
0833 << "";
0834 for (auto const& p : tr.endPathSummaries) {
0835 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
0836 << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
0837 << std::right << std::setw(10) << p.timesPassed << " " << std::right
0838 << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0839 << p.timesExcept << " " << p.name << "";
0840 }
0841
0842 for (auto const& p : tr.trigPathSummaries) {
0843 LogFwkVerbatim("FwkSummary") << "";
0844 LogFwkVerbatim("FwkSummary") << "TrigReport "
0845 << "---------- Modules in Path: " << p.name << " ------------";
0846 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0847 << " " << std::right << std::setw(10) << "Visited"
0848 << " " << std::right << std::setw(10) << "Passed"
0849 << " " << std::right << std::setw(10) << "Failed"
0850 << " " << std::right << std::setw(10) << "Error"
0851 << " "
0852 << "Name"
0853 << "";
0854
0855 for (auto const& mod : p.moduleInPathSummaries) {
0856 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
0857 << mod.bitPosition << " " << std::right << std::setw(10) << mod.timesVisited
0858 << " " << std::right << std::setw(10) << mod.timesPassed << " " << std::right
0859 << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
0860 << mod.timesExcept << " " << mod.moduleLabel << "";
0861 }
0862 }
0863
0864 for (auto const& p : tr.endPathSummaries) {
0865 LogFwkVerbatim("FwkSummary") << "";
0866 LogFwkVerbatim("FwkSummary") << "TrigReport "
0867 << "------ Modules in End-Path: " << p.name << " ------------";
0868 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0869 << " " << std::right << std::setw(10) << "Visited"
0870 << " " << std::right << std::setw(10) << "Passed"
0871 << " " << std::right << std::setw(10) << "Failed"
0872 << " " << std::right << std::setw(10) << "Error"
0873 << " "
0874 << "Name"
0875 << "";
0876
0877 unsigned int bitpos = 0;
0878 for (auto const& mod : p.moduleInPathSummaries) {
0879 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
0880 << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
0881 << std::right << std::setw(10) << mod.timesPassed << " " << std::right
0882 << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
0883 << mod.timesExcept << " " << mod.moduleLabel << "";
0884 ++bitpos;
0885 }
0886 }
0887
0888 LogFwkVerbatim("FwkSummary") << "";
0889 LogFwkVerbatim("FwkSummary") << "TrigReport "
0890 << "---------- Module Summary ------------";
0891 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Visited"
0892 << " " << std::right << std::setw(10) << "Executed"
0893 << " " << std::right << std::setw(10) << "Passed"
0894 << " " << std::right << std::setw(10) << "Failed"
0895 << " " << std::right << std::setw(10) << "Error"
0896 << " "
0897 << "Name"
0898 << "";
0899 for (auto const& worker : tr.workerSummaries) {
0900 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " "
0901 << std::right << std::setw(10) << worker.timesRun << " " << std::right
0902 << std::setw(10) << worker.timesPassed << " " << std::right << std::setw(10)
0903 << worker.timesFailed << " " << std::right << std::setw(10) << worker.timesExcept
0904 << " " << worker.moduleLabel << "";
0905 }
0906 LogFwkVerbatim("FwkSummary") << "";
0907 }
0908
0909 TriggerTimingReport tr;
0910 getTriggerTimingReport(tr);
0911
0912 const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0913
0914 LogFwkVerbatim("FwkSummary") << "TimeReport "
0915 << "---------- Event Summary ---[sec]----";
0916 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0917 << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0918 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0919 << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0920 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0921 << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0922 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0923 << " efficiency CPU/Real/thread = "
0924 << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0925 preallocConfig_.numberOfThreads();
0926
0927 constexpr int kColumn1Size = 10;
0928 constexpr int kColumn2Size = 12;
0929 constexpr int kColumn3Size = 12;
0930 LogFwkVerbatim("FwkSummary") << "";
0931 LogFwkVerbatim("FwkSummary") << "TimeReport "
0932 << "---------- Path Summary ---[Real sec]----";
0933 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0934 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0935 << " Name";
0936 for (auto const& p : tr.trigPathSummaries) {
0937 const int timesRun = std::max(1, p.timesRun);
0938 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0939 << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
0940 << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
0941 }
0942 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0943 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0944 << " Name"
0945 << "";
0946
0947 LogFwkVerbatim("FwkSummary") << "";
0948 LogFwkVerbatim("FwkSummary") << "TimeReport "
0949 << "-------End-Path Summary ---[Real sec]----";
0950 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0951 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0952 << " Name"
0953 << "";
0954 for (auto const& p : tr.endPathSummaries) {
0955 const int timesRun = std::max(1, p.timesRun);
0956
0957 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0958 << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
0959 << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
0960 }
0961 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0962 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0963 << " Name"
0964 << "";
0965
0966 for (auto const& p : tr.trigPathSummaries) {
0967 LogFwkVerbatim("FwkSummary") << "";
0968 LogFwkVerbatim("FwkSummary") << "TimeReport "
0969 << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
0970 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0971 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0972 << " Name"
0973 << "";
0974 for (auto const& mod : p.moduleInPathSummaries) {
0975 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0976 << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
0977 << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
0978 << mod.moduleLabel << "";
0979 }
0980 }
0981 if (not tr.trigPathSummaries.empty()) {
0982 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0983 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0984 << " Name"
0985 << "";
0986 }
0987 for (auto const& p : tr.endPathSummaries) {
0988 LogFwkVerbatim("FwkSummary") << "";
0989 LogFwkVerbatim("FwkSummary") << "TimeReport "
0990 << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
0991 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0992 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0993 << " Name"
0994 << "";
0995 for (auto const& mod : p.moduleInPathSummaries) {
0996 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0997 << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
0998 << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
0999 << mod.moduleLabel << "";
1000 }
1001 }
1002 if (not tr.endPathSummaries.empty()) {
1003 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1004 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1005 << " Name"
1006 << "";
1007 }
1008 LogFwkVerbatim("FwkSummary") << "";
1009 LogFwkVerbatim("FwkSummary") << "TimeReport "
1010 << "---------- Module Summary ---[Real sec]----";
1011 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1012 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1013 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1014 << " Name"
1015 << "";
1016 for (auto const& worker : tr.workerSummaries) {
1017 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1018 << std::setw(kColumn1Size) << worker.realTime / totalEvents << " " << std::right
1019 << std::setw(kColumn2Size) << worker.realTime / std::max(1, worker.timesRun) << " "
1020 << std::right << std::setw(kColumn3Size)
1021 << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel
1022 << "";
1023 }
1024 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1025 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1026 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1027 << " Name"
1028 << "";
1029
1030 LogFwkVerbatim("FwkSummary") << "";
1031 LogFwkVerbatim("FwkSummary") << "T---Report end!"
1032 << "";
1033 LogFwkVerbatim("FwkSummary") << "";
1034 }
1035
1036 void Schedule::closeOutputFiles() {
1037 using std::placeholders::_1;
1038 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1039 for (auto& worker : allWorkers()) {
1040 worker->respondToCloseOutputFile();
1041 }
1042 }
1043
1044 void Schedule::openOutputFiles(FileBlock& fb) {
1045 using std::placeholders::_1;
1046 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1047 }
1048
1049 void Schedule::writeRunAsync(WaitingTaskHolder task,
1050 RunPrincipal const& rp,
1051 ProcessContext const* processContext,
1052 ActivityRegistry* activityRegistry,
1053 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1054 auto token = ServiceRegistry::instance().presentToken();
1055 GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1056 LuminosityBlockID(rp.run(), 0),
1057 rp.index(),
1058 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1059 rp.endTime(),
1060 processContext);
1061
1062 using namespace edm::waiting_task;
1063 chain::first([&](auto nextTask) {
1064
1065 ServiceRegistry::Operate op(token);
1066
1067
1068 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1069 }
1070 for (auto& c : all_output_communicators_) {
1071 c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1072 }
1073 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1074
1075 ServiceRegistry::Operate op(token);
1076
1077 activityRegistry->postGlobalWriteRunSignal_(globalContext);
1078 })) |
1079 chain::runLast(task);
1080 }
1081
1082 void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1083 ProcessBlockPrincipal const& pbp,
1084 ProcessContext const* processContext,
1085 ActivityRegistry* activityRegistry) {
1086 auto token = ServiceRegistry::instance().presentToken();
1087 GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1088 LuminosityBlockID(),
1089 RunIndex::invalidRunIndex(),
1090 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1091 Timestamp::invalidTimestamp(),
1092 processContext);
1093
1094 using namespace edm::waiting_task;
1095 chain::first([&](auto nextTask) {
1096
1097 ServiceRegistry::Operate op(token);
1098 CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1099 }
1100 for (auto& c : all_output_communicators_) {
1101 c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1102 }
1103 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1104
1105 ServiceRegistry::Operate op(token);
1106
1107 activityRegistry->postWriteProcessBlockSignal_(globalContext);
1108 })) |
1109 chain::runLast(std::move(task));
1110 }
1111
1112 void Schedule::writeLumiAsync(WaitingTaskHolder task,
1113 LuminosityBlockPrincipal const& lbp,
1114 ProcessContext const* processContext,
1115 ActivityRegistry* activityRegistry) {
1116 auto token = ServiceRegistry::instance().presentToken();
1117 GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1118 lbp.id(),
1119 lbp.runPrincipal().index(),
1120 lbp.index(),
1121 lbp.beginTime(),
1122 processContext);
1123
1124 using namespace edm::waiting_task;
1125 chain::first([&](auto nextTask) {
1126 ServiceRegistry::Operate op(token);
1127 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1128 }
1129 for (auto& c : all_output_communicators_) {
1130 c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1131 }
1132 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1133
1134 ServiceRegistry::Operate op(token);
1135
1136 activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1137 })) |
1138 chain::runLast(task);
1139 }
1140
1141 bool Schedule::shouldWeCloseOutput() const {
1142 using std::placeholders::_1;
1143
1144 return (std::find_if(all_output_communicators_.begin(),
1145 all_output_communicators_.end(),
1146 std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1147 all_output_communicators_.end());
1148 }
1149
1150 void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1151 using std::placeholders::_1;
1152 for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1153 }
1154
1155 void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1156 using std::placeholders::_1;
1157 for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1158 }
1159
1160 void Schedule::beginJob(ProductRegistry const& iRegistry,
1161 eventsetup::ESRecordsToProxyIndices const& iESIndices,
1162 ProcessBlockHelperBase const& processBlockHelperBase) {
1163 globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase);
1164 }
1165
1166 void Schedule::beginStream(unsigned int iStreamID) {
1167 assert(iStreamID < streamSchedules_.size());
1168 streamSchedules_[iStreamID]->beginStream();
1169 }
1170
1171 void Schedule::endStream(unsigned int iStreamID) {
1172 assert(iStreamID < streamSchedules_.size());
1173 streamSchedules_[iStreamID]->endStream();
1174 }
1175
1176 void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1177 unsigned int iStreamID,
1178 EventTransitionInfo& info,
1179 ServiceToken const& token) {
1180 assert(iStreamID < streamSchedules_.size());
1181 streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1182 }
1183
1184 bool Schedule::changeModule(std::string const& iLabel,
1185 ParameterSet const& iPSet,
1186 const ProductRegistry& iRegistry,
1187 eventsetup::ESRecordsToProxyIndices const& iIndices) {
1188 Worker* found = nullptr;
1189 for (auto const& worker : allWorkers()) {
1190 if (worker->description()->moduleLabel() == iLabel) {
1191 found = worker;
1192 break;
1193 }
1194 }
1195 if (nullptr == found) {
1196 return false;
1197 }
1198
1199 auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1200
1201 globalSchedule_->replaceModule(newMod, iLabel);
1202
1203 for (auto& s : streamSchedules_) {
1204 s->replaceModule(newMod, iLabel);
1205 }
1206
1207 {
1208
1209 auto const processBlockLookup = iRegistry.productLookup(InProcess);
1210 auto const runLookup = iRegistry.productLookup(InRun);
1211 auto const lumiLookup = iRegistry.productLookup(InLumi);
1212 auto const eventLookup = iRegistry.productLookup(InEvent);
1213 found->updateLookup(InProcess, *runLookup);
1214 found->updateLookup(InRun, *runLookup);
1215 found->updateLookup(InLumi, *lumiLookup);
1216 found->updateLookup(InEvent, *eventLookup);
1217 found->updateLookup(iIndices);
1218
1219 auto const& processName = newMod->moduleDescription().processName();
1220 auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1221 auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1222 auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1223 auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1224 found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1225 found->resolvePutIndicies(InRun, runModuleToIndicies);
1226 found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1227 found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1228 }
1229
1230 return true;
1231 }
1232
1233 void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1234 globalSchedule_->deleteModule(iLabel);
1235 for (auto& stream : streamSchedules_) {
1236 stream->deleteModule(iLabel);
1237 }
1238 moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1239 }
1240
1241 void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1242 edm::ProductRegistry const& preg) {
1243 for (auto& stream : streamSchedules_) {
1244 stream->initializeEarlyDelete(*moduleRegistry(), branchesToDeleteEarly, preg);
1245 }
1246 }
1247
1248 std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1249 std::vector<ModuleDescription const*> result;
1250 result.reserve(allWorkers().size());
1251
1252 for (auto const& worker : allWorkers()) {
1253 ModuleDescription const* p = worker->description();
1254 result.push_back(p);
1255 }
1256 return result;
1257 }
1258
1259 Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1260
1261 void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1262 for (auto const& worker : allWorkers()) {
1263 worker->convertCurrentProcessAlias(processName);
1264 }
1265 }
1266
1267 void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1268 streamSchedules_[0]->availablePaths(oLabelsToFill);
1269 }
1270
1271 void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1272
1273 void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1274
1275 void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1276 streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1277 }
1278
1279 void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1280 std::vector<ModuleDescription const*>& descriptions,
1281 unsigned int hint) const {
1282 streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1283 }
1284
1285 void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1286 std::vector<ModuleDescription const*>& descriptions,
1287 unsigned int hint) const {
1288 streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1289 }
1290
1291 void Schedule::fillModuleAndConsumesInfo(
1292 std::vector<ModuleDescription const*>& allModuleDescriptions,
1293 std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1294 std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1295 std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1296 ProductRegistry const& preg) const {
1297 allModuleDescriptions.clear();
1298 moduleIDToIndex.clear();
1299 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1300 modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1301 }
1302 modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1303
1304 allModuleDescriptions.reserve(allWorkers().size());
1305 moduleIDToIndex.reserve(allWorkers().size());
1306 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1307 modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1308 }
1309 modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1310
1311 std::map<std::string, ModuleDescription const*> labelToDesc;
1312 unsigned int i = 0;
1313 for (auto const& worker : allWorkers()) {
1314 ModuleDescription const* p = worker->description();
1315 allModuleDescriptions.push_back(p);
1316 moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1317 labelToDesc[p->moduleLabel()] = p;
1318 ++i;
1319 }
1320 sort_all(moduleIDToIndex);
1321
1322 i = 0;
1323 for (auto const& worker : allWorkers()) {
1324 std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1325 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1326 modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1327 }
1328
1329 std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1330 modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1331 try {
1332 worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1333 } catch (cms::Exception& ex) {
1334 ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1335 worker->description()->moduleLabel());
1336 throw;
1337 }
1338 ++i;
1339 }
1340 }
1341
1342 void Schedule::getTriggerReport(TriggerReport& rep) const {
1343 rep.eventSummary.totalEvents = 0;
1344 rep.eventSummary.totalEventsPassed = 0;
1345 rep.eventSummary.totalEventsFailed = 0;
1346 for (auto& s : streamSchedules_) {
1347 s->getTriggerReport(rep);
1348 }
1349 sort_all(rep.workerSummaries);
1350 }
1351
1352 void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1353 rep.eventSummary.totalEvents = 0;
1354 rep.eventSummary.cpuTime = 0.;
1355 rep.eventSummary.realTime = 0.;
1356 summaryTimeKeeper_->fillTriggerTimingReport(rep);
1357 }
1358
1359 int Schedule::totalEvents() const {
1360 int returnValue = 0;
1361 for (auto& s : streamSchedules_) {
1362 returnValue += s->totalEvents();
1363 }
1364 return returnValue;
1365 }
1366
1367 int Schedule::totalEventsPassed() const {
1368 int returnValue = 0;
1369 for (auto& s : streamSchedules_) {
1370 returnValue += s->totalEventsPassed();
1371 }
1372 return returnValue;
1373 }
1374
1375 int Schedule::totalEventsFailed() const {
1376 int returnValue = 0;
1377 for (auto& s : streamSchedules_) {
1378 returnValue += s->totalEventsFailed();
1379 }
1380 return returnValue;
1381 }
1382
1383 void Schedule::clearCounters() {
1384 for (auto& s : streamSchedules_) {
1385 s->clearCounters();
1386 }
1387 }
1388 }