File indexing completed on 2023-01-11 16:27:16
0001 #include "FWCore/Framework/interface/Schedule.h"
0002
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0011 #include "FWCore/Framework/interface/EDConsumerBase.h"
0012 #include "FWCore/Framework/src/OutputModuleDescription.h"
0013 #include "FWCore/Framework/interface/SubProcess.h"
0014 #include "FWCore/Framework/interface/TriggerNamesService.h"
0015 #include "FWCore/Framework/src/TriggerReport.h"
0016 #include "FWCore/Framework/src/TriggerTimingReport.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/src/Factory.h"
0019 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0020 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/Framework/src/TriggerResultInserter.h"
0023 #include "FWCore/Framework/src/PathStatusInserter.h"
0024 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0025 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0026 #include "FWCore/Concurrency/interface/chain_first.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0031 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0032 #include "FWCore/Utilities/interface/Algorithms.h"
0033 #include "FWCore/Utilities/interface/ConvertException.h"
0034 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0035 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0036 #include "FWCore/Utilities/interface/TypeID.h"
0037 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0038
0039 #include <array>
0040 #include <algorithm>
0041 #include <cassert>
0042 #include <cstdlib>
0043 #include <functional>
0044 #include <iomanip>
0045 #include <list>
0046 #include <map>
0047 #include <set>
0048 #include <exception>
0049 #include <sstream>
0050
0051 #include "make_shared_noexcept_false.h"
0052 #include "processEDAliases.h"
0053
0054 namespace edm {
0055
0056 class Maker;
0057
0058 namespace {
0059 using std::placeholders::_1;
0060
0061 bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0062 return std::binary_search(v.begin(), v.end(), s);
0063 }
0064
0065
0066
0067
0068 std::shared_ptr<TriggerResultInserter> makeInserter(
0069 ParameterSet& proc_pset,
0070 PreallocationConfiguration const& iPrealloc,
0071 ProductRegistry& preg,
0072 ExceptionToActionTable const& actions,
0073 std::shared_ptr<ActivityRegistry> areg,
0074 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0075 ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0076 trig_pset->registerIt();
0077
0078 WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0079 ModuleDescription md(trig_pset->id(),
0080 "TriggerResultInserter",
0081 "TriggerResults",
0082 processConfiguration.get(),
0083 ModuleDescription::getUniqueID());
0084
0085 areg->preModuleConstructionSignal_(md);
0086 bool postCalled = false;
0087 std::shared_ptr<TriggerResultInserter> returnValue;
0088
0089 CMS_SA_ALLOW try {
0090 maker::ModuleHolderT<TriggerResultInserter> holder(
0091 make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
0092 static_cast<Maker const*>(nullptr));
0093 holder.setModuleDescription(md);
0094 holder.registerProductsAndCallbacks(&preg);
0095 returnValue = holder.module();
0096 postCalled = true;
0097
0098 areg->postModuleConstructionSignal_(md);
0099 } catch (...) {
0100 if (!postCalled) {
0101 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0102
0103 }
0104 }
0105 throw;
0106 }
0107 return returnValue;
0108 }
0109
0110 template <typename T>
0111 void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0112 std::vector<std::string> const& pathNames,
0113 PreallocationConfiguration const& iPrealloc,
0114 ProductRegistry& preg,
0115 std::shared_ptr<ActivityRegistry> areg,
0116 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0117 std::string const& moduleTypeName) {
0118 ParameterSet pset;
0119 pset.addParameter<std::string>("@module_type", moduleTypeName);
0120 pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0121 pset.registerIt();
0122
0123 pathStatusInserters.reserve(pathNames.size());
0124
0125 for (auto const& pathName : pathNames) {
0126 ModuleDescription md(
0127 pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0128
0129 areg->preModuleConstructionSignal_(md);
0130 bool postCalled = false;
0131
0132 CMS_SA_ALLOW try {
0133 maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
0134 static_cast<Maker const*>(nullptr));
0135 holder.setModuleDescription(md);
0136 holder.registerProductsAndCallbacks(&preg);
0137 pathStatusInserters.emplace_back(holder.module());
0138 postCalled = true;
0139
0140 areg->postModuleConstructionSignal_(md);
0141 } catch (...) {
0142 if (!postCalled) {
0143 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0144
0145 }
0146 }
0147 throw;
0148 }
0149 }
0150 }
0151
0152 typedef std::vector<std::string> vstring;
0153
0154 void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
0155
0156 struct BranchesCases {
0157 BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0158 std::vector<BranchKey> chosenBranches;
0159 std::vector<std::string> caseLabels;
0160 };
0161 std::map<std::string, BranchesCases> switchMap;
0162 for (auto& prod : preg.productListUpdator()) {
0163 if (prod.second.isSwitchAlias()) {
0164 auto it = switchMap.find(prod.second.moduleLabel());
0165 if (it == switchMap.end()) {
0166 auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0167 auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0168 switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0169 assert(inserted.second);
0170 it = inserted.first;
0171 }
0172
0173 bool found = false;
0174 for (auto const& productIter : preg.productList()) {
0175 BranchKey const& branchKey = productIter.first;
0176
0177
0178
0179
0180 if (branchKey.processName() != processName) {
0181 continue;
0182 }
0183
0184 BranchDescription const& desc = productIter.second;
0185 if (desc.branchType() == prod.second.branchType() and
0186 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0187 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0188 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0189 prod.second.setSwitchAliasForBranch(desc);
0190 if (!prod.second.transient()) {
0191 it->second.chosenBranches.push_back(prod.first);
0192 }
0193 found = true;
0194 }
0195 }
0196 if (not found) {
0197 Exception ex(errors::LogicError);
0198 ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
0199 << " friendly class name = " << prod.second.friendlyClassName() << "\n"
0200 << " module label = " << prod.second.moduleLabel() << "\n"
0201 << " product instance name = " << prod.second.productInstanceName() << "\n"
0202 << " process name = " << processName
0203 << "\n\nbut did not find any. Please contact a framework developer.";
0204 ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0205 throw ex;
0206 }
0207 }
0208 }
0209 if (switchMap.empty())
0210 return;
0211
0212 for (auto& elem : switchMap) {
0213 std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0214 }
0215
0216 auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0217 std::map<std::string, std::vector<BranchKey>> caseBranches;
0218 for (auto const& item : preg.productList()) {
0219 if (item.first.processName() != processName)
0220 continue;
0221
0222 if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0223 found != caseLabels.end()) {
0224 caseBranches[*found].push_back(item.first);
0225 }
0226 }
0227
0228 for (auto const& caseLabel : caseLabels) {
0229 ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0230 auto& branches = caseBranches[caseLabel];
0231 std::sort(branches.begin(), branches.end());
0232 for (auto const& branch : branches) {
0233 ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0234 }
0235 }
0236 };
0237
0238
0239
0240 std::vector<bool> foundBranches;
0241 for (auto const& switchItem : switchMap) {
0242 auto const& switchLabel = switchItem.first;
0243 auto const& chosenBranches = switchItem.second.chosenBranches;
0244 auto const& caseLabels = switchItem.second.caseLabels;
0245 foundBranches.resize(chosenBranches.size());
0246 for (auto const& caseLabel : caseLabels) {
0247 std::fill(foundBranches.begin(), foundBranches.end(), false);
0248 for (auto& nonConstItem : preg.productListUpdator()) {
0249 auto const& item = nonConstItem;
0250 if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0251
0252
0253 if (!item.second.transient()) {
0254 auto range = std::equal_range(chosenBranches.begin(),
0255 chosenBranches.end(),
0256 BranchKey(item.first.friendlyClassName(),
0257 switchLabel,
0258 item.first.productInstanceName(),
0259 item.first.processName()));
0260 if (range.first == range.second) {
0261 Exception ex(errors::Configuration);
0262 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0263 << item.first << " that is not produced by the chosen case "
0264 << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0265 .getUntrackedParameter<std::string>("@chosen_case")
0266 << " and that product is not transient. "
0267 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0268 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0269 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0270 addProductsToException(caseLabels, ex);
0271 throw ex;
0272 }
0273 assert(std::distance(range.first, range.second) == 1);
0274 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0275 }
0276
0277
0278
0279
0280
0281
0282
0283
0284
0285 nonConstItem.second.setTransient(true);
0286
0287
0288 auto const& bd = item.second;
0289 if (not bd.branchAliases().empty()) {
0290 auto ex = Exception(errors::UnimplementedFeature)
0291 << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0292 "aliases for SwitchProducer with label "
0293 << switchLabel << " for case " << caseLabel << ":";
0294 for (auto const& branchAlias : bd.branchAliases()) {
0295 ex << " " << branchAlias;
0296 }
0297 throw ex;
0298 }
0299 }
0300 }
0301
0302 for (size_t i = 0; i < chosenBranches.size(); i++) {
0303 if (not foundBranches[i]) {
0304 auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0305 .getUntrackedParameter<std::string>("@chosen_case");
0306 Exception ex(errors::Configuration);
0307 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0308 << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0309 << chosenLabel << " and that product is not transient. "
0310 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0311 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0312 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0313 addProductsToException(caseLabels, ex);
0314 throw ex;
0315 }
0316 }
0317 }
0318 }
0319 }
0320
0321 void reduceParameterSet(ParameterSet& proc_pset,
0322 vstring const& end_path_name_list,
0323 vstring& modulesInConfig,
0324 std::set<std::string> const& usedModuleLabels,
0325 std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0326
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336
0337 vstring outputModuleLabels;
0338 std::string edmType;
0339 std::string const moduleEdmType("@module_edm_type");
0340 std::string const outputModule("OutputModule");
0341 std::string const edAnalyzer("EDAnalyzer");
0342 std::string const edFilter("EDFilter");
0343 std::string const edProducer("EDProducer");
0344
0345 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0346
0347
0348
0349 vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0350 std::set<std::string> modulesOnPaths;
0351 {
0352 std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0353 for (auto const& endPath : end_path_name_list) {
0354 noEndPaths.erase(endPath);
0355 }
0356 {
0357 vstring labels;
0358 for (auto const& path : noEndPaths) {
0359 labels = proc_pset.getParameter<vstring>(path);
0360 modulesOnPaths.insert(labels.begin(), labels.end());
0361 }
0362 }
0363 }
0364
0365
0366 std::vector<std::string> labelsToBeDropped;
0367 labelsToBeDropped.reserve(modulesInConfigSet.size());
0368 std::set_difference(modulesInConfigSet.begin(),
0369 modulesInConfigSet.end(),
0370 usedModuleLabels.begin(),
0371 usedModuleLabels.end(),
0372 std::back_inserter(labelsToBeDropped));
0373
0374 const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0375 for (auto const& modLabel : usedModuleLabels) {
0376
0377
0378 if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0379 edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0380 if (edmType == outputModule) {
0381 outputModuleLabels.push_back(modLabel);
0382 labelsToBeDropped.push_back(modLabel);
0383 }
0384 if (edmType == edAnalyzer) {
0385 if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0386 labelsToBeDropped.push_back(modLabel);
0387 }
0388 }
0389 }
0390 }
0391
0392 std::inplace_merge(
0393 labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0394
0395
0396 for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0397
0398
0399 vstring::iterator endAfterRemove =
0400 std::remove_if(modulesInConfig.begin(),
0401 modulesInConfig.end(),
0402 std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0403 modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0404 proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0405
0406
0407 vstring endPathsToBeDropped;
0408 vstring labels;
0409 for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0410 iEndPath != endEndPath;
0411 ++iEndPath) {
0412 labels = proc_pset.getParameter<vstring>(*iEndPath);
0413 vstring::iterator iSave = labels.begin();
0414 vstring::iterator iBegin = labels.begin();
0415
0416 for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0417 if (binary_search_string(labelsToBeDropped, *iLabel)) {
0418 if (binary_search_string(outputModuleLabels, *iLabel)) {
0419 outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0420 }
0421 } else {
0422 if (iSave != iLabel) {
0423 iSave->swap(*iLabel);
0424 }
0425 ++iSave;
0426 }
0427 }
0428 labels.erase(iSave, labels.end());
0429 if (labels.empty()) {
0430
0431 proc_pset.eraseSimpleParameter(*iEndPath);
0432 endPathsToBeDropped.push_back(*iEndPath);
0433 } else {
0434 proc_pset.addParameter<vstring>(*iEndPath, labels);
0435 }
0436 }
0437 sort_all(endPathsToBeDropped);
0438
0439
0440 endAfterRemove = std::remove_if(scheduledPaths.begin(),
0441 scheduledPaths.end(),
0442 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0443 scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0444 proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0445
0446
0447 vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0448 endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0449 scheduledEndPaths.end(),
0450 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0451 scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0452 proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0453 }
0454
0455 class RngEDConsumer : public EDConsumerBase {
0456 public:
0457 explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0458 Service<RandomNumberGenerator> rng;
0459 if (rng.isAvailable()) {
0460 rng->consumes(consumesCollector());
0461 for (auto const& consumesInfo : this->consumesInfo()) {
0462 typesConsumed.emplace(consumesInfo.type());
0463 }
0464 }
0465 }
0466 };
0467
0468 template <typename F>
0469 auto doCleanup(F&& iF) {
0470 auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0471 CMS_SA_ALLOW try { f(); } catch (...) {
0472 }
0473 if (iPtr) {
0474 iTask.doneWaiting(*iPtr);
0475 }
0476 };
0477 return wrapped;
0478 }
0479 }
0480
0481
0482 typedef std::vector<std::string> vstring;
0483
0484
0485
0486 Schedule::Schedule(ParameterSet& proc_pset,
0487 service::TriggerNamesService const& tns,
0488 ProductRegistry& preg,
0489 ExceptionToActionTable const& actions,
0490 std::shared_ptr<ActivityRegistry> areg,
0491 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0492 PreallocationConfiguration const& prealloc,
0493 ProcessContext const* processContext,
0494 ModuleTypeResolverMaker const* resolverMaker)
0495 :
0496 resultsInserter_{tns.getTrigPaths().empty()
0497 ? std::shared_ptr<TriggerResultInserter>{}
0498 : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
0499 moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0500 all_output_communicators_(),
0501 preallocConfig_(prealloc),
0502 pathNames_(&tns.getTrigPaths()),
0503 endPathNames_(&tns.getEndPaths()),
0504 wantSummary_(tns.wantSummary()) {
0505 makePathStatusInserters(pathStatusInserters_,
0506 *pathNames_,
0507 prealloc,
0508 preg,
0509 areg,
0510 processConfiguration,
0511 std::string("PathStatusInserter"));
0512
0513 makePathStatusInserters(endPathStatusInserters_,
0514 *endPathNames_,
0515 prealloc,
0516 preg,
0517 areg,
0518 processConfiguration,
0519 std::string("EndPathStatusInserter"));
0520
0521 assert(0 < prealloc.numberOfStreams());
0522 streamSchedules_.reserve(prealloc.numberOfStreams());
0523 for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0524 streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0525 pathStatusInserters_,
0526 endPathStatusInserters_,
0527 moduleRegistry(),
0528 proc_pset,
0529 tns,
0530 prealloc,
0531 preg,
0532 actions,
0533 areg,
0534 processConfiguration,
0535 StreamID{i},
0536 processContext));
0537 }
0538
0539
0540
0541 const std::string kTriggerResults("TriggerResults");
0542 std::vector<std::string> modulesToUse;
0543 modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
0544 for (auto const& worker : streamSchedules_[0]->allWorkers()) {
0545 if (worker->description()->moduleLabel() != kTriggerResults) {
0546 modulesToUse.push_back(worker->description()->moduleLabel());
0547 }
0548 }
0549
0550 unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0551 if (nUnscheduledModules > 0) {
0552 std::vector<std::string> temp;
0553 temp.reserve(modulesToUse.size());
0554 auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0555 std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0556 std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0557 temp.swap(modulesToUse);
0558 }
0559
0560
0561 globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0562 pathStatusInserters_,
0563 endPathStatusInserters_,
0564 moduleRegistry(),
0565 modulesToUse,
0566 proc_pset,
0567 preg,
0568 prealloc,
0569 actions,
0570 areg,
0571 processConfiguration,
0572 processContext);
0573 }
0574
0575 void Schedule::finishSetup(ParameterSet& proc_pset,
0576 service::TriggerNamesService const& tns,
0577 ProductRegistry& preg,
0578 BranchIDListHelper& branchIDListHelper,
0579 ProcessBlockHelperBase& processBlockHelper,
0580 ThinnedAssociationsHelper& thinnedAssociationsHelper,
0581 SubProcessParentageHelper const* subProcessParentageHelper,
0582 std::shared_ptr<ActivityRegistry> areg,
0583 std::shared_ptr<ProcessConfiguration> processConfiguration,
0584 bool hasSubprocesses,
0585 PreallocationConfiguration const& prealloc,
0586 ProcessContext const* processContext) {
0587
0588
0589 const std::string kTriggerResults("TriggerResults");
0590
0591 std::set<std::string> usedModuleLabels;
0592 for (auto const& worker : allWorkers()) {
0593 if (worker->description()->moduleLabel() != kTriggerResults) {
0594 usedModuleLabels.insert(worker->description()->moduleLabel());
0595 }
0596 }
0597 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0598 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0599 reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0600 {
0601 std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0602 detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0603 }
0604
0605
0606
0607 {
0608 auto const& unsched = streamSchedules_[0]->unscheduledWorkers();
0609 if (not unsched.empty()) {
0610 std::set<std::string> unscheduledModules;
0611 std::transform(unsched.begin(),
0612 unsched.end(),
0613 std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0614 [](auto worker) { return worker->description()->moduleLabel(); });
0615 preg.setUnscheduledProducts(unscheduledModules);
0616 }
0617 }
0618
0619 processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0620 proc_pset.registerIt();
0621 processConfiguration->setParameterSetID(proc_pset.id());
0622 processConfiguration->setProcessConfigurationID();
0623
0624
0625
0626 size_t all_workers_count = allWorkers().size();
0627
0628 moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0629 auto comm = iHolder->createOutputModuleCommunicator();
0630 if (comm) {
0631 all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0632 }
0633 });
0634
0635 limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
0636
0637
0638
0639 assert(all_workers_count == allWorkers().size());
0640
0641 branchIDListHelper.updateFromRegistry(preg);
0642
0643 for (auto const& worker : streamSchedules_[0]->allWorkers()) {
0644 worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
0645 }
0646
0647 processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
0648
0649
0650
0651 for (auto& c : all_output_communicators_) {
0652 c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
0653 }
0654
0655 for (auto& product : preg.productListUpdator()) {
0656 setIsMergeable(product.second);
0657 }
0658
0659 {
0660
0661 std::set<TypeID> productTypesConsumed;
0662 std::set<TypeID> elementTypesConsumed;
0663
0664 for (auto const& worker : allWorkers()) {
0665 for (auto const& consumesInfo : worker->consumesInfo()) {
0666 if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0667 productTypesConsumed.emplace(consumesInfo.type());
0668 } else {
0669 elementTypesConsumed.emplace(consumesInfo.type());
0670 }
0671 }
0672 }
0673
0674 if (hasSubprocesses) {
0675 productTypesConsumed.emplace(typeid(TriggerResults));
0676 }
0677
0678 { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0679 preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0680 }
0681
0682 for (auto& c : all_output_communicators_) {
0683 c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
0684 }
0685
0686 if (wantSummary_) {
0687 std::vector<const ModuleDescription*> modDesc;
0688 const auto& workers = allWorkers();
0689 modDesc.reserve(workers.size());
0690
0691 std::transform(workers.begin(),
0692 workers.end(),
0693 std::back_inserter(modDesc),
0694 [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0695
0696
0697 summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0698 auto timeKeeperPtr = summaryTimeKeeper_.get();
0699
0700 areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0701
0702 areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0703 areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0704 areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0705 areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0706 areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0707 areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0708
0709 areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0710 areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0711
0712 areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0713 areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0714
0715 areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0716 areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0717
0718
0719
0720 }
0721
0722 }
0723
0724 void Schedule::limitOutput(ParameterSet const& proc_pset,
0725 BranchIDLists const& branchIDLists,
0726 SubProcessParentageHelper const* subProcessParentageHelper) {
0727 std::string const output("output");
0728
0729 ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0730 int maxEventSpecs = 0;
0731 int maxEventsOut = -1;
0732 ParameterSet const* vMaxEventsOut = nullptr;
0733 std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0734 if (search_all(intNamesE, output)) {
0735 maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0736 ++maxEventSpecs;
0737 }
0738 std::vector<std::string> psetNamesE;
0739 maxEventsPSet.getParameterSetNames(psetNamesE, false);
0740 if (search_all(psetNamesE, output)) {
0741 vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0742 ++maxEventSpecs;
0743 }
0744
0745 if (maxEventSpecs > 1) {
0746 throw Exception(errors::Configuration)
0747 << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0748 }
0749
0750 for (auto& c : all_output_communicators_) {
0751 OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
0752 if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0753 std::string const& moduleLabel = c->description().moduleLabel();
0754 try {
0755 desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0756 } catch (Exception const&) {
0757 throw Exception(errors::Configuration)
0758 << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0759 }
0760 }
0761 c->configure(desc);
0762 }
0763 }
0764
0765 bool Schedule::terminate() const {
0766 if (all_output_communicators_.empty()) {
0767 return false;
0768 }
0769 for (auto& c : all_output_communicators_) {
0770 if (!c->limitReached()) {
0771
0772 return false;
0773 }
0774 }
0775 LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0776 << "has reached its configured limit.\n";
0777 return true;
0778 }
0779
0780 void Schedule::endJob(ExceptionCollector& collector) {
0781 globalSchedule_->endJob(collector);
0782 if (collector.hasThrown()) {
0783 return;
0784 }
0785
0786 if (wantSummary_ == false)
0787 return;
0788 {
0789 TriggerReport tr;
0790 getTriggerReport(tr);
0791
0792
0793
0794 LogFwkVerbatim("FwkSummary") << "";
0795 if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
0796 LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
0797 << streamSchedules_[0]->context().processContext()->processName();
0798 }
0799 LogFwkVerbatim("FwkSummary") << "TrigReport "
0800 << "---------- Event Summary ------------";
0801 if (!tr.trigPathSummaries.empty()) {
0802 LogFwkVerbatim("FwkSummary") << "TrigReport"
0803 << " Events total = " << tr.eventSummary.totalEvents
0804 << " passed = " << tr.eventSummary.totalEventsPassed
0805 << " failed = " << tr.eventSummary.totalEventsFailed << "";
0806 } else {
0807 LogFwkVerbatim("FwkSummary") << "TrigReport"
0808 << " Events total = " << tr.eventSummary.totalEvents
0809 << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0810 }
0811
0812 LogFwkVerbatim("FwkSummary") << "";
0813 LogFwkVerbatim("FwkSummary") << "TrigReport "
0814 << "---------- Path Summary ------------";
0815 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0816 << " " << std::right << std::setw(10) << "Executed"
0817 << " " << std::right << std::setw(10) << "Passed"
0818 << " " << std::right << std::setw(10) << "Failed"
0819 << " " << std::right << std::setw(10) << "Error"
0820 << " "
0821 << "Name"
0822 << "";
0823 for (auto const& p : tr.trigPathSummaries) {
0824 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
0825 << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
0826 << std::right << std::setw(10) << p.timesPassed << " " << std::right
0827 << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0828 << p.timesExcept << " " << p.name << "";
0829 }
0830
0831
0832
0833
0834
0835
0836
0837
0838
0839
0840
0841
0842
0843
0844
0845
0846
0847
0848 LogFwkVerbatim("FwkSummary") << "";
0849 LogFwkVerbatim("FwkSummary") << "TrigReport "
0850 << "-------End-Path Summary ------------";
0851 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0852 << " " << std::right << std::setw(10) << "Executed"
0853 << " " << std::right << std::setw(10) << "Passed"
0854 << " " << std::right << std::setw(10) << "Failed"
0855 << " " << std::right << std::setw(10) << "Error"
0856 << " "
0857 << "Name"
0858 << "";
0859 for (auto const& p : tr.endPathSummaries) {
0860 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
0861 << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
0862 << std::right << std::setw(10) << p.timesPassed << " " << std::right
0863 << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0864 << p.timesExcept << " " << p.name << "";
0865 }
0866
0867 for (auto const& p : tr.trigPathSummaries) {
0868 LogFwkVerbatim("FwkSummary") << "";
0869 LogFwkVerbatim("FwkSummary") << "TrigReport "
0870 << "---------- Modules in Path: " << p.name << " ------------";
0871 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0872 << " " << std::right << std::setw(10) << "Visited"
0873 << " " << std::right << std::setw(10) << "Passed"
0874 << " " << std::right << std::setw(10) << "Failed"
0875 << " " << std::right << std::setw(10) << "Error"
0876 << " "
0877 << "Name"
0878 << "";
0879
0880 for (auto const& mod : p.moduleInPathSummaries) {
0881 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
0882 << mod.bitPosition << " " << std::right << std::setw(10) << mod.timesVisited
0883 << " " << std::right << std::setw(10) << mod.timesPassed << " " << std::right
0884 << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
0885 << mod.timesExcept << " " << mod.moduleLabel << "";
0886 }
0887 }
0888
0889 for (auto const& p : tr.endPathSummaries) {
0890 LogFwkVerbatim("FwkSummary") << "";
0891 LogFwkVerbatim("FwkSummary") << "TrigReport "
0892 << "------ Modules in End-Path: " << p.name << " ------------";
0893 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0894 << " " << std::right << std::setw(10) << "Visited"
0895 << " " << std::right << std::setw(10) << "Passed"
0896 << " " << std::right << std::setw(10) << "Failed"
0897 << " " << std::right << std::setw(10) << "Error"
0898 << " "
0899 << "Name"
0900 << "";
0901
0902 unsigned int bitpos = 0;
0903 for (auto const& mod : p.moduleInPathSummaries) {
0904 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
0905 << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
0906 << std::right << std::setw(10) << mod.timesPassed << " " << std::right
0907 << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
0908 << mod.timesExcept << " " << mod.moduleLabel << "";
0909 ++bitpos;
0910 }
0911 }
0912
0913 LogFwkVerbatim("FwkSummary") << "";
0914 LogFwkVerbatim("FwkSummary") << "TrigReport "
0915 << "---------- Module Summary ------------";
0916 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Visited"
0917 << " " << std::right << std::setw(10) << "Executed"
0918 << " " << std::right << std::setw(10) << "Passed"
0919 << " " << std::right << std::setw(10) << "Failed"
0920 << " " << std::right << std::setw(10) << "Error"
0921 << " "
0922 << "Name"
0923 << "";
0924 for (auto const& worker : tr.workerSummaries) {
0925 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " "
0926 << std::right << std::setw(10) << worker.timesRun << " " << std::right
0927 << std::setw(10) << worker.timesPassed << " " << std::right << std::setw(10)
0928 << worker.timesFailed << " " << std::right << std::setw(10) << worker.timesExcept
0929 << " " << worker.moduleLabel << "";
0930 }
0931 LogFwkVerbatim("FwkSummary") << "";
0932 }
0933
0934 TriggerTimingReport tr;
0935 getTriggerTimingReport(tr);
0936
0937 const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0938
0939 LogFwkVerbatim("FwkSummary") << "TimeReport "
0940 << "---------- Event Summary ---[sec]----";
0941 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0942 << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0943 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0944 << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0945 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0946 << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0947 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0948 << " efficiency CPU/Real/thread = "
0949 << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0950 preallocConfig_.numberOfThreads();
0951
0952 constexpr int kColumn1Size = 10;
0953 constexpr int kColumn2Size = 12;
0954 constexpr int kColumn3Size = 12;
0955 LogFwkVerbatim("FwkSummary") << "";
0956 LogFwkVerbatim("FwkSummary") << "TimeReport "
0957 << "---------- Path Summary ---[Real sec]----";
0958 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0959 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0960 << " Name";
0961 for (auto const& p : tr.trigPathSummaries) {
0962 const int timesRun = std::max(1, p.timesRun);
0963 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0964 << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
0965 << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
0966 }
0967 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0968 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0969 << " Name"
0970 << "";
0971
0972 LogFwkVerbatim("FwkSummary") << "";
0973 LogFwkVerbatim("FwkSummary") << "TimeReport "
0974 << "-------End-Path Summary ---[Real sec]----";
0975 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0976 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0977 << " Name"
0978 << "";
0979 for (auto const& p : tr.endPathSummaries) {
0980 const int timesRun = std::max(1, p.timesRun);
0981
0982 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0983 << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
0984 << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
0985 }
0986 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0987 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0988 << " Name"
0989 << "";
0990
0991 for (auto const& p : tr.trigPathSummaries) {
0992 LogFwkVerbatim("FwkSummary") << "";
0993 LogFwkVerbatim("FwkSummary") << "TimeReport "
0994 << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
0995 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0996 << " " << std::right << std::setw(kColumn2Size) << "per visit"
0997 << " Name"
0998 << "";
0999 for (auto const& mod : p.moduleInPathSummaries) {
1000 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1001 << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1002 << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
1003 << mod.moduleLabel << "";
1004 }
1005 }
1006 if (not tr.trigPathSummaries.empty()) {
1007 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1008 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1009 << " Name"
1010 << "";
1011 }
1012 for (auto const& p : tr.endPathSummaries) {
1013 LogFwkVerbatim("FwkSummary") << "";
1014 LogFwkVerbatim("FwkSummary") << "TimeReport "
1015 << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
1016 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1017 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1018 << " Name"
1019 << "";
1020 for (auto const& mod : p.moduleInPathSummaries) {
1021 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1022 << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1023 << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
1024 << mod.moduleLabel << "";
1025 }
1026 }
1027 if (not tr.endPathSummaries.empty()) {
1028 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1029 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1030 << " Name"
1031 << "";
1032 }
1033 LogFwkVerbatim("FwkSummary") << "";
1034 LogFwkVerbatim("FwkSummary") << "TimeReport "
1035 << "---------- Module Summary ---[Real sec]----";
1036 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1037 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1038 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1039 << " Name"
1040 << "";
1041 for (auto const& worker : tr.workerSummaries) {
1042 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1043 << std::setw(kColumn1Size) << worker.realTime / totalEvents << " " << std::right
1044 << std::setw(kColumn2Size) << worker.realTime / std::max(1, worker.timesRun) << " "
1045 << std::right << std::setw(kColumn3Size)
1046 << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel
1047 << "";
1048 }
1049 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1050 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1051 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1052 << " Name"
1053 << "";
1054
1055 LogFwkVerbatim("FwkSummary") << "";
1056 LogFwkVerbatim("FwkSummary") << "T---Report end!"
1057 << "";
1058 LogFwkVerbatim("FwkSummary") << "";
1059 }
1060
1061 void Schedule::closeOutputFiles() {
1062 using std::placeholders::_1;
1063 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1064 for (auto& worker : allWorkers()) {
1065 worker->respondToCloseOutputFile();
1066 }
1067 }
1068
1069 void Schedule::openOutputFiles(FileBlock& fb) {
1070 using std::placeholders::_1;
1071 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1072 }
1073
1074 void Schedule::writeRunAsync(WaitingTaskHolder task,
1075 RunPrincipal const& rp,
1076 ProcessContext const* processContext,
1077 ActivityRegistry* activityRegistry,
1078 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1079 auto token = ServiceRegistry::instance().presentToken();
1080 GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1081 LuminosityBlockID(rp.run(), 0),
1082 rp.index(),
1083 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1084 rp.endTime(),
1085 processContext);
1086
1087 using namespace edm::waiting_task;
1088 chain::first([&](auto nextTask) {
1089
1090 ServiceRegistry::Operate op(token);
1091
1092
1093 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1094 }
1095 for (auto& c : all_output_communicators_) {
1096 c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1097 }
1098 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1099
1100 ServiceRegistry::Operate op(token);
1101
1102 activityRegistry->postGlobalWriteRunSignal_(globalContext);
1103 })) |
1104 chain::runLast(task);
1105 }
1106
1107 void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1108 ProcessBlockPrincipal const& pbp,
1109 ProcessContext const* processContext,
1110 ActivityRegistry* activityRegistry) {
1111 auto token = ServiceRegistry::instance().presentToken();
1112 GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1113 LuminosityBlockID(),
1114 RunIndex::invalidRunIndex(),
1115 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1116 Timestamp::invalidTimestamp(),
1117 processContext);
1118
1119 using namespace edm::waiting_task;
1120 chain::first([&](auto nextTask) {
1121
1122 ServiceRegistry::Operate op(token);
1123 CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1124 }
1125 for (auto& c : all_output_communicators_) {
1126 c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1127 }
1128 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1129
1130 ServiceRegistry::Operate op(token);
1131
1132 activityRegistry->postWriteProcessBlockSignal_(globalContext);
1133 })) |
1134 chain::runLast(std::move(task));
1135 }
1136
1137 void Schedule::writeLumiAsync(WaitingTaskHolder task,
1138 LuminosityBlockPrincipal const& lbp,
1139 ProcessContext const* processContext,
1140 ActivityRegistry* activityRegistry) {
1141 auto token = ServiceRegistry::instance().presentToken();
1142 GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1143 lbp.id(),
1144 lbp.runPrincipal().index(),
1145 lbp.index(),
1146 lbp.beginTime(),
1147 processContext);
1148
1149 using namespace edm::waiting_task;
1150 chain::first([&](auto nextTask) {
1151 ServiceRegistry::Operate op(token);
1152 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1153 }
1154 for (auto& c : all_output_communicators_) {
1155 c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1156 }
1157 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1158
1159 ServiceRegistry::Operate op(token);
1160
1161 activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1162 })) |
1163 chain::runLast(task);
1164 }
1165
1166 bool Schedule::shouldWeCloseOutput() const {
1167 using std::placeholders::_1;
1168
1169 return (std::find_if(all_output_communicators_.begin(),
1170 all_output_communicators_.end(),
1171 std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1172 all_output_communicators_.end());
1173 }
1174
1175 void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1176 using std::placeholders::_1;
1177 for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1178 }
1179
1180 void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1181 using std::placeholders::_1;
1182 for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1183 }
1184
1185 void Schedule::beginJob(ProductRegistry const& iRegistry,
1186 eventsetup::ESRecordsToProxyIndices const& iESIndices,
1187 ProcessBlockHelperBase const& processBlockHelperBase) {
1188 globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase);
1189 }
1190
1191 void Schedule::beginStream(unsigned int iStreamID) {
1192 assert(iStreamID < streamSchedules_.size());
1193 streamSchedules_[iStreamID]->beginStream();
1194 }
1195
1196 void Schedule::endStream(unsigned int iStreamID) {
1197 assert(iStreamID < streamSchedules_.size());
1198 streamSchedules_[iStreamID]->endStream();
1199 }
1200
1201 void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1202 unsigned int iStreamID,
1203 EventTransitionInfo& info,
1204 ServiceToken const& token) {
1205 assert(iStreamID < streamSchedules_.size());
1206 streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1207 }
1208
1209 bool Schedule::changeModule(std::string const& iLabel,
1210 ParameterSet const& iPSet,
1211 const ProductRegistry& iRegistry,
1212 eventsetup::ESRecordsToProxyIndices const& iIndices) {
1213 Worker* found = nullptr;
1214 for (auto const& worker : allWorkers()) {
1215 if (worker->description()->moduleLabel() == iLabel) {
1216 found = worker;
1217 break;
1218 }
1219 }
1220 if (nullptr == found) {
1221 return false;
1222 }
1223
1224 auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1225
1226 globalSchedule_->replaceModule(newMod, iLabel);
1227
1228 for (auto& s : streamSchedules_) {
1229 s->replaceModule(newMod, iLabel);
1230 }
1231
1232 {
1233
1234 auto const processBlockLookup = iRegistry.productLookup(InProcess);
1235 auto const runLookup = iRegistry.productLookup(InRun);
1236 auto const lumiLookup = iRegistry.productLookup(InLumi);
1237 auto const eventLookup = iRegistry.productLookup(InEvent);
1238 found->updateLookup(InProcess, *runLookup);
1239 found->updateLookup(InRun, *runLookup);
1240 found->updateLookup(InLumi, *lumiLookup);
1241 found->updateLookup(InEvent, *eventLookup);
1242 found->updateLookup(iIndices);
1243
1244 auto const& processName = newMod->moduleDescription().processName();
1245 auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1246 auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1247 auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1248 auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1249 found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1250 found->resolvePutIndicies(InRun, runModuleToIndicies);
1251 found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1252 found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1253 }
1254
1255 return true;
1256 }
1257
1258 void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1259 globalSchedule_->deleteModule(iLabel);
1260 for (auto& stream : streamSchedules_) {
1261 stream->deleteModule(iLabel);
1262 }
1263 moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1264 }
1265
1266 void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1267 std::multimap<std::string, std::string> const& referencesToBranches,
1268 std::vector<std::string> const& modulesToSkip,
1269 edm::ProductRegistry const& preg) {
1270 for (auto& stream : streamSchedules_) {
1271 stream->initializeEarlyDelete(
1272 *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1273 }
1274 }
1275
1276 std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1277 std::vector<ModuleDescription const*> result;
1278 result.reserve(allWorkers().size());
1279
1280 for (auto const& worker : allWorkers()) {
1281 ModuleDescription const* p = worker->description();
1282 result.push_back(p);
1283 }
1284 return result;
1285 }
1286
1287 Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1288
1289 void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1290 for (auto const& worker : allWorkers()) {
1291 worker->convertCurrentProcessAlias(processName);
1292 }
1293 }
1294
1295 void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1296 streamSchedules_[0]->availablePaths(oLabelsToFill);
1297 }
1298
1299 void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1300
1301 void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1302
1303 void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1304 streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1305 }
1306
1307 void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1308 std::vector<ModuleDescription const*>& descriptions,
1309 unsigned int hint) const {
1310 streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1311 }
1312
1313 void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1314 std::vector<ModuleDescription const*>& descriptions,
1315 unsigned int hint) const {
1316 streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1317 }
1318
1319 void Schedule::fillModuleAndConsumesInfo(
1320 std::vector<ModuleDescription const*>& allModuleDescriptions,
1321 std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1322 std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1323 std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1324 ProductRegistry const& preg) const {
1325 allModuleDescriptions.clear();
1326 moduleIDToIndex.clear();
1327 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1328 modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1329 }
1330 modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1331
1332 allModuleDescriptions.reserve(allWorkers().size());
1333 moduleIDToIndex.reserve(allWorkers().size());
1334 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1335 modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1336 }
1337 modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1338
1339 std::map<std::string, ModuleDescription const*> labelToDesc;
1340 unsigned int i = 0;
1341 for (auto const& worker : allWorkers()) {
1342 ModuleDescription const* p = worker->description();
1343 allModuleDescriptions.push_back(p);
1344 moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1345 labelToDesc[p->moduleLabel()] = p;
1346 ++i;
1347 }
1348 sort_all(moduleIDToIndex);
1349
1350 i = 0;
1351 for (auto const& worker : allWorkers()) {
1352 std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1353 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1354 modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1355 }
1356
1357 std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1358 modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1359 try {
1360 worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1361 } catch (cms::Exception& ex) {
1362 ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1363 worker->description()->moduleLabel());
1364 throw;
1365 }
1366 ++i;
1367 }
1368 }
1369
1370 void Schedule::getTriggerReport(TriggerReport& rep) const {
1371 rep.eventSummary.totalEvents = 0;
1372 rep.eventSummary.totalEventsPassed = 0;
1373 rep.eventSummary.totalEventsFailed = 0;
1374 for (auto& s : streamSchedules_) {
1375 s->getTriggerReport(rep);
1376 }
1377 sort_all(rep.workerSummaries);
1378 }
1379
1380 void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1381 rep.eventSummary.totalEvents = 0;
1382 rep.eventSummary.cpuTime = 0.;
1383 rep.eventSummary.realTime = 0.;
1384 summaryTimeKeeper_->fillTriggerTimingReport(rep);
1385 }
1386
1387 int Schedule::totalEvents() const {
1388 int returnValue = 0;
1389 for (auto& s : streamSchedules_) {
1390 returnValue += s->totalEvents();
1391 }
1392 return returnValue;
1393 }
1394
1395 int Schedule::totalEventsPassed() const {
1396 int returnValue = 0;
1397 for (auto& s : streamSchedules_) {
1398 returnValue += s->totalEventsPassed();
1399 }
1400 return returnValue;
1401 }
1402
1403 int Schedule::totalEventsFailed() const {
1404 int returnValue = 0;
1405 for (auto& s : streamSchedules_) {
1406 returnValue += s->totalEventsFailed();
1407 }
1408 return returnValue;
1409 }
1410
1411 void Schedule::clearCounters() {
1412 for (auto& s : streamSchedules_) {
1413 s->clearCounters();
1414 }
1415 }
1416 }