File indexing completed on 2024-06-04 04:34:55
0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0005 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0007 #include "FWCore/Framework/src/OutputModuleDescription.h"
0008 #include "FWCore/Framework/interface/TriggerNamesService.h"
0009 #include "FWCore/Framework/src/TriggerReport.h"
0010 #include "FWCore/Framework/src/TriggerTimingReport.h"
0011 #include "FWCore/Framework/src/Factory.h"
0012 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0013 #include "FWCore/Framework/src/TriggerResultInserter.h"
0014 #include "FWCore/Framework/src/PathStatusInserter.h"
0015 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0016 #include "FWCore/Framework/interface/WorkerInPath.h"
0017 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0018 #include "FWCore/Framework/interface/maker/WorkerT.h"
0019 #include "FWCore/Framework/interface/ModuleRegistry.h"
0020 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/ParameterSet/interface/Registry.h"
0024 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0025 #include "FWCore/Utilities/interface/Algorithms.h"
0026 #include "FWCore/Utilities/interface/ConvertException.h"
0027 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029
0030 #include "LuminosityBlockProcessingStatus.h"
0031 #include "processEDAliases.h"
0032
0033 #include <algorithm>
0034 #include <cassert>
0035 #include <cstdlib>
0036 #include <functional>
0037 #include <iomanip>
0038 #include <limits>
0039 #include <list>
0040 #include <map>
0041 #include <exception>
0042 #include <fmt/format.h>
0043
0044 namespace edm {
0045
0046 namespace {
0047
0048
0049
0050
0051
0052 template <typename InputIterator, typename ForwardIterator, typename Func>
0053 void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
0054 for (; begin != end; ++begin, ++out)
0055 func(*begin, *out);
0056 }
0057
0058
0059
0060
0061
0062
0063 template <typename FROM, typename TO, typename FUNC>
0064 void fill_summary(FROM const& from, TO& to, FUNC func) {
0065 if (to.size() != from.size()) {
0066 TO temp(from.size());
0067 transform_into(from.begin(), from.end(), temp.begin(), func);
0068 to.swap(temp);
0069 } else {
0070 transform_into(from.begin(), from.end(), to.begin(), func);
0071 }
0072 }
0073
0074
0075
0076
0077
0078
0079 StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
0080 std::shared_ptr<ActivityRegistry> areg,
0081 std::shared_ptr<TriggerResultInserter> inserter) {
0082 StreamSchedule::WorkerPtr ptr(
0083 new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
0084 ptr->setActivityRegistry(areg);
0085 return ptr;
0086 }
0087
0088 void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0089 ProductRegistry const& preg,
0090 std::multimap<std::string, Worker*>& branchToReadingWorker) {
0091 auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0092
0093 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0094 vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0095 vBranchesToDeleteEarly.end());
0096
0097
0098 auto allBranchNames = preg.allBranchNames();
0099
0100 for (auto& b : allBranchNames) {
0101 b.resize(b.size() - 1);
0102 }
0103 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0104 std::vector<std::string> temp;
0105 temp.reserve(vBranchesToDeleteEarly.size());
0106
0107 std::set_intersection(vBranchesToDeleteEarly.begin(),
0108 vBranchesToDeleteEarly.end(),
0109 allBranchNames.begin(),
0110 allBranchNames.end(),
0111 std::back_inserter(temp));
0112 vBranchesToDeleteEarly.swap(temp);
0113 if (temp.size() != vBranchesToDeleteEarly.size()) {
0114 std::vector<std::string> missingProducts;
0115 std::set_difference(temp.begin(),
0116 temp.end(),
0117 vBranchesToDeleteEarly.begin(),
0118 vBranchesToDeleteEarly.end(),
0119 std::back_inserter(missingProducts));
0120 LogInfo l("MissingProductsForCanDeleteEarly");
0121 l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0122 for (auto const& n : missingProducts) {
0123 l << "\n " << n;
0124 }
0125 }
0126
0127
0128 for (auto const& branch : vBranchesToDeleteEarly) {
0129 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0130 }
0131 }
0132
0133 Worker* getWorker(std::string const& moduleLabel,
0134 ParameterSet& proc_pset,
0135 WorkerManager& workerManager,
0136 ProductRegistry& preg,
0137 PreallocationConfiguration const* prealloc,
0138 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0139 bool isTracked;
0140 ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0141 if (modpset == nullptr) {
0142 return nullptr;
0143 }
0144 assert(isTracked);
0145
0146 return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
0147 }
0148
0149
0150
0151
0152
0153
0154
0155 template <typename T>
0156 auto findConditionalTaskModulesRange(T& modnames) {
0157 auto beg = std::find(modnames.begin(), modnames.end(), "#");
0158 if (beg == modnames.end()) {
0159 return std::pair(modnames.end(), modnames.end());
0160 }
0161 return std::pair(beg + 1, std::prev(modnames.end()));
0162 }
0163
0164 std::optional<std::string> findBestMatchingAlias(
0165 std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0166 std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0167 std::string const& productModuleLabel,
0168 ConsumesInfo const& consumesInfo) {
0169 std::optional<std::string> best;
0170 int wildcardsInBest = std::numeric_limits<int>::max();
0171 bool bestIsAmbiguous = false;
0172
0173 auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0174 std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0175 int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0176 if (wildcards == 0) {
0177 bestIsAmbiguous = false;
0178 return true;
0179 }
0180 if (not best or wildcards < wildcardsInBest) {
0181 best = label;
0182 wildcardsInBest = wildcards;
0183 bestIsAmbiguous = false;
0184 } else if (best and *best != label and wildcardsInBest == wildcards) {
0185 bestIsAmbiguous = true;
0186 }
0187 return false;
0188 };
0189
0190 auto findAlias = aliasMap.equal_range(productModuleLabel);
0191 for (auto it = findAlias.first; it != findAlias.second; ++it) {
0192 std::string const& aliasInstanceLabel =
0193 it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0194 bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0195 if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0196 bool const typeIsWildcard = it->second.friendlyClassName == "*";
0197 if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0198 if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0199 return it->second.originalModuleLabel;
0200 }
0201 } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0202
0203
0204 auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0205 for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0206 if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0207 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0208 TypeID(itBranch->second->wrappedType().typeInfo()),
0209 itBranch->second->className())) {
0210 if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0211 return it->second.originalModuleLabel;
0212 }
0213 }
0214 }
0215 }
0216 }
0217 }
0218 }
0219 if (bestIsAmbiguous) {
0220 throw Exception(errors::UnimplementedFeature)
0221 << "Encountered ambiguity when trying to find a best-matching alias for\n"
0222 << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0223 << " module label " << productModuleLabel << "\n"
0224 << " product instance name " << consumesInfo.instance() << "\n"
0225 << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0226 "wildcards ("
0227 << wildcardsInBest << ")";
0228 }
0229 return best;
0230 }
0231 }
0232
0233
0234
0235 typedef std::vector<std::string> vstring;
0236
0237
0238
0239 class ConditionalTaskHelper {
0240 public:
0241 using AliasInfo = StreamSchedule::AliasInfo;
0242
0243 ConditionalTaskHelper(ParameterSet& proc_pset,
0244 ProductRegistry& preg,
0245 PreallocationConfiguration const* prealloc,
0246 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0247 WorkerManager& workerManagerLumisAndEvents,
0248 std::vector<std::string> const& trigPathNames) {
0249 std::unordered_set<std::string> allConditionalMods;
0250 for (auto const& pathName : trigPathNames) {
0251 auto const modnames = proc_pset.getParameter<vstring>(pathName);
0252
0253
0254 auto condRange = findConditionalTaskModulesRange(modnames);
0255 if (condRange.first == condRange.second)
0256 continue;
0257
0258
0259 allConditionalMods.insert(condRange.first, condRange.second);
0260 }
0261
0262 for (auto const& cond : allConditionalMods) {
0263
0264 (void)getWorker(cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration);
0265 }
0266
0267 fillAliasMap(proc_pset, allConditionalMods);
0268 processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0269
0270
0271 for (auto const& prod : preg.productList()) {
0272 if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0273 conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0274 }
0275 }
0276 }
0277
0278 std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0279
0280 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
0281 std::unordered_set<std::string> const& conditionalmods) const {
0282 std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
0283 for (auto const& mod : conditionalmods) {
0284 auto range = conditionalModsBranches_.equal_range(mod);
0285 ret.insert(range.first, range.second);
0286 }
0287 return ret;
0288 }
0289
0290 private:
0291 void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0292 auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0293 std::string const star("*");
0294 for (auto const& alias : aliases) {
0295 auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0296 auto aliasedToModuleLabels = info.getParameterNames();
0297 for (auto const& mod : aliasedToModuleLabels) {
0298 if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0299 auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0300 for (auto const& aliasPSet : aliasVPSet) {
0301 std::string type = star;
0302 std::string instance = star;
0303 std::string originalInstance = star;
0304 if (aliasPSet.exists("type")) {
0305 type = aliasPSet.getParameter<std::string>("type");
0306 }
0307 if (aliasPSet.exists("toProductInstance")) {
0308 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0309 }
0310 if (aliasPSet.exists("fromProductInstance")) {
0311 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0312 }
0313
0314 aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0315 }
0316 }
0317 }
0318 }
0319 }
0320
0321 void processSwitchEDAliases(ParameterSet const& proc_pset,
0322 ProductRegistry& preg,
0323 ProcessConfiguration const& processConfiguration,
0324 std::unordered_set<std::string> const& allConditionalMods) {
0325 auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0326 std::vector<std::string> switchEDAliases;
0327 for (auto const& module : all_modules) {
0328 auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0329 if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0330 auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0331 for (auto const& case_label : all_cases) {
0332 auto range = aliasMap_.equal_range(case_label);
0333 if (range.first != range.second) {
0334 switchEDAliases.push_back(case_label);
0335 }
0336 }
0337 }
0338 }
0339 detail::processEDAliases(
0340 switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0341 }
0342
0343 std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0344 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
0345 };
0346
0347
0348
0349 StreamSchedule::StreamSchedule(
0350 std::shared_ptr<TriggerResultInserter> inserter,
0351 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0352 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0353 std::shared_ptr<ModuleRegistry> modReg,
0354 ParameterSet& proc_pset,
0355 service::TriggerNamesService const& tns,
0356 PreallocationConfiguration const& prealloc,
0357 ProductRegistry& preg,
0358 ExceptionToActionTable const& actions,
0359 std::shared_ptr<ActivityRegistry> areg,
0360 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0361 StreamID streamID,
0362 ProcessContext const* processContext)
0363 : workerManagerBeginEnd_(modReg, areg, actions),
0364 workerManagerRuns_(modReg, areg, actions),
0365 workerManagerLumisAndEvents_(modReg, areg, actions),
0366 actReg_(areg),
0367 results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0368 results_inserter_(),
0369 trig_paths_(),
0370 end_paths_(),
0371 total_events_(),
0372 total_passed_(),
0373 number_of_unscheduled_modules_(0),
0374 streamID_(streamID),
0375 streamContext_(streamID_, processContext) {
0376 bool hasPath = false;
0377 std::vector<std::string> const& pathNames = tns.getTrigPaths();
0378 std::vector<std::string> const& endPathNames = tns.getEndPaths();
0379
0380 ConditionalTaskHelper conditionalTaskHelper(
0381 proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
0382 std::unordered_set<std::string> conditionalModules;
0383
0384 int trig_bitpos = 0;
0385 trig_paths_.reserve(pathNames.size());
0386 for (auto const& trig_name : pathNames) {
0387 fillTrigPath(proc_pset,
0388 preg,
0389 &prealloc,
0390 processConfiguration,
0391 trig_bitpos,
0392 trig_name,
0393 results(),
0394 endPathNames,
0395 conditionalTaskHelper,
0396 conditionalModules);
0397 ++trig_bitpos;
0398 hasPath = true;
0399 }
0400
0401 if (hasPath) {
0402
0403 inserter->setTrigResultForStream(streamID.value(), results());
0404
0405 results_inserter_ = makeInserter(actions, actReg_, inserter);
0406 addToAllWorkers(results_inserter_.get());
0407 }
0408
0409
0410 int bitpos = 0;
0411 end_paths_.reserve(endPathNames.size());
0412 for (auto const& end_path_name : endPathNames) {
0413 fillEndPath(proc_pset,
0414 preg,
0415 &prealloc,
0416 processConfiguration,
0417 bitpos,
0418 end_path_name,
0419 endPathNames,
0420 conditionalTaskHelper,
0421 conditionalModules);
0422 ++bitpos;
0423 }
0424
0425 makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0426
0427
0428 std::set<std::string> usedWorkerLabels;
0429 for (auto const& worker : allWorkersLumisAndEvents()) {
0430 usedWorkerLabels.insert(worker->description()->moduleLabel());
0431 }
0432 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0433 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0434 std::vector<std::string> unusedLabels;
0435 set_difference(modulesInConfigSet.begin(),
0436 modulesInConfigSet.end(),
0437 usedWorkerLabels.begin(),
0438 usedWorkerLabels.end(),
0439 back_inserter(unusedLabels));
0440 std::set<std::string> unscheduledLabels;
0441 std::vector<std::string> shouldBeUsedLabels;
0442 if (!unusedLabels.empty()) {
0443
0444
0445
0446
0447 for (auto const& label : unusedLabels) {
0448 bool isTracked;
0449 ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0450 assert(isTracked);
0451 assert(modulePSet != nullptr);
0452 workerManagerLumisAndEvents_.addToUnscheduledWorkers(
0453 *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0454 }
0455 if (!shouldBeUsedLabels.empty()) {
0456 std::ostringstream unusedStream;
0457 unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0458 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0459 itLabelEnd = shouldBeUsedLabels.end();
0460 itLabel != itLabelEnd;
0461 ++itLabel) {
0462 unusedStream << ",'" << *itLabel << "'";
0463 }
0464 LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0465 }
0466 }
0467 number_of_unscheduled_modules_ = unscheduledLabels.size();
0468
0469
0470 if (streamID.value() == 0 and not conditionalModules.empty()) {
0471
0472
0473
0474
0475 std::vector<std::string_view> labelsToPrint;
0476 std::copy_if(
0477 unscheduledLabels.begin(),
0478 unscheduledLabels.end(),
0479 std::back_inserter(labelsToPrint),
0480 [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
0481
0482 if (not labelsToPrint.empty()) {
0483 edm::LogWarning log("NonConsumedConditionalModules");
0484 log << "The following modules were part of some ConditionalTask, but were not\n"
0485 << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
0486 << "was associated. Perhaps they should be either removed from the\n"
0487 << "job, or moved to a Task to make it explicit they are unscheduled.\n";
0488 for (auto const& modLabel : labelsToPrint) {
0489 log.format("\n {}", modLabel);
0490 }
0491 }
0492 }
0493
0494 for (auto const& worker : allWorkersLumisAndEvents()) {
0495 std::string const& moduleLabel = worker->description()->moduleLabel();
0496
0497
0498
0499
0500
0501
0502 Worker* workerBeginEnd =
0503 getWorker(moduleLabel, proc_pset, workerManagerBeginEnd_, preg, &prealloc, processConfiguration);
0504 if (workerBeginEnd) {
0505 workerManagerBeginEnd_.addToAllWorkers(workerBeginEnd);
0506 }
0507
0508 Worker* workerRuns = getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
0509 if (workerRuns) {
0510 workerManagerRuns_.addToAllWorkers(workerRuns);
0511 }
0512 }
0513
0514 }
0515
0516 void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0517 std::vector<std::string> const& branchesToDeleteEarly,
0518 std::multimap<std::string, std::string> const& referencesToBranches,
0519 std::vector<std::string> const& modulesToSkip,
0520 edm::ProductRegistry const& preg) {
0521
0522 std::multimap<std::string, Worker*> branchToReadingWorker;
0523 initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0524
0525 const std::vector<std::string> kEmpty;
0526 std::map<Worker*, unsigned int> reserveSizeForWorker;
0527 unsigned int upperLimitOnReadingWorker = 0;
0528 unsigned int upperLimitOnIndicies = 0;
0529 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0530
0531
0532 modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0533 auto comm = iHolder->createOutputModuleCommunicator();
0534 if (comm) {
0535 if (!branchToReadingWorker.empty()) {
0536
0537
0538 SelectedProductsForBranchType const& kept = comm->keptProducts();
0539 for (auto const& item : kept[InEvent]) {
0540 BranchDescription const& desc = *item.first;
0541 auto found = branchToReadingWorker.equal_range(desc.branchName());
0542 if (found.first != found.second) {
0543 --nUniqueBranchesToDelete;
0544 branchToReadingWorker.erase(found.first, found.second);
0545 }
0546 }
0547 }
0548 }
0549 });
0550
0551 if (branchToReadingWorker.empty()) {
0552 return;
0553 }
0554
0555 std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0556 for (auto w : allWorkersLumisAndEvents()) {
0557 if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0558 continue;
0559 }
0560
0561 auto consumes = w->consumesInfo();
0562 if (not consumes.empty()) {
0563 bool foundAtLeastOneMatchingBranch = false;
0564 for (auto const& product : consumes) {
0565 std::string branch = fmt::format("{}_{}_{}_{}",
0566 product.type().friendlyClassName(),
0567 product.label().data(),
0568 product.instance().data(),
0569 product.process().data());
0570 {
0571
0572 auto found = branchToReadingWorker.end();
0573 if (product.process().empty()) {
0574 auto startFound = branchToReadingWorker.lower_bound(branch);
0575 if (startFound != branchToReadingWorker.end()) {
0576 if (startFound->first.substr(0, branch.size()) == branch) {
0577
0578 found = startFound;
0579 }
0580 }
0581 } else {
0582 auto exactFound = branchToReadingWorker.equal_range(branch);
0583 if (exactFound.first != exactFound.second) {
0584 found = exactFound.first;
0585 }
0586 }
0587 if (found != branchToReadingWorker.end()) {
0588 if (not foundAtLeastOneMatchingBranch) {
0589 ++upperLimitOnReadingWorker;
0590 foundAtLeastOneMatchingBranch = true;
0591 }
0592 ++upperLimitOnIndicies;
0593 ++reserveSizeForWorker[w];
0594 if (nullptr == found->second) {
0595 found->second = w;
0596 } else {
0597 branchToReadingWorker.insert(make_pair(found->first, w));
0598 }
0599 }
0600 }
0601 {
0602
0603 auto found = referencesToBranches.end();
0604 if (product.process().empty()) {
0605 auto startFound = referencesToBranches.lower_bound(branch);
0606 if (startFound != referencesToBranches.end()) {
0607 if (startFound->first.substr(0, branch.size()) == branch) {
0608
0609 found = startFound;
0610 }
0611 }
0612 } else {
0613
0614 auto exactFound = referencesToBranches.equal_range(branch);
0615 if (exactFound.first != exactFound.second) {
0616 found = exactFound.first;
0617 }
0618 }
0619 if (found != referencesToBranches.end()) {
0620 for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0621 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0622 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0623 continue;
0624 }
0625 if (not foundAtLeastOneMatchingBranch) {
0626 ++upperLimitOnReadingWorker;
0627 foundAtLeastOneMatchingBranch = true;
0628 }
0629 ++upperLimitOnIndicies;
0630 ++reserveSizeForWorker[w];
0631 if (nullptr == foundInBranchToReadingWorker->second) {
0632 foundInBranchToReadingWorker->second = w;
0633 } else {
0634 branchToReadingWorker.insert(make_pair(itr->second, w));
0635 }
0636 }
0637 }
0638 }
0639 }
0640 }
0641 }
0642 {
0643 auto it = branchToReadingWorker.begin();
0644 std::vector<std::string> unusedBranches;
0645 while (it != branchToReadingWorker.end()) {
0646 if (it->second == nullptr) {
0647 unusedBranches.push_back(it->first);
0648
0649 auto temp = it;
0650 ++it;
0651 branchToReadingWorker.erase(temp);
0652 } else {
0653 ++it;
0654 }
0655 }
0656 if (not unusedBranches.empty()) {
0657 LogWarning l("UnusedProductsForCanDeleteEarly");
0658 l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0659 " If possible, remove the producer from the job.";
0660 for (auto const& n : unusedBranches) {
0661 l << "\n " << n;
0662 }
0663 }
0664 }
0665 if (!branchToReadingWorker.empty()) {
0666 earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0667 earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0668 earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0669 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0670 std::string lastBranchName;
0671 size_t nextOpenIndex = 0;
0672 unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0673 for (auto& branchAndWorker : branchToReadingWorker) {
0674 if (lastBranchName != branchAndWorker.first) {
0675
0676 BranchID bid(branchAndWorker.first + ".");
0677 earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0678 lastBranchName = branchAndWorker.first;
0679 }
0680 auto found = alreadySeenWorkers.find(branchAndWorker.second);
0681 if (alreadySeenWorkers.end() == found) {
0682
0683
0684
0685
0686 size_t index = nextOpenIndex;
0687 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0688 assert(index < earlyDeleteHelperToBranchIndicies_.size());
0689 earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0690 earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0691 branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0692 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0693 nextOpenIndex += nIndices;
0694 } else {
0695 found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0696 }
0697 }
0698
0699
0700
0701 auto itLast = earlyDeleteHelpers_.begin();
0702 for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0703 if (itLast->end() != it->begin()) {
0704
0705 unsigned int delta = it->begin() - itLast->end();
0706 it->shiftIndexPointers(delta);
0707
0708 earlyDeleteHelperToBranchIndicies_.erase(
0709 earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0710 earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0711 }
0712 itLast = it;
0713 }
0714 earlyDeleteHelperToBranchIndicies_.erase(
0715 earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0716 earlyDeleteHelperToBranchIndicies_.end());
0717
0718
0719 for (auto& p : trig_paths_) {
0720 p.setEarlyDeleteHelpers(alreadySeenWorkers);
0721 }
0722 for (auto& p : end_paths_) {
0723 p.setEarlyDeleteHelpers(alreadySeenWorkers);
0724 }
0725 resetEarlyDelete();
0726 }
0727 }
0728
0729 std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0730 Worker* worker,
0731 std::unordered_set<std::string>& conditionalModules,
0732 std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0733 std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0734 ParameterSet& proc_pset,
0735 ProductRegistry& preg,
0736 PreallocationConfiguration const* prealloc,
0737 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0738 std::vector<Worker*> returnValue;
0739 auto const& consumesInfo = worker->consumesInfo();
0740 auto moduleLabel = worker->description()->moduleLabel();
0741 using namespace productholderindexhelper;
0742 for (auto const& ci : consumesInfo) {
0743 if (not ci.skipCurrentProcess() and
0744 (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0745 auto productModuleLabel = std::string(ci.label());
0746 bool productFromConditionalModule = false;
0747 auto itFound = conditionalModules.find(productModuleLabel);
0748 if (itFound == conditionalModules.end()) {
0749
0750
0751 auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0752 if (foundAlias) {
0753 productModuleLabel = *foundAlias;
0754 productFromConditionalModule = true;
0755 itFound = conditionalModules.find(productModuleLabel);
0756
0757 if (itFound == conditionalModules.end()) {
0758 continue;
0759 }
0760 }
0761 } else {
0762
0763 auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0764 for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0765 if (itBranch->second->productInstanceName() == ci.instance()) {
0766 if (ci.kindOfType() == PRODUCT_TYPE) {
0767 if (ci.type() == itBranch->second->unwrappedTypeID()) {
0768 productFromConditionalModule = true;
0769 break;
0770 }
0771 } else {
0772
0773 if (typeIsViewCompatible(
0774 ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
0775 productFromConditionalModule = true;
0776 break;
0777 }
0778 }
0779 }
0780 }
0781 }
0782 if (productFromConditionalModule) {
0783 auto condWorker = getWorker(
0784 productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0785 assert(condWorker);
0786
0787 conditionalModules.erase(itFound);
0788
0789 auto dependents = tryToPlaceConditionalModules(condWorker,
0790 conditionalModules,
0791 conditionalModuleBranches,
0792 aliasMap,
0793 proc_pset,
0794 preg,
0795 prealloc,
0796 processConfiguration);
0797 returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0798 returnValue.push_back(condWorker);
0799 }
0800 }
0801 }
0802 return returnValue;
0803 }
0804
0805 void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
0806 ProductRegistry& preg,
0807 PreallocationConfiguration const* prealloc,
0808 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0809 std::string const& pathName,
0810 bool ignoreFilters,
0811 PathWorkers& out,
0812 std::vector<std::string> const& endPathNames,
0813 ConditionalTaskHelper const& conditionalTaskHelper,
0814 std::unordered_set<std::string>& allConditionalModules) {
0815 vstring modnames = proc_pset.getParameter<vstring>(pathName);
0816 PathWorkers tmpworkers;
0817
0818
0819 auto condRange = findConditionalTaskModulesRange(modnames);
0820
0821 std::unordered_set<std::string> conditionalmods;
0822
0823 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
0824 std::unordered_map<std::string, unsigned int> conditionalModOrder;
0825 if (condRange.first != condRange.second) {
0826 for (auto it = condRange.first; it != condRange.second; ++it) {
0827
0828 conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0829 }
0830
0831 conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0832 std::make_move_iterator(condRange.second));
0833
0834 conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0835 modnames.erase(std::prev(condRange.first), modnames.end());
0836
0837
0838 allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
0839 }
0840
0841 unsigned int placeInPath = 0;
0842 for (auto const& name : modnames) {
0843
0844 bool doNotRunConcurrently = false;
0845 WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0846 if (name[0] == '!') {
0847 filterAction = WorkerInPath::Veto;
0848 } else if (name[0] == '-' or name[0] == '+') {
0849 filterAction = WorkerInPath::Ignore;
0850 }
0851 if (name[0] == '|' or name[0] == '+') {
0852
0853 doNotRunConcurrently = true;
0854 }
0855
0856 std::string moduleLabel = name;
0857 if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0858 moduleLabel.erase(0, 1);
0859 }
0860
0861 Worker* worker =
0862 getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0863 if (worker == nullptr) {
0864 std::string pathType("endpath");
0865 if (!search_all(endPathNames, pathName)) {
0866 pathType = std::string("path");
0867 }
0868 throw Exception(errors::Configuration)
0869 << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0870 << "\"\n please check spelling or remove that label from the path.";
0871 }
0872
0873 if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0874
0875
0876 std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0877 if (!search_all(allowed_filters, worker->description()->moduleName())) {
0878
0879 filterAction = WorkerInPath::Ignore;
0880 LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0881 << "' with module label '" << moduleLabel << "' appears on EndPath '"
0882 << pathName << "'.\n"
0883 << "The return value of the filter will be ignored.\n"
0884 << "To suppress this warning, either remove the filter from the endpath,\n"
0885 << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0886 }
0887 }
0888 bool runConcurrently = not doNotRunConcurrently;
0889 if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0890 runConcurrently = false;
0891 }
0892
0893 auto condModules = tryToPlaceConditionalModules(worker,
0894 conditionalmods,
0895 conditionalModsBranches,
0896 conditionalTaskHelper.aliasMap(),
0897 proc_pset,
0898 preg,
0899 prealloc,
0900 processConfiguration);
0901 for (auto condMod : condModules) {
0902 tmpworkers.emplace_back(
0903 condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0904 }
0905
0906 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0907 ++placeInPath;
0908 }
0909
0910 out.swap(tmpworkers);
0911 }
0912
0913 void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0914 ProductRegistry& preg,
0915 PreallocationConfiguration const* prealloc,
0916 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0917 int bitpos,
0918 std::string const& name,
0919 TrigResPtr trptr,
0920 std::vector<std::string> const& endPathNames,
0921 ConditionalTaskHelper const& conditionalTaskHelper,
0922 std::unordered_set<std::string>& allConditionalModules) {
0923 PathWorkers tmpworkers;
0924 fillWorkers(proc_pset,
0925 preg,
0926 prealloc,
0927 processConfiguration,
0928 name,
0929 false,
0930 tmpworkers,
0931 endPathNames,
0932 conditionalTaskHelper,
0933 allConditionalModules);
0934
0935
0936 if (!tmpworkers.empty()) {
0937 trig_paths_.emplace_back(
0938 bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
0939 } else {
0940 empty_trig_paths_.push_back(bitpos);
0941 }
0942 for (WorkerInPath const& workerInPath : tmpworkers) {
0943 addToAllWorkers(workerInPath.getWorker());
0944 }
0945 }
0946
0947 void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0948 ProductRegistry& preg,
0949 PreallocationConfiguration const* prealloc,
0950 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0951 int bitpos,
0952 std::string const& name,
0953 std::vector<std::string> const& endPathNames,
0954 ConditionalTaskHelper const& conditionalTaskHelper,
0955 std::unordered_set<std::string>& allConditionalModules) {
0956 PathWorkers tmpworkers;
0957 fillWorkers(proc_pset,
0958 preg,
0959 prealloc,
0960 processConfiguration,
0961 name,
0962 true,
0963 tmpworkers,
0964 endPathNames,
0965 conditionalTaskHelper,
0966 allConditionalModules);
0967
0968 if (!tmpworkers.empty()) {
0969 end_paths_.emplace_back(bitpos,
0970 name,
0971 tmpworkers,
0972 TrigResPtr(),
0973 actionTable(),
0974 actReg_,
0975 &streamContext_,
0976 PathContext::PathType::kEndPath);
0977 } else {
0978 empty_end_paths_.push_back(bitpos);
0979 }
0980 for (WorkerInPath const& workerInPath : tmpworkers) {
0981 addToAllWorkers(workerInPath.getWorker());
0982 }
0983 }
0984
0985 void StreamSchedule::beginStream() { workerManagerBeginEnd_.beginStream(streamID_, streamContext_); }
0986
0987 void StreamSchedule::endStream() { workerManagerBeginEnd_.endStream(streamID_, streamContext_); }
0988
0989 void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0990 for (auto const& worker : allWorkersBeginEnd()) {
0991 if (worker->description()->moduleLabel() == iLabel) {
0992 iMod->replaceModuleFor(worker);
0993 worker->beginStream(streamID_, streamContext_);
0994 break;
0995 }
0996 }
0997
0998 for (auto const& worker : allWorkersRuns()) {
0999 if (worker->description()->moduleLabel() == iLabel) {
1000 iMod->replaceModuleFor(worker);
1001 break;
1002 }
1003 }
1004
1005 for (auto const& worker : allWorkersLumisAndEvents()) {
1006 if (worker->description()->moduleLabel() == iLabel) {
1007 iMod->replaceModuleFor(worker);
1008 break;
1009 }
1010 }
1011 }
1012
1013 void StreamSchedule::deleteModule(std::string const& iLabel) {
1014 workerManagerBeginEnd_.deleteModuleIfExists(iLabel);
1015 workerManagerRuns_.deleteModuleIfExists(iLabel);
1016 workerManagerLumisAndEvents_.deleteModuleIfExists(iLabel);
1017 }
1018
1019 std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
1020 std::vector<ModuleDescription const*> result;
1021 result.reserve(allWorkersLumisAndEvents().size());
1022
1023 for (auto const& worker : allWorkersLumisAndEvents()) {
1024 ModuleDescription const* p = worker->description();
1025 result.push_back(p);
1026 }
1027 return result;
1028 }
1029
1030 void StreamSchedule::processOneEventAsync(
1031 WaitingTaskHolder iTask,
1032 EventTransitionInfo& info,
1033 ServiceToken const& serviceToken,
1034 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
1035 EventPrincipal& ep = info.principal();
1036
1037
1038 CMS_SA_ALLOW try {
1039 this->resetAll();
1040
1041 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1042
1043 Traits::setStreamContext(streamContext_, ep);
1044
1045 ServiceRegistry::Operate guard(serviceToken);
1046 Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1047
1048
1049
1050
1051 workerManagerLumisAndEvents_.setupResolvers(ep);
1052 workerManagerLumisAndEvents_.setupOnDemandSystem(info);
1053
1054 HLTPathStatus hltPathStatus(hlt::Pass, 0);
1055 for (int empty_trig_path : empty_trig_paths_) {
1056 results_->at(empty_trig_path) = hltPathStatus;
1057 pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1058 std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1059 ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1060 info, streamID_, ParentContext(&streamContext_), &streamContext_);
1061 if (except) {
1062 iTask.doneWaiting(except);
1063 return;
1064 }
1065 }
1066 if (not endPathStatusInserterWorkers_.empty()) {
1067 for (int empty_end_path : empty_end_paths_) {
1068 std::exception_ptr except =
1069 endPathStatusInserterWorkers_[empty_end_path]
1070 ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1071 info, streamID_, ParentContext(&streamContext_), &streamContext_);
1072 if (except) {
1073 iTask.doneWaiting(except);
1074 return;
1075 }
1076 }
1077 }
1078
1079 ++total_events_;
1080
1081
1082 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1083 auto pathErrorPtr = pathErrorHolder.get();
1084 ServiceWeakToken weakToken = serviceToken;
1085 auto allPathsDone = make_waiting_task(
1086 [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1087 ServiceRegistry::Operate operate(weakToken.lock());
1088
1089 std::exception_ptr ptr;
1090 if (pathError->load()) {
1091 ptr = *pathError->load();
1092 delete pathError->load();
1093 }
1094 if ((not ptr) and iPtr) {
1095 ptr = *iPtr;
1096 }
1097 iTask.doneWaiting(finishProcessOneEvent(ptr));
1098 });
1099
1100
1101
1102 WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1103
1104 auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1105 std::exception_ptr const* iPtr) mutable {
1106 ServiceRegistry::Operate operate(weakToken.lock());
1107
1108 if (iPtr) {
1109
1110
1111 pathErrorPtr->store(new std::exception_ptr(*iPtr));
1112 }
1113 finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1114 });
1115
1116
1117
1118
1119 WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1120
1121
1122 WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1123 for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1124 it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1125 }
1126
1127 for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1128 it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1129 }
1130
1131 ParentContext parentContext(&streamContext_);
1132 workerManagerLumisAndEvents_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1133 hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1134 } catch (...) {
1135 iTask.doneWaiting(std::current_exception());
1136 }
1137 }
1138
1139 void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1140 WaitingTaskHolder iWait,
1141 EventTransitionInfo& info) {
1142 if (iExcept) {
1143
1144 CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1145 exception_actions::ActionCodes action = actionTable().find(e.category());
1146 assert(action != exception_actions::IgnoreCompletely);
1147 if (action == exception_actions::TryToContinue) {
1148 edm::printCmsExceptionWarning("TryToContinue", e);
1149 *(iExcept.load()) = std::exception_ptr();
1150 } else {
1151 *(iExcept.load()) = std::current_exception();
1152 }
1153 } catch (...) {
1154 *(iExcept.load()) = std::current_exception();
1155 }
1156 }
1157
1158 if ((not iExcept) and results_->accept()) {
1159 ++total_passed_;
1160 }
1161
1162 if (nullptr != results_inserter_.get()) {
1163
1164 CMS_SA_ALLOW try {
1165
1166
1167 ParentContext parentContext(&streamContext_);
1168 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1169
1170 auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1171 if (expt) {
1172 std::rethrow_exception(expt);
1173 }
1174 } catch (cms::Exception& ex) {
1175 if (not iExcept) {
1176 if (ex.context().empty()) {
1177 std::ostringstream ost;
1178 ost << "Processing Event " << info.principal().id();
1179 ex.addContext(ost.str());
1180 }
1181 iExcept.store(new std::exception_ptr(std::current_exception()));
1182 }
1183 } catch (...) {
1184 if (not iExcept) {
1185 iExcept.store(new std::exception_ptr(std::current_exception()));
1186 }
1187 }
1188 }
1189 std::exception_ptr ptr;
1190 if (iExcept) {
1191 ptr = *iExcept.load();
1192 }
1193 iWait.doneWaiting(ptr);
1194 }
1195
1196 std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1197 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1198
1199 if (iExcept) {
1200
1201 try {
1202 convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1203 } catch (cms::Exception& ex) {
1204 bool const cleaningUpAfterException = false;
1205 if (ex.context().empty()) {
1206 addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1207 } else {
1208 addContextAndPrintException("", ex, cleaningUpAfterException);
1209 }
1210 iExcept = std::current_exception();
1211 }
1212
1213 actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1214 }
1215
1216 CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1217 if (not iExcept) {
1218 iExcept = std::current_exception();
1219 }
1220 }
1221 if (not iExcept) {
1222 resetEarlyDelete();
1223 }
1224
1225 return iExcept;
1226 }
1227
1228 void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1229 oLabelsToFill.reserve(trig_paths_.size());
1230 std::transform(trig_paths_.begin(),
1231 trig_paths_.end(),
1232 std::back_inserter(oLabelsToFill),
1233 std::bind(&Path::name, std::placeholders::_1));
1234 }
1235
1236 void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1237 TrigPaths::const_iterator itFound = std::find_if(
1238 trig_paths_.begin(),
1239 trig_paths_.end(),
1240 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1241 if (itFound != trig_paths_.end()) {
1242 oLabelsToFill.reserve(itFound->size());
1243 for (size_t i = 0; i < itFound->size(); ++i) {
1244 oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1245 }
1246 }
1247 }
1248
1249 void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1250 std::vector<ModuleDescription const*>& descriptions,
1251 unsigned int hint) const {
1252 descriptions.clear();
1253 bool found = false;
1254 TrigPaths::const_iterator itFound;
1255
1256 if (hint < trig_paths_.size()) {
1257 itFound = trig_paths_.begin() + hint;
1258 if (itFound->name() == iPathLabel)
1259 found = true;
1260 }
1261 if (!found) {
1262
1263 itFound = std::find_if(
1264 trig_paths_.begin(),
1265 trig_paths_.end(),
1266 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1267 if (itFound != trig_paths_.end())
1268 found = true;
1269 }
1270 if (found) {
1271 descriptions.reserve(itFound->size());
1272 for (size_t i = 0; i < itFound->size(); ++i) {
1273 descriptions.push_back(itFound->getWorker(i)->description());
1274 }
1275 }
1276 }
1277
1278 void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1279 std::vector<ModuleDescription const*>& descriptions,
1280 unsigned int hint) const {
1281 descriptions.clear();
1282 bool found = false;
1283 TrigPaths::const_iterator itFound;
1284
1285 if (hint < end_paths_.size()) {
1286 itFound = end_paths_.begin() + hint;
1287 if (itFound->name() == iEndPathLabel)
1288 found = true;
1289 }
1290 if (!found) {
1291
1292 itFound = std::find_if(
1293 end_paths_.begin(),
1294 end_paths_.end(),
1295 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1296 if (itFound != end_paths_.end())
1297 found = true;
1298 }
1299 if (found) {
1300 descriptions.reserve(itFound->size());
1301 for (size_t i = 0; i < itFound->size(); ++i) {
1302 descriptions.push_back(itFound->getWorker(i)->description());
1303 }
1304 }
1305 }
1306
1307 static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1308 sum.timesVisited += path.timesVisited(which);
1309 sum.timesPassed += path.timesPassed(which);
1310 sum.timesFailed += path.timesFailed(which);
1311 sum.timesExcept += path.timesExcept(which);
1312 sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1313 sum.bitPosition = path.bitPosition(which);
1314 }
1315
1316 static void fillPathSummary(Path const& path, PathSummary& sum) {
1317 sum.name = path.name();
1318 sum.bitPosition = path.bitPosition();
1319 sum.timesRun += path.timesRun();
1320 sum.timesPassed += path.timesPassed();
1321 sum.timesFailed += path.timesFailed();
1322 sum.timesExcept += path.timesExcept();
1323
1324 Path::size_type sz = path.size();
1325 if (sum.moduleInPathSummaries.empty()) {
1326 std::vector<ModuleInPathSummary> temp(sz);
1327 for (size_t i = 0; i != sz; ++i) {
1328 fillModuleInPathSummary(path, i, temp[i]);
1329 }
1330 sum.moduleInPathSummaries.swap(temp);
1331 } else {
1332 assert(sz == sum.moduleInPathSummaries.size());
1333 for (size_t i = 0; i != sz; ++i) {
1334 fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1335 }
1336 }
1337 }
1338
1339 static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1340 sum.timesVisited += w.timesVisited();
1341 sum.timesRun += w.timesRun();
1342 sum.timesPassed += w.timesPassed();
1343 sum.timesFailed += w.timesFailed();
1344 sum.timesExcept += w.timesExcept();
1345 sum.moduleLabel = w.description()->moduleLabel();
1346 }
1347
1348 static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1349
1350 void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1351 rep.eventSummary.totalEvents += totalEvents();
1352 rep.eventSummary.totalEventsPassed += totalEventsPassed();
1353 rep.eventSummary.totalEventsFailed += totalEventsFailed();
1354
1355 fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1356 fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1357 fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1358 }
1359
1360 void StreamSchedule::clearCounters() {
1361 using std::placeholders::_1;
1362 total_events_ = total_passed_ = 0;
1363 for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1364 for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1365 for_all(allWorkersLumisAndEvents(), std::bind(&Worker::clearCounters, _1));
1366 }
1367
1368 void StreamSchedule::resetAll() { results_->reset(); }
1369
1370 void StreamSchedule::addToAllWorkers(Worker* w) { workerManagerLumisAndEvents_.addToAllWorkers(w); }
1371
1372 void StreamSchedule::resetEarlyDelete() {
1373
1374 for (auto& count : earlyDeleteBranchToCount_) {
1375 count.count = 0;
1376 }
1377
1378 for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1379 ++(earlyDeleteBranchToCount_[index].count);
1380 }
1381 for (auto& helper : earlyDeleteHelpers_) {
1382 helper.reset();
1383 }
1384 }
1385
1386 void StreamSchedule::makePathStatusInserters(
1387 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1388 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1389 ExceptionToActionTable const& actions) {
1390 int bitpos = 0;
1391 unsigned int indexEmpty = 0;
1392 unsigned int indexOfPath = 0;
1393 for (auto& pathStatusInserter : pathStatusInserters) {
1394 std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1395 WorkerPtr workerPtr(
1396 new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1397 pathStatusInserterWorkers_.emplace_back(workerPtr);
1398 workerPtr->setActivityRegistry(actReg_);
1399 addToAllWorkers(workerPtr.get());
1400
1401
1402
1403
1404 if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1405 ++indexEmpty;
1406 } else {
1407 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1408 ++indexOfPath;
1409 }
1410 ++bitpos;
1411 }
1412
1413 bitpos = 0;
1414 indexEmpty = 0;
1415 indexOfPath = 0;
1416 for (auto& endPathStatusInserter : endPathStatusInserters) {
1417 std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1418 WorkerPtr workerPtr(
1419 new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1420 endPathStatusInserterWorkers_.emplace_back(workerPtr);
1421 workerPtr->setActivityRegistry(actReg_);
1422 addToAllWorkers(workerPtr.get());
1423
1424
1425
1426
1427 if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1428 ++indexEmpty;
1429 } else {
1430 end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1431 ++indexOfPath;
1432 }
1433 ++bitpos;
1434 }
1435 }
1436
1437 void StreamSchedule::handleException(StreamContext const& streamContext,
1438 ServiceWeakToken const& weakToken,
1439 bool cleaningUpAfterException,
1440 std::exception_ptr& excpt) const noexcept {
1441
1442 try {
1443 convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1444 } catch (cms::Exception& ex) {
1445 std::ostringstream ost;
1446
1447
1448 if (ex.context().empty()) {
1449 exceptionContext(ost, streamContext);
1450 }
1451 ServiceRegistry::Operate op(weakToken.lock());
1452 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1453 excpt = std::current_exception();
1454 }
1455
1456
1457 CMS_SA_ALLOW try {
1458 ServiceRegistry::Operate op(weakToken.lock());
1459 actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1460 } catch (...) {
1461 }
1462 }
1463 }