File indexing completed on 2024-07-28 22:48:24
0001 #include "FWCore/Framework/interface/Schedule.h"
0002
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0011 #include "FWCore/Framework/interface/EDConsumerBase.h"
0012 #include "FWCore/Framework/src/OutputModuleDescription.h"
0013 #include "FWCore/Framework/interface/SubProcess.h"
0014 #include "FWCore/Framework/interface/TriggerNamesService.h"
0015 #include "FWCore/Framework/src/TriggerReport.h"
0016 #include "FWCore/Framework/src/TriggerTimingReport.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/src/Factory.h"
0019 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0020 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/Framework/src/TriggerResultInserter.h"
0023 #include "FWCore/Framework/src/PathStatusInserter.h"
0024 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0025 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0026 #include "FWCore/Concurrency/interface/chain_first.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0031 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0032 #include "FWCore/Utilities/interface/Algorithms.h"
0033 #include "FWCore/Utilities/interface/ConvertException.h"
0034 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0035 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0036 #include "FWCore/Utilities/interface/TypeID.h"
0037 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0038
0039 #include <array>
0040 #include <algorithm>
0041 #include <cassert>
0042 #include <cstdlib>
0043 #include <functional>
0044 #include <iomanip>
0045 #include <list>
0046 #include <map>
0047 #include <set>
0048 #include <exception>
0049 #include <sstream>
0050
0051 #include "make_shared_noexcept_false.h"
0052 #include "processEDAliases.h"
0053
0054 namespace edm {
0055
0056 class Maker;
0057
0058 namespace {
0059 using std::placeholders::_1;
0060
// Reports whether `s` is present in `v`.
// `v` is expected to be sorted in ascending order, as required by a binary search.
bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
  auto const candidate = std::lower_bound(v.cbegin(), v.cend(), s);
  // lower_bound yields the first element not less than s; it is a hit only
  // when s is not less than that element either.
  return candidate != v.cend() && !(s < *candidate);
}
0064
0065
0066
0067
0068 std::shared_ptr<TriggerResultInserter> makeInserter(
0069 ParameterSet& proc_pset,
0070 PreallocationConfiguration const& iPrealloc,
0071 ProductRegistry& preg,
0072 ExceptionToActionTable const& actions,
0073 std::shared_ptr<ActivityRegistry> areg,
0074 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0075 ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0076 trig_pset->registerIt();
0077
0078 WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0079 ModuleDescription md(trig_pset->id(),
0080 "TriggerResultInserter",
0081 "TriggerResults",
0082 processConfiguration.get(),
0083 ModuleDescription::getUniqueID());
0084
0085 areg->preModuleConstructionSignal_(md);
0086 bool postCalled = false;
0087 std::shared_ptr<TriggerResultInserter> returnValue;
0088
0089 CMS_SA_ALLOW try {
0090 maker::ModuleHolderT<TriggerResultInserter> holder(
0091 make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
0092 static_cast<Maker const*>(nullptr));
0093 holder.setModuleDescription(md);
0094 holder.registerProductsAndCallbacks(&preg);
0095 returnValue = holder.module();
0096 postCalled = true;
0097
0098 areg->postModuleConstructionSignal_(md);
0099 } catch (...) {
0100 if (!postCalled) {
0101 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0102
0103 }
0104 }
0105 throw;
0106 }
0107 return returnValue;
0108 }
0109
0110 template <typename T>
0111 void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0112 std::vector<std::string> const& pathNames,
0113 PreallocationConfiguration const& iPrealloc,
0114 ProductRegistry& preg,
0115 std::shared_ptr<ActivityRegistry> areg,
0116 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0117 std::string const& moduleTypeName) {
0118 ParameterSet pset;
0119 pset.addParameter<std::string>("@module_type", moduleTypeName);
0120 pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0121 pset.registerIt();
0122
0123 pathStatusInserters.reserve(pathNames.size());
0124
0125 for (auto const& pathName : pathNames) {
0126 ModuleDescription md(
0127 pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0128
0129 areg->preModuleConstructionSignal_(md);
0130 bool postCalled = false;
0131
0132 CMS_SA_ALLOW try {
0133 maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
0134 static_cast<Maker const*>(nullptr));
0135 holder.setModuleDescription(md);
0136 holder.registerProductsAndCallbacks(&preg);
0137 pathStatusInserters.emplace_back(holder.module());
0138 postCalled = true;
0139
0140 areg->postModuleConstructionSignal_(md);
0141 } catch (...) {
0142 if (!postCalled) {
0143 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0144
0145 }
0146 }
0147 throw;
0148 }
0149 }
0150 }
0151
// Shorthand for a list of strings (module labels, path names, ...).
using vstring = std::vector<std::string>;
0153
// Resolves the switch-alias products in the product registry and checks that
// the cases of every SwitchProducer are consistent with each other.
//
//   proc_pset    process configuration; read for each switch's "@all_cases"
//                and, in error messages, its untracked "@chosen_case".
//   processName  only registry entries with this process name are considered
//                as aliased-for or case products.
//   preg         product registry; switch-alias descriptions are updated via
//                setSwitchAliasForBranch() and every product of a case module
//                is marked transient.
//
// Throws edm::Exception with category
//   LogicError            if no product matches a switch-alias entry,
//   Configuration         if a case and the chosen branches disagree on the
//                         set of non-transient products,
//   UnimplementedFeature  if a case product has ROOT branch aliases.
0154 void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
0155 // Per switch label: keys of its non-transient switch-alias products, and the labels of all its cases.
0156 struct BranchesCases {
0157 BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0158 std::vector<BranchKey> chosenBranches;
0159 std::vector<std::string> caseLabels;
0160 };
0161 std::map<std::string, BranchesCases> switchMap;
0162 for (auto& prod : preg.productListUpdator()) {
0163 if (prod.second.isSwitchAlias()) {
0164 auto it = switchMap.find(prod.second.moduleLabel());
0165 if (it == switchMap.end()) {
0166 auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0167 auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0168 switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0169 assert(inserted.second);
0170 it = inserted.first;
0171 }
0172
0173 bool found = false;
0174 for (auto const& productIter : preg.productList()) {
0175 BranchKey const& branchKey = productIter.first;
0176 // Skip registry entries that belong to a different process.
0177
0178
0179
0180 if (branchKey.processName() != processName) {
0181 continue;
0182 }
0183
0184 BranchDescription const& desc = productIter.second;
0185 if (desc.branchType() == prod.second.branchType() and
0186 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0187 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0188 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0189 prod.second.setSwitchAliasForBranch(desc);
0190 if (!prod.second.transient()) {
0191 it->second.chosenBranches.push_back(prod.first);
0192 }
0193 found = true;
0194 }
0195 }
0196 if (not found) {
0197 Exception ex(errors::LogicError);
0198 ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
0199 << " friendly class name = " << prod.second.friendlyClassName() << "\n"
0200 << " module label = " << prod.second.moduleLabel() << "\n"
0201 << " product instance name = " << prod.second.productInstanceName() << "\n"
0202 << " process name = " << processName
0203 << "\n\nbut did not find any. Please contact a framework developer.";
0204 ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0205 throw ex;
0206 }
0207 }
0208 }
0209 if (switchMap.empty())
0210 return;
0211
// Sorted so that std::equal_range can be used on chosenBranches below.
0212 for (auto& elem : switchMap) {
0213 std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0214 }
0215
// Appends to `ex`, for every case label, the sorted list of products of this
// process whose module label is that case.
0216 auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0217 std::map<std::string, std::vector<BranchKey>> caseBranches;
0218 for (auto const& item : preg.productList()) {
0219 if (item.first.processName() != processName)
0220 continue;
0221
0222 if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0223 found != caseLabels.end()) {
0224 caseBranches[*found].push_back(item.first);
0225 }
0226 }
0227
0228 for (auto const& caseLabel : caseLabels) {
0229 ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0230 auto& branches = caseBranches[caseLabel];
0231 std::sort(branches.begin(), branches.end());
0232 for (auto const& branch : branches) {
0233 ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0234 }
0235 }
0236 };
0237
0238 // For every case of every switch: each non-transient product of the case must
0239 // correspond to exactly one chosen branch, and every chosen branch must be matched.
0240 // foundBranches[i] records whether chosenBranches[i] was matched by the current case.
0241 std::vector<bool> foundBranches;
0242 for (auto const& switchItem : switchMap) {
0243 auto const& switchLabel = switchItem.first;
0244 auto const& chosenBranches = switchItem.second.chosenBranches;
0245 auto const& caseLabels = switchItem.second.caseLabels;
0246 foundBranches.resize(chosenBranches.size());
0247 for (auto const& caseLabel : caseLabels) {
0248 std::fill(foundBranches.begin(), foundBranches.end(), false);
0249 for (auto& nonConstItem : preg.productListUpdator()) {
0250 auto const& item = nonConstItem;
0251 if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0252 // Transient products of a case are not compared against the chosen branches.
0253
0254 if (!item.second.transient()) {
0255 auto range = std::equal_range(chosenBranches.begin(),
0256 chosenBranches.end(),
0257 BranchKey(item.first.friendlyClassName(),
0258 switchLabel,
0259 item.first.productInstanceName(),
0260 item.first.processName()));
0261 if (range.first == range.second) {
0262 Exception ex(errors::Configuration);
0263 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0264 << item.first << " that is not produced by the chosen case "
0265 << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0266 .getUntrackedParameter<std::string>("@chosen_case")
0267 << " and that product is not transient. "
0268 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0269 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0270 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0271 addProductsToException(caseLabels, ex);
0272 throw ex;
0273 }
0274 assert(std::distance(range.first, range.second) == 1);
0275 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0276 }
0277
0278 // Every product of a case module is marked transient in the registry,
0279 // after the non-transient check above has been done on its previous state.
0280
0281
0282
0283
0284
0285 nonConstItem.second.setTransient(true);
0286
0287 // ROOT branch aliases on a case product are rejected.
0288 auto const& bd = item.second;
0289 if (not bd.branchAliases().empty()) {
0290 auto ex = Exception(errors::UnimplementedFeature)
0291 << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0292 "aliases for SwitchProducer with label "
0293 << switchLabel << " for case " << caseLabel << ":";
0294 for (auto const& branchAlias : bd.branchAliases()) {
0295 ex << " " << branchAlias;
0296 }
0297 throw ex;
0298 }
0299 }
0300 }
0301
// Any chosen branch left unmatched means this case lacks a product that the
// chosen branches contain.
0302 for (size_t i = 0; i < chosenBranches.size(); i++) {
0303 if (not foundBranches[i]) {
0304 auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0305 .getUntrackedParameter<std::string>("@chosen_case");
0306 Exception ex(errors::Configuration);
0307 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0308 << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0309 << chosenLabel << " and that product is not transient. "
0310 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0311 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0312 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0313 addProductsToException(caseLabels, ex);
0314 throw ex;
0315 }
0316 }
0317 }
0318 }
0319 }
0320
0321 void reduceParameterSet(ParameterSet& proc_pset,
0322 vstring const& end_path_name_list,
0323 vstring& modulesInConfig,
0324 std::set<std::string> const& usedModuleLabels,
0325 std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0326 // Prunes the process ParameterSet. A module label is dropped when it is
0327 //   - listed in modulesInConfig but absent from usedModuleLabels, or
0328 //   - a used module whose "@module_edm_type" is "OutputModule", or
0329 //   - a used "EDAnalyzer" that is on no scheduled path other than an end path.
0330 // Dropped labels are passed to ParameterSet::eraseOrSetUntrackedParameterSet,
0331 // removed from modulesInConfig (written back as "@all_modules") and removed
0332 // from every end path. End paths that become empty are erased and removed
0333 // from "@paths" and "@end_paths". For each dropped output module,
0334 // outputModulePathPositions receives (end path name, number of retained
0335 // labels preceding it on that end path).
0336 // NOTE(review): modulesInConfig is assumed to hold no duplicates that matter; it is funnelled through a std::set below.
0337 vstring outputModuleLabels;
0338 std::string edmType;
0339 std::string const moduleEdmType("@module_edm_type");
0340 std::string const outputModule("OutputModule");
0341 std::string const edAnalyzer("EDAnalyzer");
0342 std::string const edFilter("EDFilter");
0343 std::string const edProducer("EDProducer");
0344
0345 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0346
0347 // Collect the labels of all modules on scheduled paths that are not end paths.
0348
0349 vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0350 std::set<std::string> modulesOnPaths;
0351 {
0352 std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0353 for (auto const& endPath : end_path_name_list) {
0354 noEndPaths.erase(endPath);
0355 }
0356 {
0357 vstring labels;
0358 for (auto const& path : noEndPaths) {
0359 labels = proc_pset.getParameter<vstring>(path);
0360 modulesOnPaths.insert(labels.begin(), labels.end());
0361 }
0362 }
0363 }
0364
0365 // First part of labelsToBeDropped: configured modules that are not used (sorted, as both inputs are sets).
0366 std::vector<std::string> labelsToBeDropped;
0367 labelsToBeDropped.reserve(modulesInConfigSet.size());
0368 std::set_difference(modulesInConfigSet.begin(),
0369 modulesInConfigSet.end(),
0370 usedModuleLabels.begin(),
0371 usedModuleLabels.end(),
0372 std::back_inserter(labelsToBeDropped));
0373
0374 const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0375 for (auto const& modLabel : usedModuleLabels) {
0376 // Second part: used output modules and used analyzers not on a non-end path.
0377 // Labels without a ParameterSet in proc_pset are left alone.
0378 if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0379 edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0380 if (edmType == outputModule) {
0381 outputModuleLabels.push_back(modLabel);
0382 labelsToBeDropped.push_back(modLabel);
0383 }
0384 if (edmType == edAnalyzer) {
0385 if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0386 labelsToBeDropped.push_back(modLabel);
0387 }
0388 }
0389 }
0390 }
0391
// Both parts are individually sorted (the second follows the iteration order
// of the std::set), so merging yields a fully sorted vector suitable for
// binary_search_string below.
0392 std::inplace_merge(
0393 labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0394
0395 // Apply the drop to the ParameterSet itself.
0396 for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0397
0398 // Remove the dropped labels from modulesInConfig and store the result as "@all_modules".
0399 vstring::iterator endAfterRemove =
0400 std::remove_if(modulesInConfig.begin(),
0401 modulesInConfig.end(),
0402 std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0403 modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0404 proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0405
0406 // Rewrite every end path without the dropped labels.
0407 vstring endPathsToBeDropped;
0408 vstring labels;
0409 for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0410 iEndPath != endEndPath;
0411 ++iEndPath) {
0412 labels = proc_pset.getParameter<vstring>(*iEndPath);
0413 vstring::iterator iSave = labels.begin();
0414 vstring::iterator iBegin = labels.begin();
0415
// In-place compaction: iSave is the slot for the next retained label, so
// iSave - iBegin is the count of labels retained so far.
0416 for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0417 if (binary_search_string(labelsToBeDropped, *iLabel)) {
0418 if (binary_search_string(outputModuleLabels, *iLabel)) {
0419 outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0420 }
0421 } else {
0422 if (iSave != iLabel) {
0423 iSave->swap(*iLabel);
0424 }
0425 ++iSave;
0426 }
0427 }
0428 labels.erase(iSave, labels.end());
0429 if (labels.empty()) {
0430 // Nothing is left on this end path: remove it from the configuration.
0431 proc_pset.eraseSimpleParameter(*iEndPath);
0432 endPathsToBeDropped.push_back(*iEndPath);
0433 } else {
0434 proc_pset.addParameter<vstring>(*iEndPath, labels);
0435 }
0436 }
0437 sort_all(endPathsToBeDropped);
0438
0439 // Remove the emptied end paths from "@paths".
0440 endAfterRemove = std::remove_if(scheduledPaths.begin(),
0441 scheduledPaths.end(),
0442 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0443 scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0444 proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0445
0446 // Remove the emptied end paths from "@end_paths".
0447 vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0448 endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0449 scheduledEndPaths.end(),
0450 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0451 scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0452 proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0453 }
0454
0455 class RngEDConsumer : public EDConsumerBase {
0456 public:
0457 explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0458 Service<RandomNumberGenerator> rng;
0459 if (rng.isAvailable()) {
0460 rng->consumes(consumesCollector());
0461 for (auto const& consumesInfo : this->consumesInfo()) {
0462 typesConsumed.emplace(consumesInfo.type());
0463 }
0464 }
0465 }
0466 };
0467
0468 template <typename F>
0469 auto doCleanup(F&& iF) {
0470 auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0471 CMS_SA_ALLOW try { f(); } catch (...) {
0472 }
0473 if (iPtr) {
0474 iTask.doneWaiting(*iPtr);
0475 }
0476 };
0477 return wrapped;
0478 }
0479 }
0480
0481
// Shorthand for a list of strings, visible to the edm-scope definitions that follow.
using vstring = std::vector<std::string>;
0483
0484
0485
// Constructs the schedule: the TriggerResults inserter (only when there are
// trigger paths), the path/end-path status inserters, one StreamSchedule per
// preallocated stream and finally the GlobalSchedule.
// Trigger path names, end path names and the wantSummary flag are read from `tns`;
// pointers to the name lists of `tns` are stored in pathNames_/endPathNames_.
0486 Schedule::Schedule(ParameterSet& proc_pset,
0487 service::TriggerNamesService const& tns,
0488 ProductRegistry& preg,
0489 ExceptionToActionTable const& actions,
0490 std::shared_ptr<ActivityRegistry> areg,
0491 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0492 PreallocationConfiguration const& prealloc,
0493 ProcessContext const* processContext,
0494 ModuleTypeResolverMaker const* resolverMaker)
0495 :
0496 resultsInserter_{tns.getTrigPaths().empty()
0497 ? std::shared_ptr<TriggerResultInserter>{}
0498 : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
0499 moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0500 all_output_communicators_(),
0501 preallocConfig_(prealloc),
0502 pathNames_(&tns.getTrigPaths()),
0503 endPathNames_(&tns.getEndPaths()),
0504 wantSummary_(tns.wantSummary()) {
0505 makePathStatusInserters(pathStatusInserters_,
0506 *pathNames_,
0507 prealloc,
0508 preg,
0509 areg,
0510 processConfiguration,
0511 std::string("PathStatusInserter"));
0512
0513 if (endPathNames_->size() > 1) {
0514 // End-path status inserters are only created when there is more than one end path.
0515 makePathStatusInserters(endPathStatusInserters_,
0516 *endPathNames_,
0517 prealloc,
0518 preg,
0519 areg,
0520 processConfiguration,
0521 std::string("EndPathStatusInserter"));
0522 }
0523
0524 assert(0 < prealloc.numberOfStreams());
0525 streamSchedules_.reserve(prealloc.numberOfStreams());
0526 for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0527 streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0528 pathStatusInserters_,
0529 endPathStatusInserters_,
0530 moduleRegistry(),
0531 proc_pset,
0532 tns,
0533 prealloc,
0534 preg,
0535 actions,
0536 areg,
0537 processConfiguration,
0538 StreamID{i},
0539 processContext));
0540 }
0541
0542 // Collect the labels of the lumi/event workers of the first stream schedule,
0543 // leaving out the TriggerResults module.
0544 const std::string kTriggerResults("TriggerResults");
0545 std::vector<std::string> modulesToUse;
0546 modulesToUse.reserve(streamSchedules_[0]->allWorkersLumisAndEvents().size());
0547 for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0548 if (worker->description()->moduleLabel() != kTriggerResults) {
0549 modulesToUse.push_back(worker->description()->moduleLabel());
0550 }
0551 }
0552
// Move the last nUnscheduledModules labels to the front of modulesToUse,
// keeping the relative order within both groups.
0553 unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0554 if (nUnscheduledModules > 0) {
0555 std::vector<std::string> temp;
0556 temp.reserve(modulesToUse.size());
0557 auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0558 std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0559 std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0560 temp.swap(modulesToUse);
0561 }
0562
0563 // The global schedule is built from the module list assembled above.
0564 globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0565 pathStatusInserters_,
0566 endPathStatusInserters_,
0567 moduleRegistry(),
0568 modulesToUse,
0569 proc_pset,
0570 preg,
0571 prealloc,
0572 actions,
0573 areg,
0574 processConfiguration,
0575 processContext);
0576 }
0577
// Second stage of schedule setup, run after construction. In order it:
//   1. prunes proc_pset (reduceParameterSet) and processes EDAliases and
//      SwitchProducers against the product registry,
//   2. registers proc_pset and stores its id in processConfiguration,
//   3. creates and configures the output module communicators,
//   4. updates the branch-ID, thinned-association and process-block helpers,
//   5. freezes the product registry with the set of consumed types,
//   6. when a summary was requested, creates the SystemTimeKeeper and connects
//      it to the ActivityRegistry signals.
0578 void Schedule::finishSetup(ParameterSet& proc_pset,
0579 service::TriggerNamesService const& tns,
0580 ProductRegistry& preg,
0581 BranchIDListHelper& branchIDListHelper,
0582 ProcessBlockHelperBase& processBlockHelper,
0583 ThinnedAssociationsHelper& thinnedAssociationsHelper,
0584 SubProcessParentageHelper const* subProcessParentageHelper,
0585 std::shared_ptr<ActivityRegistry> areg,
0586 std::shared_ptr<ProcessConfiguration> processConfiguration,
0587 bool hasSubprocesses,
0588 PreallocationConfiguration const& prealloc,
0589 ProcessContext const* processContext) {
0590 // Labels of all workers except TriggerResults are the "used" modules
0591 // against which the configuration is pruned.
0592 const std::string kTriggerResults("TriggerResults");
0593
0594 std::set<std::string> usedModuleLabels;
0595 for (auto const& worker : allWorkers()) {
0596 if (worker->description()->moduleLabel() != kTriggerResults) {
0597 usedModuleLabels.insert(worker->description()->moduleLabel());
0598 }
0599 }
0600 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0601 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0602 reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0603 {
0604 std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0605 detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0606 }
0607
0608 // Tell the registry which module labels are unscheduled, as reported by
0609 // the first stream schedule.
0610 {
0611 auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
0612 if (not unsched.empty()) {
0613 std::set<std::string> unscheduledModules;
0614 std::transform(unsched.begin(),
0615 unsched.end(),
0616 std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0617 [](auto worker) { return worker->description()->moduleLabel(); });
0618 preg.setUnscheduledProducts(unscheduledModules);
0619 }
0620 }
0621
0622 processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0623 proc_pset.registerIt();
0624 processConfiguration->setParameterSetID(proc_pset.id());
0625 processConfiguration->setProcessConfigurationID();
0626
0627 // Remember the worker count; the assert further down checks that creating
0628 // and configuring the output communicators did not change it.
0629 size_t all_workers_count = allWorkers().size();
0630
0631 moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0632 auto comm = iHolder->createOutputModuleCommunicator();
0633 if (comm) {
0634 all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0635 }
0636 });
0637
0638 limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
0639
0640 // The set of workers must be the same as before the communicators were set up.
0641
0642 assert(all_workers_count == allWorkers().size());
0643
0644 branchIDListHelper.updateFromRegistry(preg);
0645
0646 for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0647 worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
0648 }
0649
0650 processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
0651
0652 // Each output module communicator selects its products using the helpers
0653 // updated above.
0654 for (auto& c : all_output_communicators_) {
0655 c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
0656 }
0657
0658 for (auto& product : preg.productListUpdator()) {
0659 setIsMergeable(product.second);
0660 }
0661
0662 {
0663 // Gather the types consumed by all workers, split by kindOfType(), then freeze the registry.
0664 std::set<TypeID> productTypesConsumed;
0665 std::set<TypeID> elementTypesConsumed;
0666
0667 for (auto const& worker : allWorkers()) {
0668 for (auto const& consumesInfo : worker->consumesInfo()) {
0669 if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0670 productTypesConsumed.emplace(consumesInfo.type());
0671 } else {
0672 elementTypesConsumed.emplace(consumesInfo.type());
0673 }
0674 }
0675 }
0676
// With subprocesses, TriggerResults is always added to the consumed product types.
0677 if (hasSubprocesses) {
0678 productTypesConsumed.emplace(typeid(TriggerResults));
0679 }
0680
// Short-lived consumer that adds the types consumed by the
// RandomNumberGenerator service, when that service is available.
0681 { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0682 preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0683 }
0684
0685 for (auto& c : all_output_communicators_) {
0686 c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
0687 }
0688
0689 if (wantSummary_) {
0690 std::vector<const ModuleDescription*> modDesc;
0691 const auto& workers = allWorkers();
0692 modDesc.reserve(workers.size());
0693
0694 std::transform(workers.begin(),
0695 workers.end(),
0696 std::back_inserter(modDesc),
0697 [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0698
0699 // Create the time keeper for the summary and connect its callbacks to the
0700 // ActivityRegistry signals below.
0701 summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0702 auto timeKeeperPtr = summaryTimeKeeper_.get();
0703
0704 areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0705
0706 areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0707 areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0708 areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0709 areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0710 areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0711 areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0712
0713 areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0714 areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0715
0716 areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0717 areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0718
0719 areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0720 areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0721
0722
0723 }
0724
0725 }
0726
0727 void Schedule::limitOutput(ParameterSet const& proc_pset,
0728 BranchIDLists const& branchIDLists,
0729 SubProcessParentageHelper const* subProcessParentageHelper) {
0730 std::string const output("output");
0731
0732 ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0733 int maxEventSpecs = 0;
0734 int maxEventsOut = -1;
0735 ParameterSet const* vMaxEventsOut = nullptr;
0736 std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0737 if (search_all(intNamesE, output)) {
0738 maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0739 ++maxEventSpecs;
0740 }
0741 std::vector<std::string> psetNamesE;
0742 maxEventsPSet.getParameterSetNames(psetNamesE, false);
0743 if (search_all(psetNamesE, output)) {
0744 vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0745 ++maxEventSpecs;
0746 }
0747
0748 if (maxEventSpecs > 1) {
0749 throw Exception(errors::Configuration)
0750 << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0751 }
0752
0753 for (auto& c : all_output_communicators_) {
0754 OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
0755 if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0756 std::string const& moduleLabel = c->description().moduleLabel();
0757 try {
0758 desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0759 } catch (Exception const&) {
0760 throw Exception(errors::Configuration)
0761 << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0762 }
0763 }
0764 c->configure(desc);
0765 }
0766 }
0767
0768 bool Schedule::terminate() const {
0769 if (all_output_communicators_.empty()) {
0770 return false;
0771 }
0772 for (auto& c : all_output_communicators_) {
0773 if (!c->limitReached()) {
0774
0775 return false;
0776 }
0777 }
0778 LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0779 << "has reached its configured limit.\n";
0780 return true;
0781 }
0782
0783 void Schedule::endJob(ExceptionCollector& collector) {
0784 globalSchedule_->endJob(collector);
0785 if (collector.hasThrown()) {
0786 return;
0787 }
0788
0789 if (wantSummary_) {
0790 try {
0791 convertException::wrap([this]() { sendFwkSummaryToMessageLogger(); });
0792 } catch (cms::Exception const& ex) {
0793 collector.addException(ex);
0794 }
0795 }
0796 }
0797
0798 void Schedule::sendFwkSummaryToMessageLogger() const {
0799
0800
0801 auto logForEach = [](auto const& iContainer, auto iMessage) {
0802 auto logger = LogFwkVerbatim("FwkSummary");
0803 int count = 0;
0804 for (auto const& element : iContainer) {
0805 iMessage(logger, element);
0806 if (++count == 10) {
0807 logger = LogFwkVerbatim("FwkSummary");
0808 count = 0;
0809 } else {
0810 logger << "\n";
0811 }
0812 }
0813 };
0814
0815 {
0816 TriggerReport tr;
0817 getTriggerReport(tr);
0818
0819
0820
0821 LogFwkVerbatim("FwkSummary") << "";
0822 if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
0823 LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
0824 << streamSchedules_[0]->context().processContext()->processName();
0825 }
0826 LogFwkVerbatim("FwkSummary") << "TrigReport "
0827 << "---------- Event Summary ------------";
0828 if (!tr.trigPathSummaries.empty()) {
0829 LogFwkVerbatim("FwkSummary") << "TrigReport"
0830 << " Events total = " << tr.eventSummary.totalEvents
0831 << " passed = " << tr.eventSummary.totalEventsPassed
0832 << " failed = " << tr.eventSummary.totalEventsFailed << "";
0833 } else {
0834 LogFwkVerbatim("FwkSummary") << "TrigReport"
0835 << " Events total = " << tr.eventSummary.totalEvents
0836 << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0837 }
0838
0839 LogFwkVerbatim("FwkSummary") << "";
0840 LogFwkVerbatim("FwkSummary") << "TrigReport "
0841 << "---------- Path Summary ------------";
0842 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0843 << " " << std::right << std::setw(10) << "Executed"
0844 << " " << std::right << std::setw(10) << "Passed"
0845 << " " << std::right << std::setw(10) << "Failed"
0846 << " " << std::right << std::setw(10) << "Error"
0847 << " "
0848 << "Name"
0849 << "";
0850 logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
0851 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
0852 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0853 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0854 << p.timesExcept << " " << p.name;
0855 });
0856
0857
0858
0859
0860
0861
0862
0863
0864
0865
0866
0867
0868
0869
0870
0871
0872
0873
0874 LogFwkVerbatim("FwkSummary") << "\n"
0875 << "TrigReport "
0876 << "-------End-Path Summary ------------\n"
0877 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0878 << " " << std::right << std::setw(10) << "Executed"
0879 << " " << std::right << std::setw(10) << "Passed"
0880 << " " << std::right << std::setw(10) << "Failed"
0881 << " " << std::right << std::setw(10) << "Error"
0882 << " "
0883 << "Name";
0884 logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
0885 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
0886 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0887 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0888 << p.timesExcept << " " << p.name;
0889 });
0890
0891 for (auto const& p : tr.trigPathSummaries) {
0892 LogFwkVerbatim("FwkSummary") << "\n"
0893 << "TrigReport "
0894 << "---------- Modules in Path: " << p.name << " ------------\n"
0895 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0896 << " " << std::right << std::setw(10) << "Visited"
0897 << " " << std::right << std::setw(10) << "Passed"
0898 << " " << std::right << std::setw(10) << "Failed"
0899 << " " << std::right << std::setw(10) << "Error"
0900 << " "
0901 << "Name";
0902
0903 logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
0904 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
0905 << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0906 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0907 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0908 });
0909 }
0910
0911 for (auto const& p : tr.endPathSummaries) {
0912 LogFwkVerbatim("FwkSummary") << "\n"
0913 << "TrigReport "
0914 << "------ Modules in End-Path: " << p.name << " ------------\n"
0915 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0916 << " " << std::right << std::setw(10) << "Visited"
0917 << " " << std::right << std::setw(10) << "Passed"
0918 << " " << std::right << std::setw(10) << "Failed"
0919 << " " << std::right << std::setw(10) << "Error"
0920 << " "
0921 << "Name";
0922
0923 unsigned int bitpos = 0;
0924 logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
0925 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
0926 << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0927 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0928 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0929 ++bitpos;
0930 });
0931 }
0932
0933 LogFwkVerbatim("FwkSummary") << "\n"
0934 << "TrigReport "
0935 << "---------- Module Summary ------------\n"
0936 << "TrigReport " << std::right << std::setw(10) << "Visited"
0937 << " " << std::right << std::setw(10) << "Executed"
0938 << " " << std::right << std::setw(10) << "Passed"
0939 << " " << std::right << std::setw(10) << "Failed"
0940 << " " << std::right << std::setw(10) << "Error"
0941 << " "
0942 << "Name";
0943
0944 logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
0945 logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
0946 << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
0947 << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
0948 << worker.timesExcept << " " << worker.moduleLabel;
0949 });
0950 LogFwkVerbatim("FwkSummary") << "";
0951 }
0952
0953 TriggerTimingReport tr;
0954 getTriggerTimingReport(tr);
0955
0956 const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0957
0958 LogFwkVerbatim("FwkSummary") << "TimeReport "
0959 << "---------- Event Summary ---[sec]----";
0960 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0961 << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0962 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0963 << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0964 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0965 << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0966 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0967 << " efficiency CPU/Real/thread = "
0968 << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0969 preallocConfig_.numberOfThreads();
0970
0971 constexpr int kColumn1Size = 10;
0972 constexpr int kColumn2Size = 12;
0973 constexpr int kColumn3Size = 12;
0974 LogFwkVerbatim("FwkSummary") << "";
0975 LogFwkVerbatim("FwkSummary") << "TimeReport "
0976 << "---------- Path Summary ---[Real sec]----";
0977 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0978 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0979 << " Name";
0980 logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
0981 const int timesRun = std::max(1, p.timesRun);
0982 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0983 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0984 << " " << p.name;
0985 });
0986 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0987 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0988 << " Name";
0989
0990 LogFwkVerbatim("FwkSummary") << "\n"
0991 << "TimeReport "
0992 << "-------End-Path Summary ---[Real sec]----\n"
0993 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0994 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0995 << " Name";
0996 logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
0997 const int timesRun = std::max(1, p.timesRun);
0998
0999 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1000 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
1001 << " " << p.name;
1002 });
1003 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1004 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1005 << " Name";
1006
1007 for (auto const& p : tr.trigPathSummaries) {
1008 LogFwkVerbatim("FwkSummary") << "\n"
1009 << "TimeReport "
1010 << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
1011 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1012 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1013 << " Name";
1014 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1015 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1016 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1017 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1018 });
1019 }
1020 if (not tr.trigPathSummaries.empty()) {
1021 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1022 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1023 << " Name";
1024 }
1025 for (auto const& p : tr.endPathSummaries) {
1026 LogFwkVerbatim("FwkSummary") << "\n"
1027 << "TimeReport "
1028 << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
1029 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1030 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1031 << " Name";
1032 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1033 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1034 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1035 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1036 });
1037 }
1038 if (not tr.endPathSummaries.empty()) {
1039 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1040 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1041 << " Name";
1042 }
1043 LogFwkVerbatim("FwkSummary") << "\n"
1044 << "TimeReport "
1045 << "---------- Module Summary ---[Real sec]----\n"
1046 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1047 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1048 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1049 << " Name";
1050 logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
1051 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1052 << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1053 << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
1054 << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel;
1055 });
1056 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1057 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1058 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1059 << " Name";
1060
1061 LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
1062 }
1063
1064 void Schedule::closeOutputFiles() {
1065 using std::placeholders::_1;
1066 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1067 for (auto& worker : allWorkers()) {
1068 worker->respondToCloseOutputFile();
1069 }
1070 }
1071
1072 void Schedule::openOutputFiles(FileBlock& fb) {
1073 using std::placeholders::_1;
1074 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1075 }
1076
1077 void Schedule::writeRunAsync(WaitingTaskHolder task,
1078 RunPrincipal const& rp,
1079 ProcessContext const* processContext,
1080 ActivityRegistry* activityRegistry,
1081 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1082 auto token = ServiceRegistry::instance().presentToken();
1083 GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1084 LuminosityBlockID(rp.run(), 0),
1085 rp.index(),
1086 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1087 rp.endTime(),
1088 processContext);
1089
1090 using namespace edm::waiting_task;
1091 chain::first([&](auto nextTask) {
1092
1093 ServiceRegistry::Operate op(token);
1094
1095
1096 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1097 }
1098 for (auto& c : all_output_communicators_) {
1099 c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1100 }
1101 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1102
1103 ServiceRegistry::Operate op(token);
1104
1105 activityRegistry->postGlobalWriteRunSignal_(globalContext);
1106 })) |
1107 chain::runLast(task);
1108 }
1109
1110 void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1111 ProcessBlockPrincipal const& pbp,
1112 ProcessContext const* processContext,
1113 ActivityRegistry* activityRegistry) {
1114 auto token = ServiceRegistry::instance().presentToken();
1115 GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1116 LuminosityBlockID(),
1117 RunIndex::invalidRunIndex(),
1118 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1119 Timestamp::invalidTimestamp(),
1120 processContext);
1121
1122 using namespace edm::waiting_task;
1123 chain::first([&](auto nextTask) {
1124
1125 ServiceRegistry::Operate op(token);
1126 CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1127 }
1128 for (auto& c : all_output_communicators_) {
1129 c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1130 }
1131 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1132
1133 ServiceRegistry::Operate op(token);
1134
1135 activityRegistry->postWriteProcessBlockSignal_(globalContext);
1136 })) |
1137 chain::runLast(std::move(task));
1138 }
1139
1140 void Schedule::writeLumiAsync(WaitingTaskHolder task,
1141 LuminosityBlockPrincipal const& lbp,
1142 ProcessContext const* processContext,
1143 ActivityRegistry* activityRegistry) {
1144 auto token = ServiceRegistry::instance().presentToken();
1145 GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1146 lbp.id(),
1147 lbp.runPrincipal().index(),
1148 lbp.index(),
1149 lbp.beginTime(),
1150 processContext);
1151
1152 using namespace edm::waiting_task;
1153 chain::first([&](auto nextTask) {
1154 ServiceRegistry::Operate op(token);
1155 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1156 }
1157 for (auto& c : all_output_communicators_) {
1158 c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1159 }
1160 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1161
1162 ServiceRegistry::Operate op(token);
1163
1164 activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1165 })) |
1166 chain::runLast(task);
1167 }
1168
1169 bool Schedule::shouldWeCloseOutput() const {
1170 using std::placeholders::_1;
1171
1172 return (std::find_if(all_output_communicators_.begin(),
1173 all_output_communicators_.end(),
1174 std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1175 all_output_communicators_.end());
1176 }
1177
1178 void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1179 using std::placeholders::_1;
1180 for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1181 }
1182
1183 void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1184 using std::placeholders::_1;
1185 for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1186 }
1187
1188 void Schedule::beginJob(ProductRegistry const& iRegistry,
1189 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
1190 ProcessBlockHelperBase const& processBlockHelperBase,
1191 PathsAndConsumesOfModulesBase const& pathsAndConsumesOfModules,
1192 ProcessContext const& processContext) {
1193 globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase, pathsAndConsumesOfModules, processContext);
1194 }
1195
1196 void Schedule::beginStream(unsigned int streamID) {
1197 assert(streamID < streamSchedules_.size());
1198 streamSchedules_[streamID]->beginStream();
1199 }
1200
1201 void Schedule::endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1202 assert(streamID < streamSchedules_.size());
1203 streamSchedules_[streamID]->endStream(collector, collectorMutex);
1204 }
1205
1206 void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1207 unsigned int iStreamID,
1208 EventTransitionInfo& info,
1209 ServiceToken const& token) {
1210 assert(iStreamID < streamSchedules_.size());
1211 streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1212 }
1213
1214 bool Schedule::changeModule(std::string const& iLabel,
1215 ParameterSet const& iPSet,
1216 const ProductRegistry& iRegistry,
1217 eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
1218 Worker* found = nullptr;
1219 for (auto const& worker : allWorkers()) {
1220 if (worker->description()->moduleLabel() == iLabel) {
1221 found = worker;
1222 break;
1223 }
1224 }
1225 if (nullptr == found) {
1226 return false;
1227 }
1228
1229 auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1230
1231 globalSchedule_->replaceModule(newMod, iLabel);
1232
1233 for (auto& s : streamSchedules_) {
1234 s->replaceModule(newMod, iLabel);
1235 }
1236
1237 {
1238
1239 auto const processBlockLookup = iRegistry.productLookup(InProcess);
1240 auto const runLookup = iRegistry.productLookup(InRun);
1241 auto const lumiLookup = iRegistry.productLookup(InLumi);
1242 auto const eventLookup = iRegistry.productLookup(InEvent);
1243 found->updateLookup(InProcess, *runLookup);
1244 found->updateLookup(InRun, *runLookup);
1245 found->updateLookup(InLumi, *lumiLookup);
1246 found->updateLookup(InEvent, *eventLookup);
1247 found->updateLookup(iIndices);
1248
1249 auto const& processName = newMod->moduleDescription().processName();
1250 auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1251 auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1252 auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1253 auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1254 found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1255 found->resolvePutIndicies(InRun, runModuleToIndicies);
1256 found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1257 found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1258 }
1259
1260 return true;
1261 }
1262
1263 void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1264 globalSchedule_->deleteModule(iLabel);
1265 for (auto& stream : streamSchedules_) {
1266 stream->deleteModule(iLabel);
1267 }
1268 moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1269 }
1270
1271 void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1272 std::multimap<std::string, std::string> const& referencesToBranches,
1273 std::vector<std::string> const& modulesToSkip,
1274 edm::ProductRegistry const& preg) {
1275 for (auto& stream : streamSchedules_) {
1276 stream->initializeEarlyDelete(
1277 *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1278 }
1279 }
1280
1281 std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1282 std::vector<ModuleDescription const*> result;
1283 result.reserve(allWorkers().size());
1284
1285 for (auto const& worker : allWorkers()) {
1286 ModuleDescription const* p = worker->description();
1287 result.push_back(p);
1288 }
1289 return result;
1290 }
1291
1292 Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1293
1294 void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1295 for (auto const& worker : allWorkers()) {
1296 worker->convertCurrentProcessAlias(processName);
1297 }
1298 }
1299
1300 void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1301 streamSchedules_[0]->availablePaths(oLabelsToFill);
1302 }
1303
1304 void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1305
1306 void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1307
1308 void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1309 streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1310 }
1311
1312 void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1313 std::vector<ModuleDescription const*>& descriptions,
1314 unsigned int hint) const {
1315 streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1316 }
1317
1318 void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1319 std::vector<ModuleDescription const*>& descriptions,
1320 unsigned int hint) const {
1321 streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1322 }
1323
1324 void Schedule::fillModuleAndConsumesInfo(
1325 std::vector<ModuleDescription const*>& allModuleDescriptions,
1326 std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1327 std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1328 std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1329 ProductRegistry const& preg) const {
1330 allModuleDescriptions.clear();
1331 moduleIDToIndex.clear();
1332 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1333 modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1334 }
1335 modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1336
1337 allModuleDescriptions.reserve(allWorkers().size());
1338 moduleIDToIndex.reserve(allWorkers().size());
1339 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1340 modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1341 }
1342 modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1343
1344 std::map<std::string, ModuleDescription const*> labelToDesc;
1345 unsigned int i = 0;
1346 for (auto const& worker : allWorkers()) {
1347 ModuleDescription const* p = worker->description();
1348 allModuleDescriptions.push_back(p);
1349 moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1350 labelToDesc[p->moduleLabel()] = p;
1351 ++i;
1352 }
1353 sort_all(moduleIDToIndex);
1354
1355 i = 0;
1356 for (auto const& worker : allWorkers()) {
1357 std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1358 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1359 modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1360 }
1361
1362 std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1363 modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1364 try {
1365 worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1366 } catch (cms::Exception& ex) {
1367 ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1368 worker->description()->moduleLabel());
1369 throw;
1370 }
1371 ++i;
1372 }
1373 }
1374
1375 void Schedule::getTriggerReport(TriggerReport& rep) const {
1376 rep.eventSummary.totalEvents = 0;
1377 rep.eventSummary.totalEventsPassed = 0;
1378 rep.eventSummary.totalEventsFailed = 0;
1379 for (auto& s : streamSchedules_) {
1380 s->getTriggerReport(rep);
1381 }
1382 sort_all(rep.workerSummaries);
1383 }
1384
1385 void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1386 rep.eventSummary.totalEvents = 0;
1387 rep.eventSummary.cpuTime = 0.;
1388 rep.eventSummary.realTime = 0.;
1389 summaryTimeKeeper_->fillTriggerTimingReport(rep);
1390 }
1391
1392 int Schedule::totalEvents() const {
1393 int returnValue = 0;
1394 for (auto& s : streamSchedules_) {
1395 returnValue += s->totalEvents();
1396 }
1397 return returnValue;
1398 }
1399
1400 int Schedule::totalEventsPassed() const {
1401 int returnValue = 0;
1402 for (auto& s : streamSchedules_) {
1403 returnValue += s->totalEventsPassed();
1404 }
1405 return returnValue;
1406 }
1407
1408 int Schedule::totalEventsFailed() const {
1409 int returnValue = 0;
1410 for (auto& s : streamSchedules_) {
1411 returnValue += s->totalEventsFailed();
1412 }
1413 return returnValue;
1414 }
1415
1416 void Schedule::clearCounters() {
1417 for (auto& s : streamSchedules_) {
1418 s->clearCounters();
1419 }
1420 }
1421 }