File indexing completed on 2024-06-04 04:34:55
0001 #include "FWCore/Framework/interface/Schedule.h"
0002
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0011 #include "FWCore/Framework/interface/EDConsumerBase.h"
0012 #include "FWCore/Framework/src/OutputModuleDescription.h"
0013 #include "FWCore/Framework/interface/SubProcess.h"
0014 #include "FWCore/Framework/interface/TriggerNamesService.h"
0015 #include "FWCore/Framework/src/TriggerReport.h"
0016 #include "FWCore/Framework/src/TriggerTimingReport.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/src/Factory.h"
0019 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0020 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/Framework/src/TriggerResultInserter.h"
0023 #include "FWCore/Framework/src/PathStatusInserter.h"
0024 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0025 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0026 #include "FWCore/Concurrency/interface/chain_first.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0031 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0032 #include "FWCore/Utilities/interface/Algorithms.h"
0033 #include "FWCore/Utilities/interface/ConvertException.h"
0034 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0035 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0036 #include "FWCore/Utilities/interface/TypeID.h"
0037 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0038
0039 #include <array>
0040 #include <algorithm>
0041 #include <cassert>
0042 #include <cstdlib>
0043 #include <functional>
0044 #include <iomanip>
0045 #include <list>
0046 #include <map>
0047 #include <set>
0048 #include <exception>
0049 #include <sstream>
0050
0051 #include "make_shared_noexcept_false.h"
0052 #include "processEDAliases.h"
0053
0054 namespace edm {
0055
0056 class Maker;
0057
0058 namespace {
0059 using std::placeholders::_1;
0060
0061 bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0062 return std::binary_search(v.begin(), v.end(), s);
0063 }
0064
0065
0066
0067
0068 std::shared_ptr<TriggerResultInserter> makeInserter(
0069 ParameterSet& proc_pset,
0070 PreallocationConfiguration const& iPrealloc,
0071 ProductRegistry& preg,
0072 ExceptionToActionTable const& actions,
0073 std::shared_ptr<ActivityRegistry> areg,
0074 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0075 ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0076 trig_pset->registerIt();
0077
0078 WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0079 ModuleDescription md(trig_pset->id(),
0080 "TriggerResultInserter",
0081 "TriggerResults",
0082 processConfiguration.get(),
0083 ModuleDescription::getUniqueID());
0084
0085 areg->preModuleConstructionSignal_(md);
0086 bool postCalled = false;
0087 std::shared_ptr<TriggerResultInserter> returnValue;
0088
0089 CMS_SA_ALLOW try {
0090 maker::ModuleHolderT<TriggerResultInserter> holder(
0091 make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
0092 static_cast<Maker const*>(nullptr));
0093 holder.setModuleDescription(md);
0094 holder.registerProductsAndCallbacks(&preg);
0095 returnValue = holder.module();
0096 postCalled = true;
0097
0098 areg->postModuleConstructionSignal_(md);
0099 } catch (...) {
0100 if (!postCalled) {
0101 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0102
0103 }
0104 }
0105 throw;
0106 }
0107 return returnValue;
0108 }
0109
0110 template <typename T>
0111 void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0112 std::vector<std::string> const& pathNames,
0113 PreallocationConfiguration const& iPrealloc,
0114 ProductRegistry& preg,
0115 std::shared_ptr<ActivityRegistry> areg,
0116 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0117 std::string const& moduleTypeName) {
0118 ParameterSet pset;
0119 pset.addParameter<std::string>("@module_type", moduleTypeName);
0120 pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0121 pset.registerIt();
0122
0123 pathStatusInserters.reserve(pathNames.size());
0124
0125 for (auto const& pathName : pathNames) {
0126 ModuleDescription md(
0127 pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0128
0129 areg->preModuleConstructionSignal_(md);
0130 bool postCalled = false;
0131
0132 CMS_SA_ALLOW try {
0133 maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
0134 static_cast<Maker const*>(nullptr));
0135 holder.setModuleDescription(md);
0136 holder.registerProductsAndCallbacks(&preg);
0137 pathStatusInserters.emplace_back(holder.module());
0138 postCalled = true;
0139
0140 areg->postModuleConstructionSignal_(md);
0141 } catch (...) {
0142 if (!postCalled) {
0143 CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0144
0145 }
0146 }
0147 throw;
0148 }
0149 }
0150 }
0151
0152 typedef std::vector<std::string> vstring;
0153
0154 void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
0155
0156 struct BranchesCases {
0157 BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0158 std::vector<BranchKey> chosenBranches;
0159 std::vector<std::string> caseLabels;
0160 };
0161 std::map<std::string, BranchesCases> switchMap;
0162 for (auto& prod : preg.productListUpdator()) {
0163 if (prod.second.isSwitchAlias()) {
0164 auto it = switchMap.find(prod.second.moduleLabel());
0165 if (it == switchMap.end()) {
0166 auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0167 auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0168 switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0169 assert(inserted.second);
0170 it = inserted.first;
0171 }
0172
0173 bool found = false;
0174 for (auto const& productIter : preg.productList()) {
0175 BranchKey const& branchKey = productIter.first;
0176
0177
0178
0179
0180 if (branchKey.processName() != processName) {
0181 continue;
0182 }
0183
0184 BranchDescription const& desc = productIter.second;
0185 if (desc.branchType() == prod.second.branchType() and
0186 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0187 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0188 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0189 prod.second.setSwitchAliasForBranch(desc);
0190 if (!prod.second.transient()) {
0191 it->second.chosenBranches.push_back(prod.first);
0192 }
0193 found = true;
0194 }
0195 }
0196 if (not found) {
0197 Exception ex(errors::LogicError);
0198 ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
0199 << " friendly class name = " << prod.second.friendlyClassName() << "\n"
0200 << " module label = " << prod.second.moduleLabel() << "\n"
0201 << " product instance name = " << prod.second.productInstanceName() << "\n"
0202 << " process name = " << processName
0203 << "\n\nbut did not find any. Please contact a framework developer.";
0204 ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0205 throw ex;
0206 }
0207 }
0208 }
0209 if (switchMap.empty())
0210 return;
0211
0212 for (auto& elem : switchMap) {
0213 std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0214 }
0215
0216 auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0217 std::map<std::string, std::vector<BranchKey>> caseBranches;
0218 for (auto const& item : preg.productList()) {
0219 if (item.first.processName() != processName)
0220 continue;
0221
0222 if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0223 found != caseLabels.end()) {
0224 caseBranches[*found].push_back(item.first);
0225 }
0226 }
0227
0228 for (auto const& caseLabel : caseLabels) {
0229 ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0230 auto& branches = caseBranches[caseLabel];
0231 std::sort(branches.begin(), branches.end());
0232 for (auto const& branch : branches) {
0233 ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0234 }
0235 }
0236 };
0237
0238
0239
0240 std::vector<bool> foundBranches;
0241 for (auto const& switchItem : switchMap) {
0242 auto const& switchLabel = switchItem.first;
0243 auto const& chosenBranches = switchItem.second.chosenBranches;
0244 auto const& caseLabels = switchItem.second.caseLabels;
0245 foundBranches.resize(chosenBranches.size());
0246 for (auto const& caseLabel : caseLabels) {
0247 std::fill(foundBranches.begin(), foundBranches.end(), false);
0248 for (auto& nonConstItem : preg.productListUpdator()) {
0249 auto const& item = nonConstItem;
0250 if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0251
0252
0253 if (!item.second.transient()) {
0254 auto range = std::equal_range(chosenBranches.begin(),
0255 chosenBranches.end(),
0256 BranchKey(item.first.friendlyClassName(),
0257 switchLabel,
0258 item.first.productInstanceName(),
0259 item.first.processName()));
0260 if (range.first == range.second) {
0261 Exception ex(errors::Configuration);
0262 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0263 << item.first << " that is not produced by the chosen case "
0264 << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0265 .getUntrackedParameter<std::string>("@chosen_case")
0266 << " and that product is not transient. "
0267 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0268 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0269 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0270 addProductsToException(caseLabels, ex);
0271 throw ex;
0272 }
0273 assert(std::distance(range.first, range.second) == 1);
0274 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0275 }
0276
0277
0278
0279
0280
0281
0282
0283
0284
0285 nonConstItem.second.setTransient(true);
0286
0287
0288 auto const& bd = item.second;
0289 if (not bd.branchAliases().empty()) {
0290 auto ex = Exception(errors::UnimplementedFeature)
0291 << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0292 "aliases for SwitchProducer with label "
0293 << switchLabel << " for case " << caseLabel << ":";
0294 for (auto const& branchAlias : bd.branchAliases()) {
0295 ex << " " << branchAlias;
0296 }
0297 throw ex;
0298 }
0299 }
0300 }
0301
0302 for (size_t i = 0; i < chosenBranches.size(); i++) {
0303 if (not foundBranches[i]) {
0304 auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0305 .getUntrackedParameter<std::string>("@chosen_case");
0306 Exception ex(errors::Configuration);
0307 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0308 << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0309 << chosenLabel << " and that product is not transient. "
0310 << "If the intention is to produce only a subset of the non-transient products listed below, each "
0311 "case with more non-transient products needs to be replaced with an EDAlias to only the "
0312 "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0313 addProductsToException(caseLabels, ex);
0314 throw ex;
0315 }
0316 }
0317 }
0318 }
0319 }
0320
0321 void reduceParameterSet(ParameterSet& proc_pset,
0322 vstring const& end_path_name_list,
0323 vstring& modulesInConfig,
0324 std::set<std::string> const& usedModuleLabels,
0325 std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0326
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336
0337 vstring outputModuleLabels;
0338 std::string edmType;
0339 std::string const moduleEdmType("@module_edm_type");
0340 std::string const outputModule("OutputModule");
0341 std::string const edAnalyzer("EDAnalyzer");
0342 std::string const edFilter("EDFilter");
0343 std::string const edProducer("EDProducer");
0344
0345 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0346
0347
0348
0349 vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0350 std::set<std::string> modulesOnPaths;
0351 {
0352 std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0353 for (auto const& endPath : end_path_name_list) {
0354 noEndPaths.erase(endPath);
0355 }
0356 {
0357 vstring labels;
0358 for (auto const& path : noEndPaths) {
0359 labels = proc_pset.getParameter<vstring>(path);
0360 modulesOnPaths.insert(labels.begin(), labels.end());
0361 }
0362 }
0363 }
0364
0365
0366 std::vector<std::string> labelsToBeDropped;
0367 labelsToBeDropped.reserve(modulesInConfigSet.size());
0368 std::set_difference(modulesInConfigSet.begin(),
0369 modulesInConfigSet.end(),
0370 usedModuleLabels.begin(),
0371 usedModuleLabels.end(),
0372 std::back_inserter(labelsToBeDropped));
0373
0374 const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0375 for (auto const& modLabel : usedModuleLabels) {
0376
0377
0378 if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0379 edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0380 if (edmType == outputModule) {
0381 outputModuleLabels.push_back(modLabel);
0382 labelsToBeDropped.push_back(modLabel);
0383 }
0384 if (edmType == edAnalyzer) {
0385 if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0386 labelsToBeDropped.push_back(modLabel);
0387 }
0388 }
0389 }
0390 }
0391
0392 std::inplace_merge(
0393 labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0394
0395
0396 for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0397
0398
0399 vstring::iterator endAfterRemove =
0400 std::remove_if(modulesInConfig.begin(),
0401 modulesInConfig.end(),
0402 std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0403 modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0404 proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0405
0406
0407 vstring endPathsToBeDropped;
0408 vstring labels;
0409 for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0410 iEndPath != endEndPath;
0411 ++iEndPath) {
0412 labels = proc_pset.getParameter<vstring>(*iEndPath);
0413 vstring::iterator iSave = labels.begin();
0414 vstring::iterator iBegin = labels.begin();
0415
0416 for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0417 if (binary_search_string(labelsToBeDropped, *iLabel)) {
0418 if (binary_search_string(outputModuleLabels, *iLabel)) {
0419 outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0420 }
0421 } else {
0422 if (iSave != iLabel) {
0423 iSave->swap(*iLabel);
0424 }
0425 ++iSave;
0426 }
0427 }
0428 labels.erase(iSave, labels.end());
0429 if (labels.empty()) {
0430
0431 proc_pset.eraseSimpleParameter(*iEndPath);
0432 endPathsToBeDropped.push_back(*iEndPath);
0433 } else {
0434 proc_pset.addParameter<vstring>(*iEndPath, labels);
0435 }
0436 }
0437 sort_all(endPathsToBeDropped);
0438
0439
0440 endAfterRemove = std::remove_if(scheduledPaths.begin(),
0441 scheduledPaths.end(),
0442 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0443 scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0444 proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0445
0446
0447 vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0448 endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0449 scheduledEndPaths.end(),
0450 std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0451 scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0452 proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0453 }
0454
0455 class RngEDConsumer : public EDConsumerBase {
0456 public:
0457 explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0458 Service<RandomNumberGenerator> rng;
0459 if (rng.isAvailable()) {
0460 rng->consumes(consumesCollector());
0461 for (auto const& consumesInfo : this->consumesInfo()) {
0462 typesConsumed.emplace(consumesInfo.type());
0463 }
0464 }
0465 }
0466 };
0467
0468 template <typename F>
0469 auto doCleanup(F&& iF) {
0470 auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0471 CMS_SA_ALLOW try { f(); } catch (...) {
0472 }
0473 if (iPtr) {
0474 iTask.doneWaiting(*iPtr);
0475 }
0476 };
0477 return wrapped;
0478 }
0479 }
0480
0481
0482 typedef std::vector<std::string> vstring;
0483
0484
0485
0486 Schedule::Schedule(ParameterSet& proc_pset,
0487 service::TriggerNamesService const& tns,
0488 ProductRegistry& preg,
0489 ExceptionToActionTable const& actions,
0490 std::shared_ptr<ActivityRegistry> areg,
0491 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0492 PreallocationConfiguration const& prealloc,
0493 ProcessContext const* processContext,
0494 ModuleTypeResolverMaker const* resolverMaker)
0495 :
0496 resultsInserter_{tns.getTrigPaths().empty()
0497 ? std::shared_ptr<TriggerResultInserter>{}
0498 : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
0499 moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0500 all_output_communicators_(),
0501 preallocConfig_(prealloc),
0502 pathNames_(&tns.getTrigPaths()),
0503 endPathNames_(&tns.getEndPaths()),
0504 wantSummary_(tns.wantSummary()) {
0505 makePathStatusInserters(pathStatusInserters_,
0506 *pathNames_,
0507 prealloc,
0508 preg,
0509 areg,
0510 processConfiguration,
0511 std::string("PathStatusInserter"));
0512
0513 if (endPathNames_->size() > 1) {
0514
0515 makePathStatusInserters(endPathStatusInserters_,
0516 *endPathNames_,
0517 prealloc,
0518 preg,
0519 areg,
0520 processConfiguration,
0521 std::string("EndPathStatusInserter"));
0522 }
0523
0524 assert(0 < prealloc.numberOfStreams());
0525 streamSchedules_.reserve(prealloc.numberOfStreams());
0526 for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0527 streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0528 pathStatusInserters_,
0529 endPathStatusInserters_,
0530 moduleRegistry(),
0531 proc_pset,
0532 tns,
0533 prealloc,
0534 preg,
0535 actions,
0536 areg,
0537 processConfiguration,
0538 StreamID{i},
0539 processContext));
0540 }
0541
0542
0543
0544 const std::string kTriggerResults("TriggerResults");
0545 std::vector<std::string> modulesToUse;
0546 modulesToUse.reserve(streamSchedules_[0]->allWorkersLumisAndEvents().size());
0547 for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0548 if (worker->description()->moduleLabel() != kTriggerResults) {
0549 modulesToUse.push_back(worker->description()->moduleLabel());
0550 }
0551 }
0552
0553 unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0554 if (nUnscheduledModules > 0) {
0555 std::vector<std::string> temp;
0556 temp.reserve(modulesToUse.size());
0557 auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0558 std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0559 std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0560 temp.swap(modulesToUse);
0561 }
0562
0563
0564 globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0565 pathStatusInserters_,
0566 endPathStatusInserters_,
0567 moduleRegistry(),
0568 modulesToUse,
0569 proc_pset,
0570 preg,
0571 prealloc,
0572 actions,
0573 areg,
0574 processConfiguration,
0575 processContext);
0576 }
0577
0578 void Schedule::finishSetup(ParameterSet& proc_pset,
0579 service::TriggerNamesService const& tns,
0580 ProductRegistry& preg,
0581 BranchIDListHelper& branchIDListHelper,
0582 ProcessBlockHelperBase& processBlockHelper,
0583 ThinnedAssociationsHelper& thinnedAssociationsHelper,
0584 SubProcessParentageHelper const* subProcessParentageHelper,
0585 std::shared_ptr<ActivityRegistry> areg,
0586 std::shared_ptr<ProcessConfiguration> processConfiguration,
0587 bool hasSubprocesses,
0588 PreallocationConfiguration const& prealloc,
0589 ProcessContext const* processContext) {
0590
0591
0592 const std::string kTriggerResults("TriggerResults");
0593
0594 std::set<std::string> usedModuleLabels;
0595 for (auto const& worker : allWorkers()) {
0596 if (worker->description()->moduleLabel() != kTriggerResults) {
0597 usedModuleLabels.insert(worker->description()->moduleLabel());
0598 }
0599 }
0600 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0601 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0602 reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0603 {
0604 std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0605 detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0606 }
0607
0608
0609
0610 {
0611 auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
0612 if (not unsched.empty()) {
0613 std::set<std::string> unscheduledModules;
0614 std::transform(unsched.begin(),
0615 unsched.end(),
0616 std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0617 [](auto worker) { return worker->description()->moduleLabel(); });
0618 preg.setUnscheduledProducts(unscheduledModules);
0619 }
0620 }
0621
0622 processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0623 proc_pset.registerIt();
0624 processConfiguration->setParameterSetID(proc_pset.id());
0625 processConfiguration->setProcessConfigurationID();
0626
0627
0628
0629 size_t all_workers_count = allWorkers().size();
0630
0631 moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0632 auto comm = iHolder->createOutputModuleCommunicator();
0633 if (comm) {
0634 all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0635 }
0636 });
0637
0638 limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
0639
0640
0641
0642 assert(all_workers_count == allWorkers().size());
0643
0644 branchIDListHelper.updateFromRegistry(preg);
0645
0646 for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0647 worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
0648 }
0649
0650 processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
0651
0652
0653
0654 for (auto& c : all_output_communicators_) {
0655 c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
0656 }
0657
0658 for (auto& product : preg.productListUpdator()) {
0659 setIsMergeable(product.second);
0660 }
0661
0662 {
0663
0664 std::set<TypeID> productTypesConsumed;
0665 std::set<TypeID> elementTypesConsumed;
0666
0667 for (auto const& worker : allWorkers()) {
0668 for (auto const& consumesInfo : worker->consumesInfo()) {
0669 if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0670 productTypesConsumed.emplace(consumesInfo.type());
0671 } else {
0672 elementTypesConsumed.emplace(consumesInfo.type());
0673 }
0674 }
0675 }
0676
0677 if (hasSubprocesses) {
0678 productTypesConsumed.emplace(typeid(TriggerResults));
0679 }
0680
0681 { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0682 preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0683 }
0684
0685 for (auto& c : all_output_communicators_) {
0686 c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
0687 }
0688
0689 if (wantSummary_) {
0690 std::vector<const ModuleDescription*> modDesc;
0691 const auto& workers = allWorkers();
0692 modDesc.reserve(workers.size());
0693
0694 std::transform(workers.begin(),
0695 workers.end(),
0696 std::back_inserter(modDesc),
0697 [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0698
0699
0700 summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0701 auto timeKeeperPtr = summaryTimeKeeper_.get();
0702
0703 areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0704
0705 areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0706 areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0707 areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0708 areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0709 areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0710 areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0711
0712 areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0713 areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0714
0715 areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0716 areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0717
0718 areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0719 areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0720
0721
0722
0723 }
0724
0725 }
0726
0727 void Schedule::limitOutput(ParameterSet const& proc_pset,
0728 BranchIDLists const& branchIDLists,
0729 SubProcessParentageHelper const* subProcessParentageHelper) {
0730 std::string const output("output");
0731
0732 ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0733 int maxEventSpecs = 0;
0734 int maxEventsOut = -1;
0735 ParameterSet const* vMaxEventsOut = nullptr;
0736 std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0737 if (search_all(intNamesE, output)) {
0738 maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0739 ++maxEventSpecs;
0740 }
0741 std::vector<std::string> psetNamesE;
0742 maxEventsPSet.getParameterSetNames(psetNamesE, false);
0743 if (search_all(psetNamesE, output)) {
0744 vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0745 ++maxEventSpecs;
0746 }
0747
0748 if (maxEventSpecs > 1) {
0749 throw Exception(errors::Configuration)
0750 << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0751 }
0752
0753 for (auto& c : all_output_communicators_) {
0754 OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
0755 if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0756 std::string const& moduleLabel = c->description().moduleLabel();
0757 try {
0758 desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0759 } catch (Exception const&) {
0760 throw Exception(errors::Configuration)
0761 << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0762 }
0763 }
0764 c->configure(desc);
0765 }
0766 }
0767
0768 bool Schedule::terminate() const {
0769 if (all_output_communicators_.empty()) {
0770 return false;
0771 }
0772 for (auto& c : all_output_communicators_) {
0773 if (!c->limitReached()) {
0774
0775 return false;
0776 }
0777 }
0778 LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0779 << "has reached its configured limit.\n";
0780 return true;
0781 }
0782
0783 void Schedule::endJob(ExceptionCollector& collector) {
0784 globalSchedule_->endJob(collector);
0785 if (collector.hasThrown()) {
0786 return;
0787 }
0788
0789 if (wantSummary_ == false)
0790 return;
0791
0792
0793
0794 auto logForEach = [](auto const& iContainer, auto iMessage) {
0795 auto logger = LogFwkVerbatim("FwkSummary");
0796 int count = 0;
0797 for (auto const& element : iContainer) {
0798 iMessage(logger, element);
0799 if (++count == 10) {
0800 logger = LogFwkVerbatim("FwkSummary");
0801 count = 0;
0802 } else {
0803 logger << "\n";
0804 }
0805 }
0806 };
0807
0808 {
0809 TriggerReport tr;
0810 getTriggerReport(tr);
0811
0812
0813
0814 LogFwkVerbatim("FwkSummary") << "";
0815 if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
0816 LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
0817 << streamSchedules_[0]->context().processContext()->processName();
0818 }
0819 LogFwkVerbatim("FwkSummary") << "TrigReport "
0820 << "---------- Event Summary ------------";
0821 if (!tr.trigPathSummaries.empty()) {
0822 LogFwkVerbatim("FwkSummary") << "TrigReport"
0823 << " Events total = " << tr.eventSummary.totalEvents
0824 << " passed = " << tr.eventSummary.totalEventsPassed
0825 << " failed = " << tr.eventSummary.totalEventsFailed << "";
0826 } else {
0827 LogFwkVerbatim("FwkSummary") << "TrigReport"
0828 << " Events total = " << tr.eventSummary.totalEvents
0829 << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0830 }
0831
0832 LogFwkVerbatim("FwkSummary") << "";
0833 LogFwkVerbatim("FwkSummary") << "TrigReport "
0834 << "---------- Path Summary ------------";
0835 LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0836 << " " << std::right << std::setw(10) << "Executed"
0837 << " " << std::right << std::setw(10) << "Passed"
0838 << " " << std::right << std::setw(10) << "Failed"
0839 << " " << std::right << std::setw(10) << "Error"
0840 << " "
0841 << "Name"
0842 << "";
0843 logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
0844 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
0845 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0846 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0847 << p.timesExcept << " " << p.name;
0848 });
0849
0850
0851
0852
0853
0854
0855
0856
0857
0858
0859
0860
0861
0862
0863
0864
0865
0866
0867 LogFwkVerbatim("FwkSummary") << "\n"
0868 << "TrigReport "
0869 << "-------End-Path Summary ------------\n"
0870 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0871 << " " << std::right << std::setw(10) << "Executed"
0872 << " " << std::right << std::setw(10) << "Passed"
0873 << " " << std::right << std::setw(10) << "Failed"
0874 << " " << std::right << std::setw(10) << "Error"
0875 << " "
0876 << "Name";
0877 logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
0878 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
0879 << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0880 << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0881 << p.timesExcept << " " << p.name;
0882 });
0883
0884 for (auto const& p : tr.trigPathSummaries) {
0885 LogFwkVerbatim("FwkSummary") << "\n"
0886 << "TrigReport "
0887 << "---------- Modules in Path: " << p.name << " ------------\n"
0888 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0889 << " " << std::right << std::setw(10) << "Visited"
0890 << " " << std::right << std::setw(10) << "Passed"
0891 << " " << std::right << std::setw(10) << "Failed"
0892 << " " << std::right << std::setw(10) << "Error"
0893 << " "
0894 << "Name";
0895
0896 logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
0897 logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
0898 << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0899 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0900 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0901 });
0902 }
0903
0904 for (auto const& p : tr.endPathSummaries) {
0905 LogFwkVerbatim("FwkSummary") << "\n"
0906 << "TrigReport "
0907 << "------ Modules in End-Path: " << p.name << " ------------\n"
0908 << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0909 << " " << std::right << std::setw(10) << "Visited"
0910 << " " << std::right << std::setw(10) << "Passed"
0911 << " " << std::right << std::setw(10) << "Failed"
0912 << " " << std::right << std::setw(10) << "Error"
0913 << " "
0914 << "Name";
0915
0916 unsigned int bitpos = 0;
0917 logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
0918 logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
0919 << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0920 << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0921 << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0922 ++bitpos;
0923 });
0924 }
0925
0926 LogFwkVerbatim("FwkSummary") << "\n"
0927 << "TrigReport "
0928 << "---------- Module Summary ------------\n"
0929 << "TrigReport " << std::right << std::setw(10) << "Visited"
0930 << " " << std::right << std::setw(10) << "Executed"
0931 << " " << std::right << std::setw(10) << "Passed"
0932 << " " << std::right << std::setw(10) << "Failed"
0933 << " " << std::right << std::setw(10) << "Error"
0934 << " "
0935 << "Name";
0936
0937 logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
0938 logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
0939 << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
0940 << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
0941 << worker.timesExcept << " " << worker.moduleLabel;
0942 });
0943 LogFwkVerbatim("FwkSummary") << "";
0944 }
0945
0946 TriggerTimingReport tr;
0947 getTriggerTimingReport(tr);
0948
0949 const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0950
0951 LogFwkVerbatim("FwkSummary") << "TimeReport "
0952 << "---------- Event Summary ---[sec]----";
0953 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0954 << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0955 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0956 << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0957 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0958 << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0959 LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0960 << " efficiency CPU/Real/thread = "
0961 << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0962 preallocConfig_.numberOfThreads();
0963
0964 constexpr int kColumn1Size = 10;
0965 constexpr int kColumn2Size = 12;
0966 constexpr int kColumn3Size = 12;
0967 LogFwkVerbatim("FwkSummary") << "";
0968 LogFwkVerbatim("FwkSummary") << "TimeReport "
0969 << "---------- Path Summary ---[Real sec]----";
0970 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0971 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0972 << " Name";
0973 logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
0974 const int timesRun = std::max(1, p.timesRun);
0975 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0976 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0977 << " " << p.name;
0978 });
0979 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0980 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0981 << " Name";
0982
0983 LogFwkVerbatim("FwkSummary") << "\n"
0984 << "TimeReport "
0985 << "-------End-Path Summary ---[Real sec]----\n"
0986 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0987 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0988 << " Name";
0989 logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
0990 const int timesRun = std::max(1, p.timesRun);
0991
0992 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0993 << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0994 << " " << p.name;
0995 });
0996 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0997 << " " << std::right << std::setw(kColumn2Size) << "per exec"
0998 << " Name";
0999
1000 for (auto const& p : tr.trigPathSummaries) {
1001 LogFwkVerbatim("FwkSummary") << "\n"
1002 << "TimeReport "
1003 << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
1004 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1005 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1006 << " Name";
1007 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1008 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1009 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1010 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1011 });
1012 }
1013 if (not tr.trigPathSummaries.empty()) {
1014 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1015 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1016 << " Name";
1017 }
1018 for (auto const& p : tr.endPathSummaries) {
1019 LogFwkVerbatim("FwkSummary") << "\n"
1020 << "TimeReport "
1021 << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
1022 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1023 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1024 << " Name";
1025 logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1026 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1027 << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1028 << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1029 });
1030 }
1031 if (not tr.endPathSummaries.empty()) {
1032 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1033 << " " << std::right << std::setw(kColumn2Size) << "per visit"
1034 << " Name";
1035 }
1036 LogFwkVerbatim("FwkSummary") << "\n"
1037 << "TimeReport "
1038 << "---------- Module Summary ---[Real sec]----\n"
1039 << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1040 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1041 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1042 << " Name";
1043 logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
1044 logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1045 << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1046 << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
1047 << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel;
1048 });
1049 LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1050 << " " << std::right << std::setw(kColumn2Size) << "per exec"
1051 << " " << std::right << std::setw(kColumn3Size) << "per visit"
1052 << " Name";
1053
1054 LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
1055 }
1056
1057 void Schedule::closeOutputFiles() {
1058 using std::placeholders::_1;
1059 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1060 for (auto& worker : allWorkers()) {
1061 worker->respondToCloseOutputFile();
1062 }
1063 }
1064
1065 void Schedule::openOutputFiles(FileBlock& fb) {
1066 using std::placeholders::_1;
1067 for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1068 }
1069
1070 void Schedule::writeRunAsync(WaitingTaskHolder task,
1071 RunPrincipal const& rp,
1072 ProcessContext const* processContext,
1073 ActivityRegistry* activityRegistry,
1074 MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1075 auto token = ServiceRegistry::instance().presentToken();
1076 GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1077 LuminosityBlockID(rp.run(), 0),
1078 rp.index(),
1079 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1080 rp.endTime(),
1081 processContext);
1082
1083 using namespace edm::waiting_task;
1084 chain::first([&](auto nextTask) {
1085
1086 ServiceRegistry::Operate op(token);
1087
1088
1089 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1090 }
1091 for (auto& c : all_output_communicators_) {
1092 c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1093 }
1094 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1095
1096 ServiceRegistry::Operate op(token);
1097
1098 activityRegistry->postGlobalWriteRunSignal_(globalContext);
1099 })) |
1100 chain::runLast(task);
1101 }
1102
1103 void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1104 ProcessBlockPrincipal const& pbp,
1105 ProcessContext const* processContext,
1106 ActivityRegistry* activityRegistry) {
1107 auto token = ServiceRegistry::instance().presentToken();
1108 GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1109 LuminosityBlockID(),
1110 RunIndex::invalidRunIndex(),
1111 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1112 Timestamp::invalidTimestamp(),
1113 processContext);
1114
1115 using namespace edm::waiting_task;
1116 chain::first([&](auto nextTask) {
1117
1118 ServiceRegistry::Operate op(token);
1119 CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1120 }
1121 for (auto& c : all_output_communicators_) {
1122 c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1123 }
1124 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1125
1126 ServiceRegistry::Operate op(token);
1127
1128 activityRegistry->postWriteProcessBlockSignal_(globalContext);
1129 })) |
1130 chain::runLast(std::move(task));
1131 }
1132
1133 void Schedule::writeLumiAsync(WaitingTaskHolder task,
1134 LuminosityBlockPrincipal const& lbp,
1135 ProcessContext const* processContext,
1136 ActivityRegistry* activityRegistry) {
1137 auto token = ServiceRegistry::instance().presentToken();
1138 GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1139 lbp.id(),
1140 lbp.runPrincipal().index(),
1141 lbp.index(),
1142 lbp.beginTime(),
1143 processContext);
1144
1145 using namespace edm::waiting_task;
1146 chain::first([&](auto nextTask) {
1147 ServiceRegistry::Operate op(token);
1148 CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1149 }
1150 for (auto& c : all_output_communicators_) {
1151 c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1152 }
1153 }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1154
1155 ServiceRegistry::Operate op(token);
1156
1157 activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1158 })) |
1159 chain::runLast(task);
1160 }
1161
1162 bool Schedule::shouldWeCloseOutput() const {
1163 using std::placeholders::_1;
1164
1165 return (std::find_if(all_output_communicators_.begin(),
1166 all_output_communicators_.end(),
1167 std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1168 all_output_communicators_.end());
1169 }
1170
1171 void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1172 using std::placeholders::_1;
1173 for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1174 }
1175
1176 void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1177 using std::placeholders::_1;
1178 for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1179 }
1180
1181 void Schedule::beginJob(ProductRegistry const& iRegistry,
1182 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
1183 ProcessBlockHelperBase const& processBlockHelperBase) {
1184 globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase);
1185 }
1186
1187 void Schedule::beginStream(unsigned int iStreamID) {
1188 assert(iStreamID < streamSchedules_.size());
1189 streamSchedules_[iStreamID]->beginStream();
1190 }
1191
1192 void Schedule::endStream(unsigned int iStreamID) {
1193 assert(iStreamID < streamSchedules_.size());
1194 streamSchedules_[iStreamID]->endStream();
1195 }
1196
1197 void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1198 unsigned int iStreamID,
1199 EventTransitionInfo& info,
1200 ServiceToken const& token) {
1201 assert(iStreamID < streamSchedules_.size());
1202 streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1203 }
1204
1205 bool Schedule::changeModule(std::string const& iLabel,
1206 ParameterSet const& iPSet,
1207 const ProductRegistry& iRegistry,
1208 eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
1209 Worker* found = nullptr;
1210 for (auto const& worker : allWorkers()) {
1211 if (worker->description()->moduleLabel() == iLabel) {
1212 found = worker;
1213 break;
1214 }
1215 }
1216 if (nullptr == found) {
1217 return false;
1218 }
1219
1220 auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1221
1222 globalSchedule_->replaceModule(newMod, iLabel);
1223
1224 for (auto& s : streamSchedules_) {
1225 s->replaceModule(newMod, iLabel);
1226 }
1227
1228 {
1229
1230 auto const processBlockLookup = iRegistry.productLookup(InProcess);
1231 auto const runLookup = iRegistry.productLookup(InRun);
1232 auto const lumiLookup = iRegistry.productLookup(InLumi);
1233 auto const eventLookup = iRegistry.productLookup(InEvent);
1234 found->updateLookup(InProcess, *runLookup);
1235 found->updateLookup(InRun, *runLookup);
1236 found->updateLookup(InLumi, *lumiLookup);
1237 found->updateLookup(InEvent, *eventLookup);
1238 found->updateLookup(iIndices);
1239
1240 auto const& processName = newMod->moduleDescription().processName();
1241 auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1242 auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1243 auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1244 auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1245 found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1246 found->resolvePutIndicies(InRun, runModuleToIndicies);
1247 found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1248 found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1249 }
1250
1251 return true;
1252 }
1253
1254 void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1255 globalSchedule_->deleteModule(iLabel);
1256 for (auto& stream : streamSchedules_) {
1257 stream->deleteModule(iLabel);
1258 }
1259 moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1260 }
1261
1262 void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1263 std::multimap<std::string, std::string> const& referencesToBranches,
1264 std::vector<std::string> const& modulesToSkip,
1265 edm::ProductRegistry const& preg) {
1266 for (auto& stream : streamSchedules_) {
1267 stream->initializeEarlyDelete(
1268 *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1269 }
1270 }
1271
1272 std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1273 std::vector<ModuleDescription const*> result;
1274 result.reserve(allWorkers().size());
1275
1276 for (auto const& worker : allWorkers()) {
1277 ModuleDescription const* p = worker->description();
1278 result.push_back(p);
1279 }
1280 return result;
1281 }
1282
1283 Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1284
1285 void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1286 for (auto const& worker : allWorkers()) {
1287 worker->convertCurrentProcessAlias(processName);
1288 }
1289 }
1290
1291 void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1292 streamSchedules_[0]->availablePaths(oLabelsToFill);
1293 }
1294
1295 void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1296
1297 void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1298
1299 void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1300 streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1301 }
1302
1303 void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1304 std::vector<ModuleDescription const*>& descriptions,
1305 unsigned int hint) const {
1306 streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1307 }
1308
1309 void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1310 std::vector<ModuleDescription const*>& descriptions,
1311 unsigned int hint) const {
1312 streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1313 }
1314
1315 void Schedule::fillModuleAndConsumesInfo(
1316 std::vector<ModuleDescription const*>& allModuleDescriptions,
1317 std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1318 std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1319 std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1320 ProductRegistry const& preg) const {
1321 allModuleDescriptions.clear();
1322 moduleIDToIndex.clear();
1323 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1324 modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1325 }
1326 modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1327
1328 allModuleDescriptions.reserve(allWorkers().size());
1329 moduleIDToIndex.reserve(allWorkers().size());
1330 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1331 modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1332 }
1333 modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1334
1335 std::map<std::string, ModuleDescription const*> labelToDesc;
1336 unsigned int i = 0;
1337 for (auto const& worker : allWorkers()) {
1338 ModuleDescription const* p = worker->description();
1339 allModuleDescriptions.push_back(p);
1340 moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1341 labelToDesc[p->moduleLabel()] = p;
1342 ++i;
1343 }
1344 sort_all(moduleIDToIndex);
1345
1346 i = 0;
1347 for (auto const& worker : allWorkers()) {
1348 std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1349 for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1350 modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1351 }
1352
1353 std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1354 modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1355 try {
1356 worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1357 } catch (cms::Exception& ex) {
1358 ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1359 worker->description()->moduleLabel());
1360 throw;
1361 }
1362 ++i;
1363 }
1364 }
1365
1366 void Schedule::getTriggerReport(TriggerReport& rep) const {
1367 rep.eventSummary.totalEvents = 0;
1368 rep.eventSummary.totalEventsPassed = 0;
1369 rep.eventSummary.totalEventsFailed = 0;
1370 for (auto& s : streamSchedules_) {
1371 s->getTriggerReport(rep);
1372 }
1373 sort_all(rep.workerSummaries);
1374 }
1375
1376 void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1377 rep.eventSummary.totalEvents = 0;
1378 rep.eventSummary.cpuTime = 0.;
1379 rep.eventSummary.realTime = 0.;
1380 summaryTimeKeeper_->fillTriggerTimingReport(rep);
1381 }
1382
1383 int Schedule::totalEvents() const {
1384 int returnValue = 0;
1385 for (auto& s : streamSchedules_) {
1386 returnValue += s->totalEvents();
1387 }
1388 return returnValue;
1389 }
1390
1391 int Schedule::totalEventsPassed() const {
1392 int returnValue = 0;
1393 for (auto& s : streamSchedules_) {
1394 returnValue += s->totalEventsPassed();
1395 }
1396 return returnValue;
1397 }
1398
1399 int Schedule::totalEventsFailed() const {
1400 int returnValue = 0;
1401 for (auto& s : streamSchedules_) {
1402 returnValue += s->totalEventsFailed();
1403 }
1404 return returnValue;
1405 }
1406
1407 void Schedule::clearCounters() {
1408 for (auto& s : streamSchedules_) {
1409 s->clearCounters();
1410 }
1411 }
1412 }