File indexing completed on 2022-09-30 01:18:08
0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0005 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0007 #include "FWCore/Framework/src/OutputModuleDescription.h"
0008 #include "FWCore/Framework/interface/TriggerNamesService.h"
0009 #include "FWCore/Framework/src/TriggerReport.h"
0010 #include "FWCore/Framework/src/TriggerTimingReport.h"
0011 #include "FWCore/Framework/src/Factory.h"
0012 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0013 #include "FWCore/Framework/src/TriggerResultInserter.h"
0014 #include "FWCore/Framework/src/PathStatusInserter.h"
0015 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0016 #include "FWCore/Framework/interface/WorkerInPath.h"
0017 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0018 #include "FWCore/Framework/interface/maker/WorkerT.h"
0019 #include "FWCore/Framework/interface/ModuleRegistry.h"
0020 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/ParameterSet/interface/Registry.h"
0024 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0025 #include "FWCore/Utilities/interface/Algorithms.h"
0026 #include "FWCore/Utilities/interface/ConvertException.h"
0027 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029
0030 #include "LuminosityBlockProcessingStatus.h"
0031 #include "processEDAliases.h"
0032
0033 #include <algorithm>
0034 #include <cassert>
0035 #include <cstdlib>
0036 #include <functional>
0037 #include <iomanip>
0038 #include <limits>
0039 #include <list>
0040 #include <map>
0041 #include <exception>
0042 #include <fmt/format.h>
0043
0044 namespace edm {
0045
0046 namespace {
0047
0048
0049
0050
0051
// Walks the input range [begin, end) and the output sequence starting at
// 'out' in lock-step, invoking func(input_element, output_element) for each
// pair. The callable is expected to write its result through its second
// argument; nothing is assigned by this function itself.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
0057
0058
0059
0060
0061
0062
// Fills 'to' with one entry per element of 'from', using func(from_element,
// to_element) to populate each entry (see transform_into).
// When the sizes already agree the entries of 'to' are updated in place;
// otherwise a fresh container of the right size is filled and swapped in.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }

  TO resized(from.size());
  transform_into(from.begin(), from.end(), resized.begin(), func);
  to.swap(resized);
}
0073
0074
0075
0076
0077
0078
0079 StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
0080 std::shared_ptr<ActivityRegistry> areg,
0081 std::shared_ptr<TriggerResultInserter> inserter) {
0082 StreamSchedule::WorkerPtr ptr(
0083 new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
0084 ptr->setActivityRegistry(areg);
0085 return ptr;
0086 }
0087
0088 void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0089 ProductRegistry const& preg,
0090 std::multimap<std::string, Worker*>& branchToReadingWorker) {
0091 auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0092
0093 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0094 vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0095 vBranchesToDeleteEarly.end());
0096
0097
0098 auto allBranchNames = preg.allBranchNames();
0099
0100 for (auto& b : allBranchNames) {
0101 b.resize(b.size() - 1);
0102 }
0103 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0104 std::vector<std::string> temp;
0105 temp.reserve(vBranchesToDeleteEarly.size());
0106
0107 std::set_intersection(vBranchesToDeleteEarly.begin(),
0108 vBranchesToDeleteEarly.end(),
0109 allBranchNames.begin(),
0110 allBranchNames.end(),
0111 std::back_inserter(temp));
0112 vBranchesToDeleteEarly.swap(temp);
0113 if (temp.size() != vBranchesToDeleteEarly.size()) {
0114 std::vector<std::string> missingProducts;
0115 std::set_difference(temp.begin(),
0116 temp.end(),
0117 vBranchesToDeleteEarly.begin(),
0118 vBranchesToDeleteEarly.end(),
0119 std::back_inserter(missingProducts));
0120 LogInfo l("MissingProductsForCanDeleteEarly");
0121 l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0122 for (auto const& n : missingProducts) {
0123 l << "\n " << n;
0124 }
0125 }
0126
0127
0128 for (auto const& branch : vBranchesToDeleteEarly) {
0129 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0130 }
0131 }
0132
0133 Worker* getWorker(std::string const& moduleLabel,
0134 ParameterSet& proc_pset,
0135 WorkerManager& workerManager,
0136 ProductRegistry& preg,
0137 PreallocationConfiguration const* prealloc,
0138 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0139 bool isTracked;
0140 ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0141 if (modpset == nullptr) {
0142 return nullptr;
0143 }
0144 assert(isTracked);
0145
0146 return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
0147 }
0148
0149
0150
0151
0152
0153
0154
// Locates the ConditionalTask section of a path's module list.
//
// The section is introduced by a "#" entry and its final entry is a
// terminator that is not part of the section, so the returned iterator pair
// spans (entry after "#") .. (last entry of 'modnames'), last entry excluded.
//
// Returns an empty range (end, end) when there is no "#" entry, and also when
// "#" is the very last entry: in that case there is neither a section nor a
// terminator, and excluding the last entry would otherwise produce an
// inverted range that callers would treat as non-empty.
template <typename T>
auto findConditionalTaskModulesRange(T& modnames) {
  auto const last = modnames.end();
  auto marker = std::find(modnames.begin(), last, "#");
  if (marker == last or std::next(marker) == last) {
    return std::pair(last, last);
  }
  return std::pair(std::next(marker), std::prev(last));
}
0163
0164 std::optional<std::string> findBestMatchingAlias(
0165 std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0166 std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0167 std::string const& productModuleLabel,
0168 ConsumesInfo const& consumesInfo) {
0169 std::optional<std::string> best;
0170 int wildcardsInBest = std::numeric_limits<int>::max();
0171 bool bestIsAmbiguous = false;
0172
0173 auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0174 std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0175 int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0176 if (wildcards == 0) {
0177 bestIsAmbiguous = false;
0178 return true;
0179 }
0180 if (not best or wildcards < wildcardsInBest) {
0181 best = label;
0182 wildcardsInBest = wildcards;
0183 bestIsAmbiguous = false;
0184 } else if (best and *best != label and wildcardsInBest == wildcards) {
0185 bestIsAmbiguous = true;
0186 }
0187 return false;
0188 };
0189
0190 auto findAlias = aliasMap.equal_range(productModuleLabel);
0191 for (auto it = findAlias.first; it != findAlias.second; ++it) {
0192 std::string const& aliasInstanceLabel =
0193 it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0194 bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0195 if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0196 bool const typeIsWildcard = it->second.friendlyClassName == "*";
0197 if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0198 if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0199 return it->second.originalModuleLabel;
0200 }
0201 } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0202
0203
0204 auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0205 for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0206 if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0207 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0208 TypeID(itBranch->second->wrappedType().typeInfo()),
0209 itBranch->second->className())) {
0210 if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0211 return it->second.originalModuleLabel;
0212 }
0213 }
0214 }
0215 }
0216 }
0217 }
0218 }
0219 if (bestIsAmbiguous) {
0220 throw Exception(errors::UnimplementedFeature)
0221 << "Encountered ambiguity when trying to find a best-matching alias for\n"
0222 << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0223 << " module label " << productModuleLabel << "\n"
0224 << " product instance name " << consumesInfo.instance() << "\n"
0225 << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0226 "wildcards ("
0227 << wildcardsInBest << ")";
0228 }
0229 return best;
0230 }
0231 }
0232
0233
0234
// Shorthand for a list of strings, used for the module-name lists read from parameter sets.
using vstring = std::vector<std::string>;
0236
0237
0238
0239 class ConditionalTaskHelper {
0240 public:
0241 using AliasInfo = StreamSchedule::AliasInfo;
0242
0243 ConditionalTaskHelper(ParameterSet& proc_pset,
0244 ProductRegistry& preg,
0245 PreallocationConfiguration const* prealloc,
0246 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0247 WorkerManager& workerManager,
0248 std::vector<std::string> const& trigPathNames) {
0249 std::unordered_set<std::string> allConditionalMods;
0250 for (auto const& pathName : trigPathNames) {
0251 auto const modnames = proc_pset.getParameter<vstring>(pathName);
0252
0253
0254 auto condRange = findConditionalTaskModulesRange(modnames);
0255 if (condRange.first == condRange.second)
0256 continue;
0257
0258
0259 allConditionalMods.insert(condRange.first, condRange.second);
0260 }
0261
0262 for (auto const& cond : allConditionalMods) {
0263
0264 (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
0265 }
0266
0267 fillAliasMap(proc_pset, allConditionalMods);
0268 processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0269
0270
0271 for (auto const& prod : preg.productList()) {
0272 if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0273 conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0274 }
0275 }
0276 }
0277
0278 std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0279
0280 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
0281 std::unordered_set<std::string> const& conditionalmods) const {
0282 std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
0283 for (auto const& mod : conditionalmods) {
0284 auto range = conditionalModsBranches_.equal_range(mod);
0285 ret.insert(range.first, range.second);
0286 }
0287 return ret;
0288 }
0289
0290 private:
0291 void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0292 auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0293 std::string const star("*");
0294 for (auto const& alias : aliases) {
0295 auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0296 auto aliasedToModuleLabels = info.getParameterNames();
0297 for (auto const& mod : aliasedToModuleLabels) {
0298 if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0299 auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0300 for (auto const& aliasPSet : aliasVPSet) {
0301 std::string type = star;
0302 std::string instance = star;
0303 std::string originalInstance = star;
0304 if (aliasPSet.exists("type")) {
0305 type = aliasPSet.getParameter<std::string>("type");
0306 }
0307 if (aliasPSet.exists("toProductInstance")) {
0308 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0309 }
0310 if (aliasPSet.exists("fromProductInstance")) {
0311 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0312 }
0313
0314 aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0315 }
0316 }
0317 }
0318 }
0319 }
0320
0321 void processSwitchEDAliases(ParameterSet const& proc_pset,
0322 ProductRegistry& preg,
0323 ProcessConfiguration const& processConfiguration,
0324 std::unordered_set<std::string> const& allConditionalMods) {
0325 auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0326 std::vector<std::string> switchEDAliases;
0327 for (auto const& module : all_modules) {
0328 auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0329 if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0330 auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0331 for (auto const& case_label : all_cases) {
0332 auto range = aliasMap_.equal_range(case_label);
0333 if (range.first != range.second) {
0334 switchEDAliases.push_back(case_label);
0335 }
0336 }
0337 }
0338 }
0339 detail::processEDAliases(
0340 switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0341 }
0342
0343 std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0344 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
0345 };
0346
0347
0348
0349 StreamSchedule::StreamSchedule(
0350 std::shared_ptr<TriggerResultInserter> inserter,
0351 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0352 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0353 std::shared_ptr<ModuleRegistry> modReg,
0354 ParameterSet& proc_pset,
0355 service::TriggerNamesService const& tns,
0356 PreallocationConfiguration const& prealloc,
0357 ProductRegistry& preg,
0358 ExceptionToActionTable const& actions,
0359 std::shared_ptr<ActivityRegistry> areg,
0360 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0361 StreamID streamID,
0362 ProcessContext const* processContext)
0363 : workerManager_(modReg, areg, actions),
0364 actReg_(areg),
0365 results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0366 results_inserter_(),
0367 trig_paths_(),
0368 end_paths_(),
0369 total_events_(),
0370 total_passed_(),
0371 number_of_unscheduled_modules_(0),
0372 streamID_(streamID),
0373 streamContext_(streamID_, processContext),
0374 skippingEvent_(false) {
0375 bool hasPath = false;
0376 std::vector<std::string> const& pathNames = tns.getTrigPaths();
0377 std::vector<std::string> const& endPathNames = tns.getEndPaths();
0378
0379 ConditionalTaskHelper conditionalTaskHelper(
0380 proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
0381
0382 int trig_bitpos = 0;
0383 trig_paths_.reserve(pathNames.size());
0384 for (auto const& trig_name : pathNames) {
0385 fillTrigPath(proc_pset,
0386 preg,
0387 &prealloc,
0388 processConfiguration,
0389 trig_bitpos,
0390 trig_name,
0391 results(),
0392 endPathNames,
0393 conditionalTaskHelper);
0394 ++trig_bitpos;
0395 hasPath = true;
0396 }
0397
0398 if (hasPath) {
0399
0400 inserter->setTrigResultForStream(streamID.value(), results());
0401
0402 results_inserter_ = makeInserter(actions, actReg_, inserter);
0403 addToAllWorkers(results_inserter_.get());
0404 }
0405
0406
0407 int bitpos = 0;
0408 end_paths_.reserve(endPathNames.size());
0409 for (auto const& end_path_name : endPathNames) {
0410 fillEndPath(
0411 proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
0412 ++bitpos;
0413 }
0414
0415 makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0416
0417
0418 std::set<std::string> usedWorkerLabels;
0419 for (auto const& worker : allWorkers()) {
0420 usedWorkerLabels.insert(worker->description()->moduleLabel());
0421 }
0422 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0423 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0424 std::vector<std::string> unusedLabels;
0425 set_difference(modulesInConfigSet.begin(),
0426 modulesInConfigSet.end(),
0427 usedWorkerLabels.begin(),
0428 usedWorkerLabels.end(),
0429 back_inserter(unusedLabels));
0430 std::set<std::string> unscheduledLabels;
0431 std::vector<std::string> shouldBeUsedLabels;
0432 if (!unusedLabels.empty()) {
0433
0434
0435
0436
0437 for (auto const& label : unusedLabels) {
0438 bool isTracked;
0439 ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0440 assert(isTracked);
0441 assert(modulePSet != nullptr);
0442 workerManager_.addToUnscheduledWorkers(
0443 *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0444 }
0445 if (!shouldBeUsedLabels.empty()) {
0446 std::ostringstream unusedStream;
0447 unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0448 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0449 itLabelEnd = shouldBeUsedLabels.end();
0450 itLabel != itLabelEnd;
0451 ++itLabel) {
0452 unusedStream << ",'" << *itLabel << "'";
0453 }
0454 LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0455 }
0456 }
0457 number_of_unscheduled_modules_ = unscheduledLabels.size();
0458 }
0459 // Sets up early deletion: finds which requested branches are read by which workers and attaches an EarlyDeleteHelper to each such worker.
0460 void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0461 std::vector<std::string> const& branchesToDeleteEarly,
0462 std::multimap<std::string, std::string> const& referencesToBranches,
0463 std::vector<std::string> const& modulesToSkip,
0464 edm::ProductRegistry const& preg) {
0465 // Seed the map with the requested branches that preg knows; every entry starts with a nullptr worker placeholder.
0466 std::multimap<std::string, Worker*> branchToReadingWorker;
0467 initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0468 // NOTE(review): kEmpty below is never referenced in this function.
0469 const std::vector<std::string> kEmpty;
0470 std::map<Worker*, unsigned int> reserveSizeForWorker;
0471 unsigned int upperLimitOnReadingWorker = 0;
0472 unsigned int upperLimitOnIndicies = 0;
0473 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0474 // Visit every module holder; only holders that return a non-null output-module communicator are inspected.
0475 // NOTE(review): presumably those holders are the output modules - confirm against ModuleHolder.
0476 modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0477 auto comm = iHolder->createOutputModuleCommunicator();
0478 if (comm) {
0479 if (!branchToReadingWorker.empty()) {
0480 // An event branch kept by this module is taken off the early-delete list:
0481 // all of its map entries are erased and the unique-branch count is decremented.
0482 SelectedProductsForBranchType const& kept = comm->keptProducts();
0483 for (auto const& item : kept[InEvent]) {
0484 BranchDescription const& desc = *item.first;
0485 auto found = branchToReadingWorker.equal_range(desc.branchName());
0486 if (found.first != found.second) {
0487 --nUniqueBranchesToDelete;
0488 branchToReadingWorker.erase(found.first, found.second);
0489 }
0490 }
0491 }
0492 }
0493 });
0494
0495 if (branchToReadingWorker.empty()) {
0496 return;
0497 }
0498 // Register every worker not listed in modulesToSkip as a reader of the candidate branches it consumes.
0499 std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0500 for (auto w : allWorkers()) {
0501 if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0502 continue;
0503 }
0504 // Build a "type_label_instance_process" name for each consumed product and look it up in both maps.
0505 auto consumes = w->consumesInfo();
0506 if (not consumes.empty()) {
0507 bool foundAtLeastOneMatchingBranch = false;
0508 for (auto const& product : consumes) {
0509 std::string branch = fmt::format("{}_{}_{}_{}",
0510 product.type().friendlyClassName(),
0511 product.label().data(),
0512 product.instance().data(),
0513 product.process().data());
0514 {
0515 // Case 1: the consumed product is itself one of the candidate branches.
0516 auto found = branchToReadingWorker.end();
0517 if (product.process().empty()) {
0518 auto startFound = branchToReadingWorker.lower_bound(branch);
0519 if (startFound != branchToReadingWorker.end()) {
0520 if (startFound->first.substr(0, branch.size()) == branch) {
0521 // No process name given: accept the first key that starts with the process-less name.
0522 found = startFound;
0523 }
0524 }
0525 } else {
0526 auto exactFound = branchToReadingWorker.equal_range(branch);
0527 if (exactFound.first != exactFound.second) {
0528 found = exactFound.first;
0529 }
0530 }
0531 if (found != branchToReadingWorker.end()) {
0532 if (not foundAtLeastOneMatchingBranch) {
0533 ++upperLimitOnReadingWorker;
0534 foundAtLeastOneMatchingBranch = true;
0535 }
0536 ++upperLimitOnIndicies;
0537 ++reserveSizeForWorker[w];
0538 if (nullptr == found->second) {
0539 found->second = w;
0540 } else {
0541 branchToReadingWorker.insert(make_pair(found->first, w));
0542 }
0543 }
0544 }
0545 {
0546 // Case 2: the consumed product is a key of referencesToBranches; this worker is recorded for each candidate branch it maps to.
0547 auto found = referencesToBranches.end();
0548 if (product.process().empty()) {
0549 auto startFound = referencesToBranches.lower_bound(branch);
0550 if (startFound != referencesToBranches.end()) {
0551 if (startFound->first.substr(0, branch.size()) == branch) {
0552 // No process name given: accept the first key that starts with the process-less name.
0553 found = startFound;
0554 }
0555 }
0556 } else {
0557 // Process name given: require an exact key match.
0558 auto exactFound = referencesToBranches.equal_range(branch);
0559 if (exactFound.first != exactFound.second) {
0560 found = exactFound.first;
0561 }
0562 }
0563 if (found != referencesToBranches.end()) {
0564 for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0565 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0566 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0567 continue;
0568 }
0569 if (not foundAtLeastOneMatchingBranch) {
0570 ++upperLimitOnReadingWorker;
0571 foundAtLeastOneMatchingBranch = true;
0572 }
0573 ++upperLimitOnIndicies;
0574 ++reserveSizeForWorker[w];
0575 if (nullptr == foundInBranchToReadingWorker->second) {
0576 foundInBranchToReadingWorker->second = w;
0577 } else {
0578 branchToReadingWorker.insert(make_pair(itr->second, w));
0579 }
0580 }
0581 }
0582 }
0583 }
0584 }
0585 }
0586 {
0587 auto it = branchToReadingWorker.begin();
0588 std::vector<std::string> unusedBranches;
0589 while (it != branchToReadingWorker.end()) {
0590 if (it->second == nullptr) {
0591 unusedBranches.push_back(it->first);
0592 // Still a placeholder, so no worker reads this branch: step past the entry before erasing it so `it` stays valid.
0593 auto temp = it;
0594 ++it;
0595 branchToReadingWorker.erase(temp);
0596 } else {
0597 ++it;
0598 }
0599 }
0600 if (not unusedBranches.empty()) {
0601 LogWarning l("UnusedProductsForCanDeleteEarly");
0602 l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0603 " If possible, remove the producer from the job.";
0604 for (auto const& n : unusedBranches) {
0605 l << "\n " << n;
0606 }
0607 }
0608 }
0609 if (!branchToReadingWorker.empty()) {
0610 earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);  // element addresses are stored below; NOTE(review): presumably this reserve is what keeps them valid - confirm
0611 earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0612 earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0613 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0614 std::string lastBranchName;
0615 size_t nextOpenIndex = 0;
0616 unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0617 for (auto& branchAndWorker : branchToReadingWorker) {
0618 if (lastBranchName != branchAndWorker.first) {
0619 // New branch name: append "." (initializeBranchToReadingWorker dropped the final character of the registry names) to build the BranchID; its count starts at 0.
0620 BranchID bid(branchAndWorker.first + ".");
0621 earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0622 lastBranchName = branchAndWorker.first;
0623 }
0624 auto found = alreadySeenWorkers.find(branchAndWorker.second);
0625 if (alreadySeenWorkers.end() == found) {
0626 // First time this worker is seen: set aside reserveSizeForWorker[worker] slots for it, starting at nextOpenIndex,
0627 // store the current branch's index in the first slot and give the new helper the one-element range covering that slot.
0628 // Further branches of the same worker are added through appendIndex() in the else-branch below.
0629 // NOTE(review): presumably appendIndex grows the helper's range into the slots set aside here - confirm in EarlyDeleteHelper.
0630 size_t index = nextOpenIndex;
0631 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0632 assert(index < earlyDeleteHelperToBranchIndicies_.size());
0633 earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0634 earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0635 branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0636 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0637 nextOpenIndex += nIndices;
0638 } else {
0639 found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0640 }
0641 }
0642 // Compaction: the slot counts set aside above are upper limits, so unused slots may sit between consecutive helpers' ranges.
0643 // Walk the helpers in order, shift each one by the gap to its predecessor and erase that gap from the index vector.
0644 // NOTE(review): begin() + 1 assumes at least one helper exists at this point.
0645 auto itLast = earlyDeleteHelpers_.begin();
0646 for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0647 if (itLast->end() != it->begin()) {
0648 // Number of slots between the previous helper's end and this helper's begin.
0649 unsigned int delta = it->begin() - itLast->end();
0650 it->shiftIndexPointers(delta);
0651 // Remove exactly those slots from the index vector.
0652 earlyDeleteHelperToBranchIndicies_.erase(
0653 earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0654 earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0655 }
0656 itLast = it;
0657 }
0658 earlyDeleteHelperToBranchIndicies_.erase(
0659 earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0660 earlyDeleteHelperToBranchIndicies_.end());
0661 // Hand the worker -> helper map to every trigger path and end path, then reset the early-delete state.
0662
0663 for (auto& p : trig_paths_) {
0664 p.setEarlyDeleteHelpers(alreadySeenWorkers);
0665 }
0666 for (auto& p : end_paths_) {
0667 p.setEarlyDeleteHelpers(alreadySeenWorkers);
0668 }
0669 resetEarlyDelete();
0670 }
0671 }
0672
0673 std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0674 Worker* worker,
0675 std::unordered_set<std::string>& conditionalModules,
0676 std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0677 std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0678 ParameterSet& proc_pset,
0679 ProductRegistry& preg,
0680 PreallocationConfiguration const* prealloc,
0681 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0682 std::vector<Worker*> returnValue;
0683 auto const& consumesInfo = worker->consumesInfo();
0684 auto moduleLabel = worker->description()->moduleLabel();
0685 using namespace productholderindexhelper;
0686 for (auto const& ci : consumesInfo) {
0687 if (not ci.skipCurrentProcess() and
0688 (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0689 auto productModuleLabel = std::string(ci.label());
0690 if (productModuleLabel.empty()) {
0691
0692 for (auto const& branch : conditionalModuleBranches) {
0693
0694 if (conditionalModules.find(branch.first) == conditionalModules.end()) {
0695 continue;
0696 }
0697 if (ci.kindOfType() == edm::PRODUCT_TYPE) {
0698 if (branch.second->unwrappedTypeID() != ci.type()) {
0699 continue;
0700 }
0701 } else {
0702 if (not typeIsViewCompatible(
0703 ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
0704 continue;
0705 }
0706 }
0707
0708 auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0709 assert(condWorker);
0710
0711 conditionalModules.erase(branch.first);
0712
0713 auto dependents = tryToPlaceConditionalModules(condWorker,
0714 conditionalModules,
0715 conditionalModuleBranches,
0716 aliasMap,
0717 proc_pset,
0718 preg,
0719 prealloc,
0720 processConfiguration);
0721 returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0722 returnValue.push_back(condWorker);
0723 }
0724 } else {
0725
0726 bool productFromConditionalModule = false;
0727 auto itFound = conditionalModules.find(productModuleLabel);
0728 if (itFound == conditionalModules.end()) {
0729
0730
0731 auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0732 if (foundAlias) {
0733 productModuleLabel = *foundAlias;
0734 productFromConditionalModule = true;
0735 itFound = conditionalModules.find(productModuleLabel);
0736
0737 if (itFound == conditionalModules.end()) {
0738 continue;
0739 }
0740 }
0741 } else {
0742
0743 auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0744 for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0745 if (itBranch->second->productInstanceName() == ci.instance()) {
0746 if (ci.kindOfType() == PRODUCT_TYPE) {
0747 if (ci.type() == itBranch->second->unwrappedTypeID()) {
0748 productFromConditionalModule = true;
0749 break;
0750 }
0751 } else {
0752
0753 if (typeIsViewCompatible(ci.type(),
0754 TypeID(itBranch->second->wrappedType().typeInfo()),
0755 itBranch->second->className())) {
0756 productFromConditionalModule = true;
0757 break;
0758 }
0759 }
0760 }
0761 }
0762 }
0763 if (productFromConditionalModule) {
0764 auto condWorker =
0765 getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0766 assert(condWorker);
0767
0768 conditionalModules.erase(itFound);
0769
0770 auto dependents = tryToPlaceConditionalModules(condWorker,
0771 conditionalModules,
0772 conditionalModuleBranches,
0773 aliasMap,
0774 proc_pset,
0775 preg,
0776 prealloc,
0777 processConfiguration);
0778 returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0779 returnValue.push_back(condWorker);
0780 }
0781 }
0782 }
0783 }
0784 return returnValue;
0785 }
0786
0787 void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
0788 ProductRegistry& preg,
0789 PreallocationConfiguration const* prealloc,
0790 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0791 std::string const& pathName,
0792 bool ignoreFilters,
0793 PathWorkers& out,
0794 std::vector<std::string> const& endPathNames,
0795 ConditionalTaskHelper const& conditionalTaskHelper) {
0796 vstring modnames = proc_pset.getParameter<vstring>(pathName);
0797 PathWorkers tmpworkers;
0798
0799
0800 auto condRange = findConditionalTaskModulesRange(modnames);
0801
0802 std::unordered_set<std::string> conditionalmods;
0803
0804 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
0805 std::unordered_map<std::string, unsigned int> conditionalModOrder;
0806 if (condRange.first != condRange.second) {
0807 for (auto it = condRange.first; it != condRange.second; ++it) {
0808
0809 conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0810 }
0811
0812 conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0813 std::make_move_iterator(condRange.second));
0814
0815 conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0816 modnames.erase(std::prev(condRange.first), modnames.end());
0817 }
0818
0819 unsigned int placeInPath = 0;
0820 for (auto const& name : modnames) {
0821
0822 bool doNotRunConcurrently = false;
0823 WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0824 if (name[0] == '!') {
0825 filterAction = WorkerInPath::Veto;
0826 } else if (name[0] == '-' or name[0] == '+') {
0827 filterAction = WorkerInPath::Ignore;
0828 }
0829 if (name[0] == '|' or name[0] == '+') {
0830
0831 doNotRunConcurrently = true;
0832 }
0833
0834 std::string moduleLabel = name;
0835 if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0836 moduleLabel.erase(0, 1);
0837 }
0838
0839 Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0840 if (worker == nullptr) {
0841 std::string pathType("endpath");
0842 if (!search_all(endPathNames, pathName)) {
0843 pathType = std::string("path");
0844 }
0845 throw Exception(errors::Configuration)
0846 << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0847 << "\"\n please check spelling or remove that label from the path.";
0848 }
0849
0850 if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0851
0852
0853 std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0854 if (!search_all(allowed_filters, worker->description()->moduleName())) {
0855
0856 filterAction = WorkerInPath::Ignore;
0857 LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0858 << "' with module label '" << moduleLabel << "' appears on EndPath '"
0859 << pathName << "'.\n"
0860 << "The return value of the filter will be ignored.\n"
0861 << "To suppress this warning, either remove the filter from the endpath,\n"
0862 << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0863 }
0864 }
0865 bool runConcurrently = not doNotRunConcurrently;
0866 if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0867 runConcurrently = false;
0868 }
0869
0870 auto condModules = tryToPlaceConditionalModules(worker,
0871 conditionalmods,
0872 conditionalModsBranches,
0873 conditionalTaskHelper.aliasMap(),
0874 proc_pset,
0875 preg,
0876 prealloc,
0877 processConfiguration);
0878 for (auto condMod : condModules) {
0879 tmpworkers.emplace_back(
0880 condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0881 }
0882
0883 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0884 ++placeInPath;
0885 }
0886
0887 out.swap(tmpworkers);
0888 }
0889
0890 void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0891 ProductRegistry& preg,
0892 PreallocationConfiguration const* prealloc,
0893 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0894 int bitpos,
0895 std::string const& name,
0896 TrigResPtr trptr,
0897 std::vector<std::string> const& endPathNames,
0898 ConditionalTaskHelper const& conditionalTaskHelper) {
0899 PathWorkers tmpworkers;
0900 fillWorkers(
0901 proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
0902
0903
0904 if (!tmpworkers.empty()) {
0905 trig_paths_.emplace_back(bitpos,
0906 name,
0907 tmpworkers,
0908 trptr,
0909 actionTable(),
0910 actReg_,
0911 &streamContext_,
0912 &skippingEvent_,
0913 PathContext::PathType::kPath);
0914 } else {
0915 empty_trig_paths_.push_back(bitpos);
0916 }
0917 for (WorkerInPath const& workerInPath : tmpworkers) {
0918 addToAllWorkers(workerInPath.getWorker());
0919 }
0920 }
0921
0922 void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0923 ProductRegistry& preg,
0924 PreallocationConfiguration const* prealloc,
0925 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0926 int bitpos,
0927 std::string const& name,
0928 std::vector<std::string> const& endPathNames,
0929 ConditionalTaskHelper const& conditionalTaskHelper) {
0930 PathWorkers tmpworkers;
0931 fillWorkers(
0932 proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
0933
0934 if (!tmpworkers.empty()) {
0935
0936 end_paths_.emplace_back(bitpos,
0937 name,
0938 tmpworkers,
0939 TrigResPtr(),
0940 actionTable(),
0941 actReg_,
0942 &streamContext_,
0943 nullptr,
0944 PathContext::PathType::kEndPath);
0945 } else {
0946 empty_end_paths_.push_back(bitpos);
0947 }
0948 for (WorkerInPath const& workerInPath : tmpworkers) {
0949 addToAllWorkers(workerInPath.getWorker());
0950 }
0951 }
0952
0953 void StreamSchedule::beginStream() { workerManager_.beginStream(streamID_, streamContext_); }
0954
0955 void StreamSchedule::endStream() { workerManager_.endStream(streamID_, streamContext_); }
0956
0957 void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0958 Worker* found = nullptr;
0959 for (auto const& worker : allWorkers()) {
0960 if (worker->description()->moduleLabel() == iLabel) {
0961 found = worker;
0962 break;
0963 }
0964 }
0965 if (nullptr == found) {
0966 return;
0967 }
0968
0969 iMod->replaceModuleFor(found);
0970 found->beginStream(streamID_, streamContext_);
0971 }
0972
0973 void StreamSchedule::deleteModule(std::string const& iLabel) { workerManager_.deleteModuleIfExists(iLabel); }
0974
0975 std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
0976 std::vector<ModuleDescription const*> result;
0977 result.reserve(allWorkers().size());
0978
0979 for (auto const& worker : allWorkers()) {
0980 ModuleDescription const* p = worker->description();
0981 result.push_back(p);
0982 }
0983 return result;
0984 }
0985
0986 void StreamSchedule::processOneEventAsync(
0987 WaitingTaskHolder iTask,
0988 EventTransitionInfo& info,
0989 ServiceToken const& serviceToken,
0990 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
0991 EventPrincipal& ep = info.principal();
0992
0993
0994 CMS_SA_ALLOW try {
0995 this->resetAll();
0996
0997 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
0998
0999 Traits::setStreamContext(streamContext_, ep);
1000
1001 ServiceRegistry::Operate guard(serviceToken);
1002 Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1003
1004
1005
1006
1007 workerManager_.setupResolvers(ep);
1008 workerManager_.setupOnDemandSystem(info);
1009
1010 HLTPathStatus hltPathStatus(hlt::Pass, 0);
1011 for (int empty_trig_path : empty_trig_paths_) {
1012 results_->at(empty_trig_path) = hltPathStatus;
1013 pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1014 std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1015 ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1016 info, streamID_, ParentContext(&streamContext_), &streamContext_);
1017 if (except) {
1018 iTask.doneWaiting(except);
1019 return;
1020 }
1021 }
1022 for (int empty_end_path : empty_end_paths_) {
1023 std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
1024 ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1025 info, streamID_, ParentContext(&streamContext_), &streamContext_);
1026 if (except) {
1027 iTask.doneWaiting(except);
1028 return;
1029 }
1030 }
1031
1032 ++total_events_;
1033
1034
1035 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1036 auto pathErrorPtr = pathErrorHolder.get();
1037 ServiceWeakToken weakToken = serviceToken;
1038 auto allPathsDone = make_waiting_task(
1039 [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1040 ServiceRegistry::Operate operate(weakToken.lock());
1041
1042 std::exception_ptr ptr;
1043 if (pathError->load()) {
1044 ptr = *pathError->load();
1045 delete pathError->load();
1046 }
1047 if ((not ptr) and iPtr) {
1048 ptr = *iPtr;
1049 }
1050 iTask.doneWaiting(finishProcessOneEvent(ptr));
1051 });
1052
1053
1054
1055 WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1056
1057 auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1058 std::exception_ptr const* iPtr) mutable {
1059 ServiceRegistry::Operate operate(weakToken.lock());
1060
1061 if (iPtr) {
1062
1063
1064 pathErrorPtr->store(new std::exception_ptr(*iPtr));
1065 }
1066 finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1067 });
1068
1069
1070
1071
1072 WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1073
1074
1075 WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1076 for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1077 it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1078 }
1079
1080 for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1081 it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1082 }
1083
1084 ParentContext parentContext(&streamContext_);
1085 workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1086 hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1087 } catch (...) {
1088 iTask.doneWaiting(std::current_exception());
1089 }
1090 }
1091
1092 void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1093 WaitingTaskHolder iWait,
1094 EventTransitionInfo& info) {
1095 if (iExcept) {
1096
1097 CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1098 exception_actions::ActionCodes action = actionTable().find(e.category());
1099 assert(action != exception_actions::IgnoreCompletely);
1100 assert(action != exception_actions::FailPath);
1101 if (action == exception_actions::SkipEvent) {
1102 edm::printCmsExceptionWarning("SkipEvent", e);
1103 *(iExcept.load()) = std::exception_ptr();
1104 } else {
1105 *(iExcept.load()) = std::current_exception();
1106 }
1107 } catch (...) {
1108 *(iExcept.load()) = std::current_exception();
1109 }
1110 }
1111
1112 if ((not iExcept) and results_->accept()) {
1113 ++total_passed_;
1114 }
1115
1116 if (nullptr != results_inserter_.get()) {
1117
1118 CMS_SA_ALLOW try {
1119
1120
1121 ParentContext parentContext(&streamContext_);
1122 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1123
1124 auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1125 if (expt) {
1126 std::rethrow_exception(expt);
1127 }
1128 } catch (cms::Exception& ex) {
1129 if (not iExcept) {
1130 if (ex.context().empty()) {
1131 std::ostringstream ost;
1132 ost << "Processing Event " << info.principal().id();
1133 ex.addContext(ost.str());
1134 }
1135 iExcept.store(new std::exception_ptr(std::current_exception()));
1136 }
1137 } catch (...) {
1138 if (not iExcept) {
1139 iExcept.store(new std::exception_ptr(std::current_exception()));
1140 }
1141 }
1142 }
1143 std::exception_ptr ptr;
1144 if (iExcept) {
1145 ptr = *iExcept.load();
1146 }
1147 iWait.doneWaiting(ptr);
1148 }
1149
1150 std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1151 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1152
1153 if (iExcept) {
1154
1155 try {
1156 convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1157 } catch (cms::Exception& ex) {
1158 bool const cleaningUpAfterException = false;
1159 if (ex.context().empty()) {
1160 addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1161 } else {
1162 addContextAndPrintException("", ex, cleaningUpAfterException);
1163 }
1164 iExcept = std::current_exception();
1165 }
1166
1167 actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1168 }
1169
1170 CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1171 if (not iExcept) {
1172 iExcept = std::current_exception();
1173 }
1174 }
1175 if (not iExcept) {
1176 resetEarlyDelete();
1177 }
1178
1179 return iExcept;
1180 }
1181
1182 void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1183 oLabelsToFill.reserve(trig_paths_.size());
1184 std::transform(trig_paths_.begin(),
1185 trig_paths_.end(),
1186 std::back_inserter(oLabelsToFill),
1187 std::bind(&Path::name, std::placeholders::_1));
1188 }
1189
1190 void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1191 TrigPaths::const_iterator itFound = std::find_if(
1192 trig_paths_.begin(),
1193 trig_paths_.end(),
1194 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1195 if (itFound != trig_paths_.end()) {
1196 oLabelsToFill.reserve(itFound->size());
1197 for (size_t i = 0; i < itFound->size(); ++i) {
1198 oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1199 }
1200 }
1201 }
1202
1203 void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1204 std::vector<ModuleDescription const*>& descriptions,
1205 unsigned int hint) const {
1206 descriptions.clear();
1207 bool found = false;
1208 TrigPaths::const_iterator itFound;
1209
1210 if (hint < trig_paths_.size()) {
1211 itFound = trig_paths_.begin() + hint;
1212 if (itFound->name() == iPathLabel)
1213 found = true;
1214 }
1215 if (!found) {
1216
1217 itFound = std::find_if(
1218 trig_paths_.begin(),
1219 trig_paths_.end(),
1220 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1221 if (itFound != trig_paths_.end())
1222 found = true;
1223 }
1224 if (found) {
1225 descriptions.reserve(itFound->size());
1226 for (size_t i = 0; i < itFound->size(); ++i) {
1227 descriptions.push_back(itFound->getWorker(i)->description());
1228 }
1229 }
1230 }
1231
1232 void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1233 std::vector<ModuleDescription const*>& descriptions,
1234 unsigned int hint) const {
1235 descriptions.clear();
1236 bool found = false;
1237 TrigPaths::const_iterator itFound;
1238
1239 if (hint < end_paths_.size()) {
1240 itFound = end_paths_.begin() + hint;
1241 if (itFound->name() == iEndPathLabel)
1242 found = true;
1243 }
1244 if (!found) {
1245
1246 itFound = std::find_if(
1247 end_paths_.begin(),
1248 end_paths_.end(),
1249 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1250 if (itFound != end_paths_.end())
1251 found = true;
1252 }
1253 if (found) {
1254 descriptions.reserve(itFound->size());
1255 for (size_t i = 0; i < itFound->size(); ++i) {
1256 descriptions.push_back(itFound->getWorker(i)->description());
1257 }
1258 }
1259 }
1260
1261 static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1262 sum.timesVisited += path.timesVisited(which);
1263 sum.timesPassed += path.timesPassed(which);
1264 sum.timesFailed += path.timesFailed(which);
1265 sum.timesExcept += path.timesExcept(which);
1266 sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1267 sum.bitPosition = path.bitPosition(which);
1268 }
1269
1270 static void fillPathSummary(Path const& path, PathSummary& sum) {
1271 sum.name = path.name();
1272 sum.bitPosition = path.bitPosition();
1273 sum.timesRun += path.timesRun();
1274 sum.timesPassed += path.timesPassed();
1275 sum.timesFailed += path.timesFailed();
1276 sum.timesExcept += path.timesExcept();
1277
1278 Path::size_type sz = path.size();
1279 if (sum.moduleInPathSummaries.empty()) {
1280 std::vector<ModuleInPathSummary> temp(sz);
1281 for (size_t i = 0; i != sz; ++i) {
1282 fillModuleInPathSummary(path, i, temp[i]);
1283 }
1284 sum.moduleInPathSummaries.swap(temp);
1285 } else {
1286 assert(sz == sum.moduleInPathSummaries.size());
1287 for (size_t i = 0; i != sz; ++i) {
1288 fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1289 }
1290 }
1291 }
1292
1293 static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1294 sum.timesVisited += w.timesVisited();
1295 sum.timesRun += w.timesRun();
1296 sum.timesPassed += w.timesPassed();
1297 sum.timesFailed += w.timesFailed();
1298 sum.timesExcept += w.timesExcept();
1299 sum.moduleLabel = w.description()->moduleLabel();
1300 }
1301
1302 static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1303
1304 void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1305 rep.eventSummary.totalEvents += totalEvents();
1306 rep.eventSummary.totalEventsPassed += totalEventsPassed();
1307 rep.eventSummary.totalEventsFailed += totalEventsFailed();
1308
1309 fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1310 fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1311 fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1312 }
1313
1314 void StreamSchedule::clearCounters() {
1315 using std::placeholders::_1;
1316 total_events_ = total_passed_ = 0;
1317 for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1318 for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1319 for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1320 }
1321
1322 void StreamSchedule::resetAll() {
1323 skippingEvent_ = false;
1324 results_->reset();
1325 }
1326
1327 void StreamSchedule::addToAllWorkers(Worker* w) { workerManager_.addToAllWorkers(w); }
1328
1329 void StreamSchedule::resetEarlyDelete() {
1330
1331 for (auto& count : earlyDeleteBranchToCount_) {
1332 count.count = 0;
1333 }
1334
1335 for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1336 ++(earlyDeleteBranchToCount_[index].count);
1337 }
1338 for (auto& helper : earlyDeleteHelpers_) {
1339 helper.reset();
1340 }
1341 }
1342
1343 void StreamSchedule::makePathStatusInserters(
1344 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1345 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1346 ExceptionToActionTable const& actions) {
1347 int bitpos = 0;
1348 unsigned int indexEmpty = 0;
1349 unsigned int indexOfPath = 0;
1350 for (auto& pathStatusInserter : pathStatusInserters) {
1351 std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1352 WorkerPtr workerPtr(
1353 new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1354 pathStatusInserterWorkers_.emplace_back(workerPtr);
1355 workerPtr->setActivityRegistry(actReg_);
1356 addToAllWorkers(workerPtr.get());
1357
1358
1359
1360
1361 if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1362 ++indexEmpty;
1363 } else {
1364 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1365 ++indexOfPath;
1366 }
1367 ++bitpos;
1368 }
1369
1370 bitpos = 0;
1371 indexEmpty = 0;
1372 indexOfPath = 0;
1373 for (auto& endPathStatusInserter : endPathStatusInserters) {
1374 std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1375 WorkerPtr workerPtr(
1376 new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1377 endPathStatusInserterWorkers_.emplace_back(workerPtr);
1378 workerPtr->setActivityRegistry(actReg_);
1379 addToAllWorkers(workerPtr.get());
1380
1381
1382
1383
1384 if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1385 ++indexEmpty;
1386 } else {
1387 end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1388 ++indexOfPath;
1389 }
1390 ++bitpos;
1391 }
1392 }
1393 }