File indexing completed on 2025-06-29 22:58:05
0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/EventID.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0007 #include "DataFormats/Provenance/interface/Timestamp.h"
0008 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0009 #include "FWCore/Framework/src/OutputModuleDescription.h"
0010 #include "FWCore/Framework/interface/TriggerNamesService.h"
0011 #include "FWCore/Framework/src/TriggerReport.h"
0012 #include "FWCore/Framework/src/TriggerTimingReport.h"
0013 #include "FWCore/Framework/src/ModuleHolderFactory.h"
0014 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0015 #include "FWCore/Framework/src/TriggerResultInserter.h"
0016 #include "FWCore/Framework/src/PathStatusInserter.h"
0017 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0018 #include "FWCore/Framework/interface/WorkerInPath.h"
0019 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0020 #include "FWCore/Framework/interface/maker/WorkerT.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0023 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0025 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0026 #include "FWCore/ParameterSet/interface/Registry.h"
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ModuleConsumesInfo.h"
0029 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0030 #include "FWCore/Utilities/interface/Algorithms.h"
0031 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0032 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0033 #include "FWCore/Utilities/interface/RunIndex.h"
0034
0035 #include "LuminosityBlockProcessingStatus.h"
0036 #include "processEDAliases.h"
0037
0038 #include <algorithm>
0039 #include <cassert>
0040 #include <cstdlib>
0041 #include <functional>
0042 #include <iomanip>
0043 #include <limits>
0044 #include <list>
0045 #include <map>
0046 #include <fmt/format.h>
0047
0048 namespace edm {
0049
0050 namespace {
0051
0052
0053
0054
0055
/// Walks the source range [begin, end) in lock-step with the destination range that
/// starts at `out`, calling func(sourceElement, destinationElement) for each pair.
/// The destination range must be at least as long as the source range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
0061
0062
0063
0064
0065
0066
/// Fills `to` from `from`, element by element, using func(fromElement, toElement).
/// When the two containers already have the same size the existing elements of `to`
/// are updated in place; otherwise a new container of the right size is filled and
/// swapped into `to`.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  // Sizes differ: build the replacement separately, then swap it in.
  TO resized(from.size());
  transform_into(from.begin(), from.end(), resized.begin(), func);
  to.swap(resized);
}
0077
0078 class BeginStreamTraits {
0079 public:
0080 static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0081 activityRegistry->preBeginStreamSignal_(*streamContext);
0082 }
0083 static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0084 activityRegistry->postBeginStreamSignal_(*streamContext);
0085 }
0086 };
0087
0088 class EndStreamTraits {
0089 public:
0090 static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0091 activityRegistry->preEndStreamSignal_(*streamContext);
0092 }
0093 static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0094 activityRegistry->postEndStreamSignal_(*streamContext);
0095 }
0096 };
0097
0098
0099
0100 void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0101 ProductRegistry const& preg,
0102 std::multimap<std::string, Worker*>& branchToReadingWorker) {
0103 auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0104
0105 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0106 vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0107 vBranchesToDeleteEarly.end());
0108
0109
0110 auto allBranchNames = preg.allBranchNames();
0111
0112 for (auto& b : allBranchNames) {
0113 b.resize(b.size() - 1);
0114 }
0115 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0116 std::vector<std::string> temp;
0117 temp.reserve(vBranchesToDeleteEarly.size());
0118
0119 std::set_intersection(vBranchesToDeleteEarly.begin(),
0120 vBranchesToDeleteEarly.end(),
0121 allBranchNames.begin(),
0122 allBranchNames.end(),
0123 std::back_inserter(temp));
0124 vBranchesToDeleteEarly.swap(temp);
0125 if (temp.size() != vBranchesToDeleteEarly.size()) {
0126 std::vector<std::string> missingProducts;
0127 std::set_difference(temp.begin(),
0128 temp.end(),
0129 vBranchesToDeleteEarly.begin(),
0130 vBranchesToDeleteEarly.end(),
0131 std::back_inserter(missingProducts));
0132 LogInfo l("MissingProductsForCanDeleteEarly");
0133 l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0134 for (auto const& n : missingProducts) {
0135 l << "\n " << n;
0136 }
0137 }
0138
0139
0140 for (auto const& branch : vBranchesToDeleteEarly) {
0141 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0142 }
0143 }
0144
0145 Worker* getWorker(std::string const& moduleLabel,
0146 ParameterSet& proc_pset,
0147 WorkerManager& workerManager,
0148 SignallingProductRegistryFiller& preg,
0149 PreallocationConfiguration const* prealloc,
0150 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0151 bool addToAllWorkers = true) {
0152 bool isTracked;
0153 ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0154 if (modpset == nullptr) {
0155 return nullptr;
0156 }
0157 assert(isTracked);
0158
0159 return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel, addToAllWorkers);
0160 }
0161
0162
0163
0164
0165
0166
0167
/// Locates the entries that follow the "#" marker in a path's module-name list.
///
/// @return an iterator pair [first, last): `first` is the element right after "#" and
///         `last` is the final element of the container (which is therefore excluded).
///         When there is no "#", both iterators equal modnames.end().
template <typename T>
auto findConditionalTaskModulesRange(T& modnames) {
  auto const last = modnames.end();
  auto const marker = std::find(modnames.begin(), last, "#");
  return marker == last ? std::pair(last, last) : std::pair(marker + 1, std::prev(last));
}
0176
0177 std::optional<std::string> findBestMatchingAlias(
0178 std::unordered_multimap<std::string, edm::ProductDescription const*> const& conditionalModuleBranches,
0179 std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0180 std::string const& productModuleLabel,
0181 ModuleConsumesInfo const& consumesInfo) {
0182 std::optional<std::string> best;
0183 int wildcardsInBest = std::numeric_limits<int>::max();
0184 bool bestIsAmbiguous = false;
0185
0186 auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0187 std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0188 int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0189 if (wildcards == 0) {
0190 bestIsAmbiguous = false;
0191 return true;
0192 }
0193 if (not best or wildcards < wildcardsInBest) {
0194 best = label;
0195 wildcardsInBest = wildcards;
0196 bestIsAmbiguous = false;
0197 } else if (best and *best != label and wildcardsInBest == wildcards) {
0198 bestIsAmbiguous = true;
0199 }
0200 return false;
0201 };
0202
0203 auto findAlias = aliasMap.equal_range(productModuleLabel);
0204 for (auto it = findAlias.first; it != findAlias.second; ++it) {
0205 std::string const& aliasInstanceLabel =
0206 it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0207 bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0208 if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0209 bool const typeIsWildcard = it->second.friendlyClassName == "*";
0210 if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0211 if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0212 return it->second.originalModuleLabel;
0213 }
0214 } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0215
0216
0217 auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0218 for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0219 if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0220 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0221 TypeID(itBranch->second->wrappedType().typeInfo()),
0222 itBranch->second->className())) {
0223 if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0224 return it->second.originalModuleLabel;
0225 }
0226 }
0227 }
0228 }
0229 }
0230 }
0231 }
0232 if (bestIsAmbiguous) {
0233 throw Exception(errors::UnimplementedFeature)
0234 << "Encountered ambiguity when trying to find a best-matching alias for\n"
0235 << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0236 << " module label " << productModuleLabel << "\n"
0237 << " product instance name " << consumesInfo.instance() << "\n"
0238 << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0239 "wildcards ("
0240 << wildcardsInBest << ")";
0241 }
0242 return best;
0243 }
0244 }
0245
0246
0247
0248 typedef std::vector<std::string> vstring;
0249
0250
0251
0252 class ConditionalTaskHelper {
0253 public:
0254 using AliasInfo = StreamSchedule::AliasInfo;
0255
0256 ConditionalTaskHelper(ParameterSet& proc_pset,
0257 SignallingProductRegistryFiller& preg,
0258 PreallocationConfiguration const* prealloc,
0259 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0260 WorkerManager& workerManagerLumisAndEvents,
0261 std::vector<std::string> const& trigPathNames) {
0262 std::unordered_set<std::string> allConditionalMods;
0263 for (auto const& pathName : trigPathNames) {
0264 auto const modnames = proc_pset.getParameter<vstring>(pathName);
0265
0266
0267 auto condRange = findConditionalTaskModulesRange(modnames);
0268 if (condRange.first == condRange.second)
0269 continue;
0270
0271
0272 allConditionalMods.insert(condRange.first, condRange.second);
0273 }
0274
0275 for (auto const& cond : allConditionalMods) {
0276
0277
0278
0279 (void)getWorker(cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration, false);
0280 }
0281
0282 fillAliasMap(proc_pset, allConditionalMods);
0283 processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0284
0285
0286 for (auto const& prod : preg.registry().productList()) {
0287 if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0288 conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0289 }
0290 }
0291 }
0292
0293 std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0294
0295 std::unordered_multimap<std::string, edm::ProductDescription const*> conditionalModuleBranches(
0296 std::unordered_set<std::string> const& conditionalmods) const {
0297 std::unordered_multimap<std::string, edm::ProductDescription const*> ret;
0298 for (auto const& mod : conditionalmods) {
0299 auto range = conditionalModsBranches_.equal_range(mod);
0300 ret.insert(range.first, range.second);
0301 }
0302 return ret;
0303 }
0304
0305 private:
0306 void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0307 auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0308 std::string const star("*");
0309 for (auto const& alias : aliases) {
0310 auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0311 auto aliasedToModuleLabels = info.getParameterNames();
0312 for (auto const& mod : aliasedToModuleLabels) {
0313 if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0314 auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0315 for (auto const& aliasPSet : aliasVPSet) {
0316 std::string type = star;
0317 std::string instance = star;
0318 std::string originalInstance = star;
0319 if (aliasPSet.exists("type")) {
0320 type = aliasPSet.getParameter<std::string>("type");
0321 }
0322 if (aliasPSet.exists("toProductInstance")) {
0323 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0324 }
0325 if (aliasPSet.exists("fromProductInstance")) {
0326 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0327 }
0328
0329 aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0330 }
0331 }
0332 }
0333 }
0334 }
0335
0336 void processSwitchEDAliases(ParameterSet const& proc_pset,
0337 SignallingProductRegistryFiller& preg,
0338 ProcessConfiguration const& processConfiguration,
0339 std::unordered_set<std::string> const& allConditionalMods) {
0340 auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0341 std::vector<std::string> switchEDAliases;
0342 for (auto const& module : all_modules) {
0343 auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0344 if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0345 auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0346 for (auto const& case_label : all_cases) {
0347 auto range = aliasMap_.equal_range(case_label);
0348 if (range.first != range.second) {
0349 switchEDAliases.push_back(case_label);
0350 }
0351 }
0352 }
0353 }
0354 detail::processEDAliases(
0355 switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0356 }
0357
0358 std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0359 std::unordered_multimap<std::string, edm::ProductDescription const*> conditionalModsBranches_;
0360 };
0361
0362 // Builds the schedule of one stream: fills the trigger paths and end paths named by the TriggerNamesService,
0363 // registers modules that are on no path as unscheduled workers, and mirrors every worker into workerManagerRuns_.
0364 StreamSchedule::StreamSchedule(
0365 std::shared_ptr<TriggerResultInserter> inserter,
0366 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0367 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0368 std::shared_ptr<ModuleRegistry> modReg,
0369 ParameterSet& proc_pset,
0370 service::TriggerNamesService const& tns,
0371 PreallocationConfiguration const& prealloc,
0372 SignallingProductRegistryFiller& preg,
0373 ExceptionToActionTable const& actions,
0374 std::shared_ptr<ActivityRegistry> areg,
0375 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0376 StreamID streamID,
0377 ProcessContext const* processContext)
0378 : workerManagerRuns_(modReg, areg, actions),
0379 workerManagerLumisAndEvents_(modReg, areg, actions),
0380 actReg_(areg),
0381 results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0382 results_inserter_(),
0383 trig_paths_(),
0384 end_paths_(),
0385 total_events_(),
0386 total_passed_(),
0387 number_of_unscheduled_modules_(0),
0388 streamID_(streamID),
0389 streamContext_(streamID_, processContext) {
0390 bool hasPath = false;
0391 std::vector<std::string> const& pathNames = tns.getTrigPaths();
0392 std::vector<std::string> const& endPathNames = tns.getEndPaths();
0393 // The helper is built once from the trigger path names and shared by all fillTrigPath/fillEndPath calls below.
0394 ConditionalTaskHelper conditionalTaskHelper(
0395 proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
0396 std::unordered_set<std::string> conditionalModules;
0397 // Fill the trigger paths; trig_bitpos is the position of each path in pathNames.
0398 int trig_bitpos = 0;
0399 trig_paths_.reserve(pathNames.size());
0400 for (auto const& trig_name : pathNames) {
0401 fillTrigPath(proc_pset,
0402 preg,
0403 &prealloc,
0404 processConfiguration,
0405 trig_bitpos,
0406 trig_name,
0407 results(),
0408 endPathNames,
0409 conditionalTaskHelper,
0410 conditionalModules);
0411 ++trig_bitpos;
0412 hasPath = true;
0413 }
0414
0415 if (hasPath) {
0416 // At least one trigger path exists: give this stream's results to the inserter and get a worker for it.
0417 inserter->setTrigResultForStream(streamID.value(), results());
0418 results_inserter_ = workerManagerLumisAndEvents_.getWorkerForModule(*inserter);
0419 }
0420
0421 // Fill the end paths; their bit positions are counted separately, starting again at 0.
0422 int bitpos = 0;
0423 end_paths_.reserve(endPathNames.size());
0424 for (auto const& end_path_name : endPathNames) {
0425 fillEndPath(proc_pset,
0426 preg,
0427 &prealloc,
0428 processConfiguration,
0429 bitpos,
0430 end_path_name,
0431 endPathNames,
0432 conditionalTaskHelper,
0433 conditionalModules);
0434 ++bitpos;
0435 }
0436
0437 makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0438
0439 // Find the labels listed in "@all_modules" for which no worker exists yet (set difference on sorted sets).
0440 std::set<std::string> usedWorkerLabels;
0441 for (auto const& worker : allWorkersLumisAndEvents()) {
0442 usedWorkerLabels.insert(worker->description()->moduleLabel());
0443 }
0444 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0445 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0446 std::vector<std::string> unusedLabels;
0447 set_difference(modulesInConfigSet.begin(),
0448 modulesInConfigSet.end(),
0449 usedWorkerLabels.begin(),
0450 usedWorkerLabels.end(),
0451 back_inserter(unusedLabels));
0452 std::set<std::string> unscheduledLabels;
0453 std::vector<std::string> shouldBeUsedLabels;
0454 if (!unusedLabels.empty()) {
0455 // Hand every such label to addToUnscheduledWorkers, which receives unscheduledLabels and
0456 // shouldBeUsedLabels as arguments to fill.
0457 // NOTE(review): the criteria deciding which of the two containers a label lands in live in
0458 // WorkerManager and are not visible here.
0459 for (auto const& label : unusedLabels) {
0460 bool isTracked;
0461 ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0462 assert(isTracked);
0463 assert(modulePSet != nullptr);
0464 workerManagerLumisAndEvents_.addToUnscheduledWorkers(
0465 *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0466 }
0467 if (!shouldBeUsedLabels.empty()) {
0468 std::ostringstream unusedStream;
0469 unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0470 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0471 itLabelEnd = shouldBeUsedLabels.end();
0472 itLabel != itLabelEnd;
0473 ++itLabel) {
0474 unusedStream << ",'" << *itLabel << "'";
0475 }
0476 LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0477 }
0478 }
0479 number_of_unscheduled_modules_ = unscheduledLabels.size();
0480
0481 // Only stream 0 issues the warning below, so it is printed once rather than once per stream.
0482 if (streamID.value() == 0 and not conditionalModules.empty()) {
0483 // Warn about unscheduled labels that are also in conditionalModules, i.e. modules of a
0484 // ConditionalTask that were collected by the fill*Path calls but ended up unscheduled.
0485 // labelsToPrint holds string_views into the strings owned by unscheduledLabels, which
0486 // stays alive until the end of this constructor.
0487 std::vector<std::string_view> labelsToPrint;
0488 std::copy_if(
0489 unscheduledLabels.begin(),
0490 unscheduledLabels.end(),
0491 std::back_inserter(labelsToPrint),
0492 [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
0493
0494 if (not labelsToPrint.empty()) {
0495 edm::LogWarning log("NonConsumedConditionalModules");
0496 log << "The following modules were part of some ConditionalTask, but were not\n"
0497 << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
0498 << "was associated. Perhaps they should be either removed from the\n"
0499 << "job, or moved to a Task to make it explicit they are unscheduled.\n";
0500 for (auto const& modLabel : labelsToPrint) {
0501 log.format("\n {}", modLabel);
0502 }
0503 }
0504 }
0505
0506 for (auto const& worker : allWorkersLumisAndEvents()) {
0507 std::string const& moduleLabel = worker->description()->moduleLabel();
0508 // Request a worker for the same label from workerManagerRuns_ as well. Only the side
0509 // effect of the call is wanted; the returned pointer is discarded and addToAllWorkers
0510 // keeps its default value (true).
0511 // NOTE(review): presumably this gives run transitions their own Worker per module --
0512 // confirm against WorkerManager::getWorker.
0513 (void)getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
0514 }
0515
0516 }
0517 // Sets up early deletion: for each requested branch that exists, is not kept by an output module and is consumed by a worker, attaches an EarlyDeleteHelper to the consuming workers.
0518 void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0519 std::vector<std::string> const& branchesToDeleteEarly,
0520 std::multimap<std::string, std::string> const& referencesToBranches,
0521 std::vector<std::string> const& modulesToSkip,
0522 edm::ProductRegistry const& preg) {
0523 // Start with every existing requested branch mapped to a null worker (no reader known yet).
0524 std::multimap<std::string, Worker*> branchToReadingWorker;
0525 initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0526
0527 const std::vector<std::string> kEmpty; // NOTE(review): not referenced anywhere else in this function
0528 std::map<Worker*, unsigned int> reserveSizeForWorker;
0529 unsigned int upperLimitOnReadingWorker = 0;
0530 unsigned int upperLimitOnIndicies = 0;
0531 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0532
0533 // Visit every module holder; those that yield an output module communicator may veto branches.
0534 modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0535 auto comm = iHolder->createOutputModuleCommunicator();
0536 if (comm) {
0537 if (!branchToReadingWorker.empty()) {
0538 // A branch that this output module keeps for the event (kept[InEvent]) must not be
0539 // deleted early: remove all of its entries and lower the count of unique branches.
0540 SelectedProductsForBranchType const& kept = comm->keptProducts();
0541 for (auto const& item : kept[InEvent]) {
0542 ProductDescription const& desc = *item.first;
0543 auto found = branchToReadingWorker.equal_range(desc.branchName());
0544 if (found.first != found.second) {
0545 --nUniqueBranchesToDelete;
0546 branchToReadingWorker.erase(found.first, found.second);
0547 }
0548 }
0549 }
0550 }
0551 });
0552 // Nothing is left that could be deleted early.
0553 if (branchToReadingWorker.empty()) {
0554 return;
0555 }
0556 // Record, for every worker not listed in modulesToSkip, which of the remaining branches it consumes.
0557 std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0558 for (auto w : allWorkersLumisAndEvents()) {
0559 if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0560 continue;
0561 }
0562 // Build the branch name of each consumed product as type_label_instance_process.
0563 auto consumes = w->moduleConsumesInfos();
0564 if (not consumes.empty()) {
0565 bool foundAtLeastOneMatchingBranch = false;
0566 for (auto const& product : consumes) {
0567 std::string branch = fmt::format("{}_{}_{}_{}",
0568 product.type().friendlyClassName(),
0569 product.label().data(),
0570 product.instance().data(),
0571 product.process().data());
0572 {
0573 // Direct use: the consumed branch itself is in the early-delete list.
0574 auto found = branchToReadingWorker.end();
0575 if (product.process().empty()) {
0576 auto startFound = branchToReadingWorker.lower_bound(branch);
0577 if (startFound != branchToReadingWorker.end()) {
0578 if (startFound->first.substr(0, branch.size()) == branch) {
0579 // No process was given, so `branch` ends in '_'; accept the first key that starts with it.
0580 found = startFound;
0581 }
0582 }
0583 } else {
0584 auto exactFound = branchToReadingWorker.equal_range(branch);
0585 if (exactFound.first != exactFound.second) {
0586 found = exactFound.first;
0587 }
0588 }
0589 if (found != branchToReadingWorker.end()) {
0590 if (not foundAtLeastOneMatchingBranch) {
0591 ++upperLimitOnReadingWorker;
0592 foundAtLeastOneMatchingBranch = true;
0593 }
0594 ++upperLimitOnIndicies;
0595 ++reserveSizeForWorker[w];
0596 if (nullptr == found->second) {
0597 found->second = w;
0598 } else {
0599 branchToReadingWorker.insert(make_pair(found->first, w));
0600 }
0601 }
0602 }
0603 {
0604 // Indirect use: the consumed branch is a key of referencesToBranches, whose mapped values name other branches.
0605 auto found = referencesToBranches.end();
0606 if (product.process().empty()) {
0607 auto startFound = referencesToBranches.lower_bound(branch);
0608 if (startFound != referencesToBranches.end()) {
0609 if (startFound->first.substr(0, branch.size()) == branch) {
0610 // Same prefix match as above for a consume without a process name.
0611 found = startFound;
0612 }
0613 }
0614 } else {
0615 // A process name was given: require an exact key match.
0616 auto exactFound = referencesToBranches.equal_range(branch);
0617 if (exactFound.first != exactFound.second) {
0618 found = exactFound.first;
0619 }
0620 }
0621 if (found != referencesToBranches.end()) {
0622 for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0623 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0624 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0625 continue;
0626 }
0627 if (not foundAtLeastOneMatchingBranch) {
0628 ++upperLimitOnReadingWorker;
0629 foundAtLeastOneMatchingBranch = true;
0630 }
0631 ++upperLimitOnIndicies;
0632 ++reserveSizeForWorker[w];
0633 if (nullptr == foundInBranchToReadingWorker->second) {
0634 foundInBranchToReadingWorker->second = w;
0635 } else {
0636 branchToReadingWorker.insert(make_pair(itr->second, w));
0637 }
0638 }
0639 }
0640 }
0641 }
0642 }
0643 }
0644 {
0645 auto it = branchToReadingWorker.begin();
0646 std::vector<std::string> unusedBranches;
0647 while (it != branchToReadingWorker.end()) {
0648 if (it->second == nullptr) {
0649 unusedBranches.push_back(it->first);
0650 // Still mapped to no worker: drop it. Advance first so `it` stays valid after the erase.
0651 auto temp = it;
0652 ++it;
0653 branchToReadingWorker.erase(temp);
0654 } else {
0655 ++it;
0656 }
0657 }
0658 if (not unusedBranches.empty() and streamID_.value() == 0) {
0659 LogWarning l("UnusedProductsForCanDeleteEarly");
0660 l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0661 " If possible, remove the producer from the job.";
0662 for (auto const& n : unusedBranches) {
0663 l << "\n " << n;
0664 }
0665 }
0666 }
0667 if (!branchToReadingWorker.empty()) {
0668 earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0669 earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0670 earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0671 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0672 std::string lastBranchName;
0673 size_t nextOpenIndex = 0;
0674 unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0675 for (auto& branchAndWorker : branchToReadingWorker) {
0676 if (lastBranchName != branchAndWorker.first) {
0677 // New branch name (equal keys are adjacent in the multimap): add its counter entry. The BranchID is built from the name plus '.'.
0678 BranchID bid(branchAndWorker.first + ".");
0679 earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0680 lastBranchName = branchAndWorker.first;
0681 }
0682 auto found = alreadySeenWorkers.find(branchAndWorker.second);
0683 if (alreadySeenWorkers.end() == found) {
0684 // First branch for this worker: create its helper. The helper's index range starts at
0685 // the next open slot of earlyDeleteHelperToBranchIndicies_ and initially covers one
0686 // entry; nextOpenIndex is advanced by the number of slots counted for this worker in
0687 // reserveSizeForWorker, leaving room for later appendIndex calls.
0688 size_t index = nextOpenIndex;
0689 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0690 assert(index < earlyDeleteHelperToBranchIndicies_.size());
0691 earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0692 earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0693 branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0694 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0695 nextOpenIndex += nIndices;
0696 } else {
0697 found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0698 }
0699 }
0700 // The per-worker slot counts may exceed what was actually used, which leaves unused
0701 // entries between the index ranges of consecutive helpers. Walk the helpers in order and
0702 // remove those gaps so the ranges become contiguous.
0703 auto itLast = earlyDeleteHelpers_.begin();
0704 for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0705 if (itLast->end() != it->begin()) {
0706 // delta is the number of unused entries between the previous helper's end and this helper's begin.
0707 unsigned int delta = it->begin() - itLast->end();
0708 it->shiftIndexPointers(delta);
0709 // NOTE(review): shiftIndexPointers presumably moves this helper's range down by delta -- confirm in EarlyDeleteHelper.
0710 earlyDeleteHelperToBranchIndicies_.erase(
0711 earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0712 earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0713 }
0714 itLast = it;
0715 }
0716 earlyDeleteHelperToBranchIndicies_.erase(
0717 earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0718 earlyDeleteHelperToBranchIndicies_.end());
0719 // Finally pass the worker -> helper map to every trigger path and end path and reset the
0720 // early-delete state.
0721 for (auto& p : trig_paths_) {
0722 p.setEarlyDeleteHelpers(alreadySeenWorkers);
0723 }
0724 for (auto& p : end_paths_) {
0725 p.setEarlyDeleteHelpers(alreadySeenWorkers);
0726 }
0727 resetEarlyDelete();
0728 }
0729 }
0730
0731 std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0732 Worker* worker,
0733 std::unordered_set<std::string>& conditionalModules,
0734 std::unordered_multimap<std::string, edm::ProductDescription const*> const& conditionalModuleBranches,
0735 std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0736 ParameterSet& proc_pset,
0737 SignallingProductRegistryFiller& preg,
0738 PreallocationConfiguration const* prealloc,
0739 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0740 std::vector<Worker*> returnValue;
0741 auto const& consumesInfo = worker->moduleConsumesInfos();
0742 auto moduleLabel = worker->description()->moduleLabel();
0743 using namespace productholderindexhelper;
0744 for (auto const& ci : consumesInfo) {
0745 if (not ci.skipCurrentProcess() and
0746 (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0747 auto productModuleLabel = std::string(ci.label());
0748 bool productFromConditionalModule = false;
0749 auto itFound = conditionalModules.find(productModuleLabel);
0750 if (itFound == conditionalModules.end()) {
0751
0752
0753 auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0754 if (foundAlias) {
0755 productModuleLabel = *foundAlias;
0756 productFromConditionalModule = true;
0757 itFound = conditionalModules.find(productModuleLabel);
0758
0759 if (itFound == conditionalModules.end()) {
0760 continue;
0761 }
0762 }
0763 } else {
0764
0765 auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0766 for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0767 if (itBranch->second->productInstanceName() == ci.instance()) {
0768 if (ci.kindOfType() == PRODUCT_TYPE) {
0769 if (ci.type() == itBranch->second->unwrappedTypeID()) {
0770 productFromConditionalModule = true;
0771 break;
0772 }
0773 } else {
0774
0775 if (typeIsViewCompatible(
0776 ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
0777 productFromConditionalModule = true;
0778 break;
0779 }
0780 }
0781 }
0782 }
0783 }
0784 if (productFromConditionalModule) {
0785 auto condWorker = getWorker(
0786 productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0787 assert(condWorker);
0788
0789 conditionalModules.erase(itFound);
0790
0791 auto dependents = tryToPlaceConditionalModules(condWorker,
0792 conditionalModules,
0793 conditionalModuleBranches,
0794 aliasMap,
0795 proc_pset,
0796 preg,
0797 prealloc,
0798 processConfiguration);
0799 returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0800 returnValue.push_back(condWorker);
0801 }
0802 }
0803 }
0804 return returnValue;
0805 }
0806
0807 StreamSchedule::PathWorkers StreamSchedule::fillWorkers(
0808 ParameterSet& proc_pset,
0809 SignallingProductRegistryFiller& preg,
0810 PreallocationConfiguration const* prealloc,
0811 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0812 std::string const& pathName,
0813 bool ignoreFilters,
0814 std::vector<std::string> const& endPathNames,
0815 ConditionalTaskHelper const& conditionalTaskHelper,
0816 std::unordered_set<std::string>& allConditionalModules) {
0817 vstring modnames = proc_pset.getParameter<vstring>(pathName);
0818 PathWorkers tmpworkers;
0819
0820
0821 auto condRange = findConditionalTaskModulesRange(modnames);
0822
0823 std::unordered_set<std::string> conditionalmods;
0824
0825 std::unordered_multimap<std::string, edm::ProductDescription const*> conditionalModsBranches;
0826 std::unordered_map<std::string, unsigned int> conditionalModOrder;
0827 if (condRange.first != condRange.second) {
0828 for (auto it = condRange.first; it != condRange.second; ++it) {
0829
0830 conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0831 }
0832
0833 conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0834 std::make_move_iterator(condRange.second));
0835
0836 conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0837 modnames.erase(std::prev(condRange.first), modnames.end());
0838
0839
0840 allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
0841 }
0842
0843 unsigned int placeInPath = 0;
0844 for (auto const& name : modnames) {
0845
0846 bool doNotRunConcurrently = false;
0847 WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0848 if (name[0] == '!') {
0849 filterAction = WorkerInPath::Veto;
0850 } else if (name[0] == '-' or name[0] == '+') {
0851 filterAction = WorkerInPath::Ignore;
0852 }
0853 if (name[0] == '|' or name[0] == '+') {
0854
0855 doNotRunConcurrently = true;
0856 }
0857
0858 std::string moduleLabel = name;
0859 if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0860 moduleLabel.erase(0, 1);
0861 }
0862
0863 Worker* worker =
0864 getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0865 if (worker == nullptr) {
0866 std::string pathType("endpath");
0867 if (!search_all(endPathNames, pathName)) {
0868 pathType = std::string("path");
0869 }
0870 throw Exception(errors::Configuration)
0871 << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0872 << "\"\n please check spelling or remove that label from the path.";
0873 }
0874
0875 if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0876
0877
0878 std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0879 if (!search_all(allowed_filters, worker->description()->moduleName())) {
0880
0881 filterAction = WorkerInPath::Ignore;
0882 LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0883 << "' with module label '" << moduleLabel << "' appears on EndPath '"
0884 << pathName << "'.\n"
0885 << "The return value of the filter will be ignored.\n"
0886 << "To suppress this warning, either remove the filter from the endpath,\n"
0887 << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0888 }
0889 }
0890 bool runConcurrently = not doNotRunConcurrently;
0891 if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0892 runConcurrently = false;
0893 }
0894
0895 auto condModules = tryToPlaceConditionalModules(worker,
0896 conditionalmods,
0897 conditionalModsBranches,
0898 conditionalTaskHelper.aliasMap(),
0899 proc_pset,
0900 preg,
0901 prealloc,
0902 processConfiguration);
0903 for (auto condMod : condModules) {
0904 tmpworkers.emplace_back(
0905 condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0906 }
0907
0908 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0909 ++placeInPath;
0910 }
0911
0912 return tmpworkers;
0913 }
0914
0915 void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0916 SignallingProductRegistryFiller& preg,
0917 PreallocationConfiguration const* prealloc,
0918 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0919 int bitpos,
0920 std::string const& name,
0921 TrigResPtr trptr,
0922 std::vector<std::string> const& endPathNames,
0923 ConditionalTaskHelper const& conditionalTaskHelper,
0924 std::unordered_set<std::string>& allConditionalModules) {
0925 PathWorkers tmpworkers = fillWorkers(proc_pset,
0926 preg,
0927 prealloc,
0928 processConfiguration,
0929 name,
0930 false,
0931 endPathNames,
0932 conditionalTaskHelper,
0933 allConditionalModules);
0934
0935
0936 if (!tmpworkers.empty()) {
0937 trig_paths_.emplace_back(
0938 bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
0939 } else {
0940 empty_trig_paths_.push_back(bitpos);
0941 }
0942 }
0943
0944 void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0945 SignallingProductRegistryFiller& preg,
0946 PreallocationConfiguration const* prealloc,
0947 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0948 int bitpos,
0949 std::string const& name,
0950 std::vector<std::string> const& endPathNames,
0951 ConditionalTaskHelper const& conditionalTaskHelper,
0952 std::unordered_set<std::string>& allConditionalModules) {
0953 PathWorkers tmpworkers = fillWorkers(proc_pset,
0954 preg,
0955 prealloc,
0956 processConfiguration,
0957 name,
0958 true,
0959 endPathNames,
0960 conditionalTaskHelper,
0961 allConditionalModules);
0962
0963 if (!tmpworkers.empty()) {
0964 end_paths_.emplace_back(bitpos,
0965 name,
0966 tmpworkers,
0967 TrigResPtr(),
0968 actionTable(),
0969 actReg_,
0970 &streamContext_,
0971 PathContext::PathType::kEndPath);
0972 } else {
0973 empty_end_paths_.push_back(bitpos);
0974 }
0975 }
0976
0977 void StreamSchedule::beginStream(ModuleRegistry& iModuleRegistry) {
0978 streamContext_.setTransition(StreamContext::Transition::kBeginStream);
0979 streamContext_.setEventID(EventID(0, 0, 0));
0980 streamContext_.setRunIndex(RunIndex::invalidRunIndex());
0981 streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0982 streamContext_.setTimestamp(Timestamp());
0983
0984 std::exception_ptr exceptionInStream;
0985 CMS_SA_ALLOW try {
0986 preScheduleSignal<BeginStreamTraits>(&streamContext_);
0987 runBeginStreamForModules(streamContext_, iModuleRegistry, *actReg_, moduleBeginStreamFailed_);
0988 } catch (...) {
0989 exceptionInStream = std::current_exception();
0990 }
0991
0992 postScheduleSignal<BeginStreamTraits>(&streamContext_, exceptionInStream);
0993
0994 if (exceptionInStream) {
0995 bool cleaningUpAfterException = false;
0996 handleException(streamContext_, cleaningUpAfterException, exceptionInStream);
0997 }
0998 streamContext_.setTransition(StreamContext::Transition::kInvalid);
0999
1000 if (exceptionInStream) {
1001 std::rethrow_exception(exceptionInStream);
1002 }
1003 }
1004
1005 void StreamSchedule::endStream(ModuleRegistry& iModuleRegistry,
1006 ExceptionCollector& collector,
1007 std::mutex& collectorMutex) noexcept {
1008 streamContext_.setTransition(StreamContext::Transition::kEndStream);
1009 streamContext_.setEventID(EventID(0, 0, 0));
1010 streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1011 streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1012 streamContext_.setTimestamp(Timestamp());
1013
1014 std::exception_ptr exceptionInStream;
1015 CMS_SA_ALLOW try {
1016 preScheduleSignal<EndStreamTraits>(&streamContext_);
1017 runEndStreamForModules(
1018 streamContext_, iModuleRegistry, *actReg_, collector, collectorMutex, moduleBeginStreamFailed_);
1019 } catch (...) {
1020 exceptionInStream = std::current_exception();
1021 }
1022
1023 postScheduleSignal<EndStreamTraits>(&streamContext_, exceptionInStream);
1024
1025 if (exceptionInStream) {
1026 std::lock_guard<std::mutex> collectorLock(collectorMutex);
1027 collector.call([&exceptionInStream]() { std::rethrow_exception(exceptionInStream); });
1028 }
1029 streamContext_.setTransition(StreamContext::Transition::kInvalid);
1030 }
1031
1032 void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
1033 for (auto const& worker : allWorkersRuns()) {
1034 if (worker->description()->moduleLabel() == iLabel) {
1035 iMod->replaceModuleFor(worker);
1036 try {
1037 convertException::wrap([&] { iMod->beginStream(streamID_); });
1038 } catch (cms::Exception& ex) {
1039 moduleBeginStreamFailed_.emplace_back(iMod->moduleDescription().id());
1040 ex.addContext("Executing StreamSchedule::replaceModule");
1041 throw;
1042 }
1043 break;
1044 }
1045 }
1046
1047 for (auto const& worker : allWorkersLumisAndEvents()) {
1048 if (worker->description()->moduleLabel() == iLabel) {
1049 iMod->replaceModuleFor(worker);
1050 break;
1051 }
1052 }
1053 }
1054
1055 void StreamSchedule::deleteModule(std::string const& iLabel) {
1056 workerManagerRuns_.deleteModuleIfExists(iLabel);
1057 workerManagerLumisAndEvents_.deleteModuleIfExists(iLabel);
1058 }
1059
1060 std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
1061 std::vector<ModuleDescription const*> result;
1062 result.reserve(allWorkersLumisAndEvents().size());
1063
1064 for (auto const& worker : allWorkersLumisAndEvents()) {
1065 ModuleDescription const* p = worker->description();
1066 result.push_back(p);
1067 }
1068 return result;
1069 }
1070
// Starts the asynchronous processing of one event on this stream.
//
// iTask               is notified (doneWaiting) either with an early failure detected in
//                     this function or, through the allPathsDone task, with the result of
//                     finishProcessOneEvent().
// info                supplies the EventPrincipal and is handed on to the paths and workers.
// serviceToken        is made current while this function runs; a weak copy of it is used
//                     inside the completion tasks.
// pathStatusInserters is indexed here by the bit position of the empty trigger paths.
1071 void StreamSchedule::processOneEventAsync(
1072 WaitingTaskHolder iTask,
1073 EventTransitionInfo& info,
1074 ServiceToken const& serviceToken,
1075 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
1076 EventPrincipal& ep = info.principal();
1077 // Anything thrown synchronously inside the try block is caught at the end of this
1078 // function and reported through iTask instead of propagating to the caller.
1079 CMS_SA_ALLOW try {
1080 this->resetAll();
1081
1082 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1083
1084 Traits::setStreamContext(streamContext_, ep);
1085
1086 ServiceRegistry::Operate guard(serviceToken);
1087 Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1088
1089 // The resolvers and the on-demand system are set up before the status of the empty
1090 // paths is recorded below (presumably so that a consumer of such a status finds the
1091 // system ready -- TODO confirm).
1092 workerManagerLumisAndEvents_.setupResolvers(ep);
1093 workerManagerLumisAndEvents_.setupOnDemandSystem(info);
1094 // Trigger paths without workers: record a passing status and run the status inserter directly.
1095 HLTPathStatus hltPathStatus(hlt::Pass, 0);
1096 for (int empty_trig_path : empty_trig_paths_) {
1097 results_->at(empty_trig_path) = hltPathStatus;
1098 pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1099 std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1100 ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1101 info, streamID_, ParentContext(&streamContext_), &streamContext_);
1102 if (except) {
1103 iTask.doneWaiting(except);
1104 return;
1105 }
1106 }
1107 if (not endPathStatusInserterWorkers_.empty()) {
1108 for (int empty_end_path : empty_end_paths_) {
1109 std::exception_ptr except =
1110 endPathStatusInserterWorkers_[empty_end_path]
1111 ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1112 info, streamID_, ParentContext(&streamContext_), &streamContext_);
1113 if (except) {
1114 iTask.doneWaiting(except);
1115 return;
1116 }
1117 }
1118 }
1119 // Reached only if none of the early returns above was taken.
1120 ++total_events_;
1121 // Heap-allocated atomic slot for a pointer to a recorded failure. The slot itself is
1122 // owned by the allPathsDone task (moved in below); pathsDone only keeps the raw pointer.
1123 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1124 auto pathErrorPtr = pathErrorHolder.get();
1125 ServiceWeakToken weakToken = serviceToken;
1126 auto allPathsDone = make_waiting_task(
1127 [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1128 ServiceRegistry::Operate operate(weakToken.lock());
1129 // A failure stored in the slot takes precedence over the one passed to this task.
1130 std::exception_ptr ptr;
1131 if (pathError->load()) {
1132 ptr = *pathError->load();
1133 delete pathError->load();
1134 }
1135 if ((not ptr) and iPtr) {
1136 ptr = *iPtr;
1137 }
1138 iTask.doneWaiting(finishProcessOneEvent(ptr));
1139 });
1140 // This holder on allPathsDone is captured by pathsDone and moved into finishedPaths();
1141 // presumably allPathsDone cannot run while a holder on it exists
1142 // (assumes WaitingTaskHolder semantics -- TODO confirm).
1143 WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1144
1145 auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1146 std::exception_ptr const* iPtr) mutable {
1147 ServiceRegistry::Operate operate(weakToken.lock());
1148
1149 if (iPtr) {
1150 // Store a heap copy of the failure in the shared slot; the assert states the
1151 // expectation that nothing had been stored there before.
1152 auto currentPtr = pathErrorPtr->exchange(new std::exception_ptr(*iPtr));
1153 assert(currentPtr == nullptr);
1154 }
1155 finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1156 });
1157
1158 // Trigger paths report to pathsDone through taskHolder; end paths and accumulators
1159 // report to allPathsDone through hAllPathsDone.
1160
1161 WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1162
1163 // End paths are started before trigger paths; both containers are walked in reverse.
1164 WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1165 for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1166 it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1167 }
1168
1169 for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1170 it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1171 }
1172
1173 ParentContext parentContext(&streamContext_);
1174 workerManagerLumisAndEvents_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1175 hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1176 } catch (...) {
1177 iTask.doneWaiting(std::current_exception());
1178 }
1179 }
1180
1181 void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1182 WaitingTaskHolder iWait,
1183 EventTransitionInfo& info) {
1184 if (iExcept) {
1185
1186 CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1187 exception_actions::ActionCodes action = actionTable().find(e.category());
1188 assert(action != exception_actions::IgnoreCompletely);
1189 if (action == exception_actions::TryToContinue) {
1190 edm::printCmsExceptionWarning("TryToContinue", e);
1191 *(iExcept.load()) = std::exception_ptr();
1192 } else {
1193 *(iExcept.load()) = std::current_exception();
1194 }
1195 } catch (...) {
1196 *(iExcept.load()) = std::current_exception();
1197 }
1198 }
1199
1200 if ((not iExcept) and results_->accept()) {
1201 ++total_passed_;
1202 }
1203
1204 if (nullptr != results_inserter_.get()) {
1205
1206 CMS_SA_ALLOW try {
1207
1208
1209 ParentContext parentContext(&streamContext_);
1210 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1211
1212 auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1213 if (expt) {
1214 std::rethrow_exception(expt);
1215 }
1216 } catch (cms::Exception& ex) {
1217 if (not iExcept) {
1218 if (ex.context().empty()) {
1219 std::ostringstream ost;
1220 ost << "Processing Event " << info.principal().id();
1221 ex.addContext(ost.str());
1222 }
1223 iExcept.store(new std::exception_ptr(std::current_exception()));
1224 }
1225 } catch (...) {
1226 if (not iExcept) {
1227 iExcept.store(new std::exception_ptr(std::current_exception()));
1228 }
1229 }
1230 }
1231 std::exception_ptr ptr;
1232 if (iExcept) {
1233 ptr = *iExcept.load();
1234 }
1235 iWait.doneWaiting(ptr);
1236 }
1237
1238 std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1239 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1240
1241 if (iExcept) {
1242
1243 try {
1244 convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1245 } catch (cms::Exception& ex) {
1246 bool const cleaningUpAfterException = false;
1247 if (ex.context().empty()) {
1248 addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1249 } else {
1250 addContextAndPrintException("", ex, cleaningUpAfterException);
1251 }
1252 iExcept = std::current_exception();
1253 }
1254
1255 actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1256 }
1257
1258 CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1259 if (not iExcept) {
1260 iExcept = std::current_exception();
1261 }
1262 }
1263 if (not iExcept) {
1264 resetEarlyDelete();
1265 }
1266
1267 return iExcept;
1268 }
1269
1270 void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1271 oLabelsToFill.reserve(trig_paths_.size());
1272 std::transform(trig_paths_.begin(),
1273 trig_paths_.end(),
1274 std::back_inserter(oLabelsToFill),
1275 std::bind(&Path::name, std::placeholders::_1));
1276 }
1277
1278 void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1279 TrigPaths::const_iterator itFound = std::find_if(
1280 trig_paths_.begin(),
1281 trig_paths_.end(),
1282 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1283 if (itFound != trig_paths_.end()) {
1284 oLabelsToFill.reserve(itFound->size());
1285 for (size_t i = 0; i < itFound->size(); ++i) {
1286 oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1287 }
1288 }
1289 }
1290
1291 void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1292 std::vector<ModuleDescription const*>& descriptions,
1293 unsigned int hint) const {
1294 descriptions.clear();
1295 bool found = false;
1296 TrigPaths::const_iterator itFound;
1297
1298 if (hint < trig_paths_.size()) {
1299 itFound = trig_paths_.begin() + hint;
1300 if (itFound->name() == iPathLabel)
1301 found = true;
1302 }
1303 if (!found) {
1304
1305 itFound = std::find_if(
1306 trig_paths_.begin(),
1307 trig_paths_.end(),
1308 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1309 if (itFound != trig_paths_.end())
1310 found = true;
1311 }
1312 if (found) {
1313 descriptions.reserve(itFound->size());
1314 for (size_t i = 0; i < itFound->size(); ++i) {
1315 descriptions.push_back(itFound->getWorker(i)->description());
1316 }
1317 }
1318 }
1319
1320 void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1321 std::vector<ModuleDescription const*>& descriptions,
1322 unsigned int hint) const {
1323 descriptions.clear();
1324 bool found = false;
1325 TrigPaths::const_iterator itFound;
1326
1327 if (hint < end_paths_.size()) {
1328 itFound = end_paths_.begin() + hint;
1329 if (itFound->name() == iEndPathLabel)
1330 found = true;
1331 }
1332 if (!found) {
1333
1334 itFound = std::find_if(
1335 end_paths_.begin(),
1336 end_paths_.end(),
1337 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1338 if (itFound != end_paths_.end())
1339 found = true;
1340 }
1341 if (found) {
1342 descriptions.reserve(itFound->size());
1343 for (size_t i = 0; i < itFound->size(); ++i) {
1344 descriptions.push_back(itFound->getWorker(i)->description());
1345 }
1346 }
1347 }
1348
1349 static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1350 sum.timesVisited += path.timesVisited(which);
1351 sum.timesPassed += path.timesPassed(which);
1352 sum.timesFailed += path.timesFailed(which);
1353 sum.timesExcept += path.timesExcept(which);
1354 sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1355 sum.bitPosition = path.bitPosition(which);
1356 }
1357
1358 static void fillPathSummary(Path const& path, PathSummary& sum) {
1359 sum.name = path.name();
1360 sum.bitPosition = path.bitPosition();
1361 sum.timesRun += path.timesRun();
1362 sum.timesPassed += path.timesPassed();
1363 sum.timesFailed += path.timesFailed();
1364 sum.timesExcept += path.timesExcept();
1365
1366 Path::size_type sz = path.size();
1367 if (sum.moduleInPathSummaries.empty()) {
1368 std::vector<ModuleInPathSummary> temp(sz);
1369 for (size_t i = 0; i != sz; ++i) {
1370 fillModuleInPathSummary(path, i, temp[i]);
1371 }
1372 sum.moduleInPathSummaries.swap(temp);
1373 } else {
1374 assert(sz == sum.moduleInPathSummaries.size());
1375 for (size_t i = 0; i != sz; ++i) {
1376 fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1377 }
1378 }
1379 }
1380
1381 static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1382 sum.timesVisited += w.timesVisited();
1383 sum.timesRun += w.timesRun();
1384 sum.timesPassed += w.timesPassed();
1385 sum.timesFailed += w.timesFailed();
1386 sum.timesExcept += w.timesExcept();
1387 sum.moduleLabel = w.description()->moduleLabel();
1388 }
1389
1390 static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1391
1392 void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1393 rep.eventSummary.totalEvents += totalEvents();
1394 rep.eventSummary.totalEventsPassed += totalEventsPassed();
1395 rep.eventSummary.totalEventsFailed += totalEventsFailed();
1396
1397 fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1398 fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1399 fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1400 }
1401
1402 void StreamSchedule::clearCounters() {
1403 using std::placeholders::_1;
1404 total_events_ = total_passed_ = 0;
1405 for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1406 for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1407 for_all(allWorkersLumisAndEvents(), std::bind(&Worker::clearCounters, _1));
1408 }
1409
1410 void StreamSchedule::resetAll() { results_->reset(); }
1411
1412 void StreamSchedule::addToAllWorkers(Worker* w) { workerManagerLumisAndEvents_.addToAllWorkers(w); }
1413
1414 void StreamSchedule::resetEarlyDelete() {
1415
1416 for (auto& count : earlyDeleteBranchToCount_) {
1417 count.count = 0;
1418 }
1419
1420 for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1421 ++(earlyDeleteBranchToCount_[index].count);
1422 }
1423 for (auto& helper : earlyDeleteHelpers_) {
1424 helper.reset();
1425 }
1426 }
1427
1428 void StreamSchedule::makePathStatusInserters(
1429 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1430 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1431 ExceptionToActionTable const& actions) {
1432 int bitpos = 0;
1433 unsigned int indexEmpty = 0;
1434 unsigned int indexOfPath = 0;
1435 for (auto& pathStatusInserter : pathStatusInserters) {
1436 std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1437 auto workerPtr = workerManagerLumisAndEvents_.getWorkerForModule(*inserterPtr);
1438 pathStatusInserterWorkers_.emplace_back(workerPtr);
1439
1440
1441
1442 if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1443 ++indexEmpty;
1444 } else {
1445 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr);
1446 ++indexOfPath;
1447 }
1448 ++bitpos;
1449 }
1450
1451 bitpos = 0;
1452 indexEmpty = 0;
1453 indexOfPath = 0;
1454 for (auto& endPathStatusInserter : endPathStatusInserters) {
1455 std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1456 auto workerPtr = workerManagerLumisAndEvents_.getWorkerForModule(*inserterPtr);
1457 endPathStatusInserterWorkers_.emplace_back(workerPtr);
1458
1459
1460
1461 if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1462 ++indexEmpty;
1463 } else {
1464 end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr);
1465 ++indexOfPath;
1466 }
1467 ++bitpos;
1468 }
1469 }
1470
1471 void StreamSchedule::handleException(StreamContext const& streamContext,
1472 bool cleaningUpAfterException,
1473 std::exception_ptr& excpt) const noexcept {
1474
1475 try {
1476 convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1477 } catch (cms::Exception& ex) {
1478 std::ostringstream ost;
1479
1480
1481 if (ex.context().empty()) {
1482 exceptionContext(ost, streamContext);
1483 }
1484 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1485 excpt = std::current_exception();
1486 }
1487
1488
1489 CMS_SA_ALLOW try {
1490 actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1491 } catch (...) {
1492 }
1493 }
1494 }