File indexing completed on 2024-10-29 06:08:21
0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/EventID.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0008 #include "DataFormats/Provenance/interface/Timestamp.h"
0009 #include "FWCore/Framework/src/OutputModuleDescription.h"
0010 #include "FWCore/Framework/interface/TriggerNamesService.h"
0011 #include "FWCore/Framework/src/TriggerReport.h"
0012 #include "FWCore/Framework/src/TriggerTimingReport.h"
0013 #include "FWCore/Framework/src/Factory.h"
0014 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0015 #include "FWCore/Framework/src/TriggerResultInserter.h"
0016 #include "FWCore/Framework/src/PathStatusInserter.h"
0017 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0018 #include "FWCore/Framework/interface/WorkerInPath.h"
0019 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0020 #include "FWCore/Framework/interface/maker/WorkerT.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0024 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0025 #include "FWCore/ParameterSet/interface/Registry.h"
0026 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0027 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0028 #include "FWCore/Utilities/interface/Algorithms.h"
0029 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0030 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0031 #include "FWCore/Utilities/interface/RunIndex.h"
0032
0033 #include "LuminosityBlockProcessingStatus.h"
0034 #include "processEDAliases.h"
0035
0036 #include <algorithm>
0037 #include <cassert>
0038 #include <cstdlib>
0039 #include <functional>
0040 #include <iomanip>
0041 #include <limits>
0042 #include <list>
0043 #include <map>
0044 #include <fmt/format.h>
0045
0046 namespace edm {
0047
0048 namespace {
0049
0050
0051
0052
0053
0054 template <typename InputIterator, typename ForwardIterator, typename Func>
0055 void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
0056 for (; begin != end; ++begin, ++out)
0057 func(*begin, *out);
0058 }
0059
0060
0061
0062
0063
0064
0065 template <typename FROM, typename TO, typename FUNC>
0066 void fill_summary(FROM const& from, TO& to, FUNC func) {
0067 if (to.size() != from.size()) {
0068 TO temp(from.size());
0069 transform_into(from.begin(), from.end(), temp.begin(), func);
0070 to.swap(temp);
0071 } else {
0072 transform_into(from.begin(), from.end(), to.begin(), func);
0073 }
0074 }
0075
0076 class BeginStreamTraits {
0077 public:
0078 static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0079 activityRegistry->preBeginStreamSignal_(*streamContext);
0080 }
0081 static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0082 activityRegistry->postBeginStreamSignal_(*streamContext);
0083 }
0084 };
0085
0086 class EndStreamTraits {
0087 public:
0088 static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0089 activityRegistry->preEndStreamSignal_(*streamContext);
0090 }
0091 static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0092 activityRegistry->postEndStreamSignal_(*streamContext);
0093 }
0094 };
0095
0096
0097
0098
0099
0100
0101 StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
0102 std::shared_ptr<ActivityRegistry> areg,
0103 std::shared_ptr<TriggerResultInserter> inserter) {
0104 StreamSchedule::WorkerPtr ptr(
0105 new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
0106 ptr->setActivityRegistry(areg);
0107 return ptr;
0108 }
0109
0110 void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0111 ProductRegistry const& preg,
0112 std::multimap<std::string, Worker*>& branchToReadingWorker) {
0113 auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0114
0115 std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0116 vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0117 vBranchesToDeleteEarly.end());
0118
0119
0120 auto allBranchNames = preg.allBranchNames();
0121
0122 for (auto& b : allBranchNames) {
0123 b.resize(b.size() - 1);
0124 }
0125 std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0126 std::vector<std::string> temp;
0127 temp.reserve(vBranchesToDeleteEarly.size());
0128
0129 std::set_intersection(vBranchesToDeleteEarly.begin(),
0130 vBranchesToDeleteEarly.end(),
0131 allBranchNames.begin(),
0132 allBranchNames.end(),
0133 std::back_inserter(temp));
0134 vBranchesToDeleteEarly.swap(temp);
0135 if (temp.size() != vBranchesToDeleteEarly.size()) {
0136 std::vector<std::string> missingProducts;
0137 std::set_difference(temp.begin(),
0138 temp.end(),
0139 vBranchesToDeleteEarly.begin(),
0140 vBranchesToDeleteEarly.end(),
0141 std::back_inserter(missingProducts));
0142 LogInfo l("MissingProductsForCanDeleteEarly");
0143 l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0144 for (auto const& n : missingProducts) {
0145 l << "\n " << n;
0146 }
0147 }
0148
0149
0150 for (auto const& branch : vBranchesToDeleteEarly) {
0151 branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0152 }
0153 }
0154
0155 Worker* getWorker(std::string const& moduleLabel,
0156 ParameterSet& proc_pset,
0157 WorkerManager& workerManager,
0158 ProductRegistry& preg,
0159 PreallocationConfiguration const* prealloc,
0160 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0161 bool isTracked;
0162 ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0163 if (modpset == nullptr) {
0164 return nullptr;
0165 }
0166 assert(isTracked);
0167
0168 return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
0169 }
0170
0171
0172
0173
0174
0175
0176
0177 template <typename T>
0178 auto findConditionalTaskModulesRange(T& modnames) {
0179 auto beg = std::find(modnames.begin(), modnames.end(), "#");
0180 if (beg == modnames.end()) {
0181 return std::pair(modnames.end(), modnames.end());
0182 }
0183 return std::pair(beg + 1, std::prev(modnames.end()));
0184 }
0185
0186 std::optional<std::string> findBestMatchingAlias(
0187 std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0188 std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0189 std::string const& productModuleLabel,
0190 ConsumesInfo const& consumesInfo) {
0191 std::optional<std::string> best;
0192 int wildcardsInBest = std::numeric_limits<int>::max();
0193 bool bestIsAmbiguous = false;
0194
0195 auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0196 std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0197 int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0198 if (wildcards == 0) {
0199 bestIsAmbiguous = false;
0200 return true;
0201 }
0202 if (not best or wildcards < wildcardsInBest) {
0203 best = label;
0204 wildcardsInBest = wildcards;
0205 bestIsAmbiguous = false;
0206 } else if (best and *best != label and wildcardsInBest == wildcards) {
0207 bestIsAmbiguous = true;
0208 }
0209 return false;
0210 };
0211
0212 auto findAlias = aliasMap.equal_range(productModuleLabel);
0213 for (auto it = findAlias.first; it != findAlias.second; ++it) {
0214 std::string const& aliasInstanceLabel =
0215 it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0216 bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0217 if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0218 bool const typeIsWildcard = it->second.friendlyClassName == "*";
0219 if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0220 if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0221 return it->second.originalModuleLabel;
0222 }
0223 } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0224
0225
0226 auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0227 for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0228 if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0229 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0230 TypeID(itBranch->second->wrappedType().typeInfo()),
0231 itBranch->second->className())) {
0232 if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0233 return it->second.originalModuleLabel;
0234 }
0235 }
0236 }
0237 }
0238 }
0239 }
0240 }
0241 if (bestIsAmbiguous) {
0242 throw Exception(errors::UnimplementedFeature)
0243 << "Encountered ambiguity when trying to find a best-matching alias for\n"
0244 << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0245 << " module label " << productModuleLabel << "\n"
0246 << " product instance name " << consumesInfo.instance() << "\n"
0247 << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0248 "wildcards ("
0249 << wildcardsInBest << ")";
0250 }
0251 return best;
0252 }
0253 }
0254
0255
0256
0257 typedef std::vector<std::string> vstring;
0258
0259
0260
0261 class ConditionalTaskHelper {
0262 public:
0263 using AliasInfo = StreamSchedule::AliasInfo;
0264
0265 ConditionalTaskHelper(ParameterSet& proc_pset,
0266 ProductRegistry& preg,
0267 PreallocationConfiguration const* prealloc,
0268 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0269 WorkerManager& workerManagerLumisAndEvents,
0270 std::vector<std::string> const& trigPathNames) {
0271 std::unordered_set<std::string> allConditionalMods;
0272 for (auto const& pathName : trigPathNames) {
0273 auto const modnames = proc_pset.getParameter<vstring>(pathName);
0274
0275
0276 auto condRange = findConditionalTaskModulesRange(modnames);
0277 if (condRange.first == condRange.second)
0278 continue;
0279
0280
0281 allConditionalMods.insert(condRange.first, condRange.second);
0282 }
0283
0284 for (auto const& cond : allConditionalMods) {
0285
0286 (void)getWorker(cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration);
0287 }
0288
0289 fillAliasMap(proc_pset, allConditionalMods);
0290 processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0291
0292
0293 for (auto const& prod : preg.productList()) {
0294 if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0295 conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0296 }
0297 }
0298 }
0299
0300 std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0301
0302 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
0303 std::unordered_set<std::string> const& conditionalmods) const {
0304 std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
0305 for (auto const& mod : conditionalmods) {
0306 auto range = conditionalModsBranches_.equal_range(mod);
0307 ret.insert(range.first, range.second);
0308 }
0309 return ret;
0310 }
0311
0312 private:
0313 void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0314 auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0315 std::string const star("*");
0316 for (auto const& alias : aliases) {
0317 auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0318 auto aliasedToModuleLabels = info.getParameterNames();
0319 for (auto const& mod : aliasedToModuleLabels) {
0320 if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0321 auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0322 for (auto const& aliasPSet : aliasVPSet) {
0323 std::string type = star;
0324 std::string instance = star;
0325 std::string originalInstance = star;
0326 if (aliasPSet.exists("type")) {
0327 type = aliasPSet.getParameter<std::string>("type");
0328 }
0329 if (aliasPSet.exists("toProductInstance")) {
0330 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0331 }
0332 if (aliasPSet.exists("fromProductInstance")) {
0333 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0334 }
0335
0336 aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0337 }
0338 }
0339 }
0340 }
0341 }
0342
0343 void processSwitchEDAliases(ParameterSet const& proc_pset,
0344 ProductRegistry& preg,
0345 ProcessConfiguration const& processConfiguration,
0346 std::unordered_set<std::string> const& allConditionalMods) {
0347 auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0348 std::vector<std::string> switchEDAliases;
0349 for (auto const& module : all_modules) {
0350 auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0351 if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0352 auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0353 for (auto const& case_label : all_cases) {
0354 auto range = aliasMap_.equal_range(case_label);
0355 if (range.first != range.second) {
0356 switchEDAliases.push_back(case_label);
0357 }
0358 }
0359 }
0360 }
0361 detail::processEDAliases(
0362 switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0363 }
0364
0365 std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0366 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
0367 };
0368
0369
0370
0371 StreamSchedule::StreamSchedule(
0372 std::shared_ptr<TriggerResultInserter> inserter,
0373 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0374 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0375 std::shared_ptr<ModuleRegistry> modReg,
0376 ParameterSet& proc_pset,
0377 service::TriggerNamesService const& tns,
0378 PreallocationConfiguration const& prealloc,
0379 ProductRegistry& preg,
0380 ExceptionToActionTable const& actions,
0381 std::shared_ptr<ActivityRegistry> areg,
0382 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0383 StreamID streamID,
0384 ProcessContext const* processContext)
0385 : workerManagerBeginEnd_(modReg, areg, actions),
0386 workerManagerRuns_(modReg, areg, actions),
0387 workerManagerLumisAndEvents_(modReg, areg, actions),
0388 actReg_(areg),
0389 results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0390 results_inserter_(),
0391 trig_paths_(),
0392 end_paths_(),
0393 total_events_(),
0394 total_passed_(),
0395 number_of_unscheduled_modules_(0),
0396 streamID_(streamID),
0397 streamContext_(streamID_, processContext) {
0398 bool hasPath = false;
0399 std::vector<std::string> const& pathNames = tns.getTrigPaths();
0400 std::vector<std::string> const& endPathNames = tns.getEndPaths();
0401
0402 ConditionalTaskHelper conditionalTaskHelper(
0403 proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
0404 std::unordered_set<std::string> conditionalModules;
0405
0406 int trig_bitpos = 0;
0407 trig_paths_.reserve(pathNames.size());
0408 for (auto const& trig_name : pathNames) {
0409 fillTrigPath(proc_pset,
0410 preg,
0411 &prealloc,
0412 processConfiguration,
0413 trig_bitpos,
0414 trig_name,
0415 results(),
0416 endPathNames,
0417 conditionalTaskHelper,
0418 conditionalModules);
0419 ++trig_bitpos;
0420 hasPath = true;
0421 }
0422
0423 if (hasPath) {
0424
0425 inserter->setTrigResultForStream(streamID.value(), results());
0426
0427 results_inserter_ = makeInserter(actions, actReg_, inserter);
0428 addToAllWorkers(results_inserter_.get());
0429 }
0430
0431
0432 int bitpos = 0;
0433 end_paths_.reserve(endPathNames.size());
0434 for (auto const& end_path_name : endPathNames) {
0435 fillEndPath(proc_pset,
0436 preg,
0437 &prealloc,
0438 processConfiguration,
0439 bitpos,
0440 end_path_name,
0441 endPathNames,
0442 conditionalTaskHelper,
0443 conditionalModules);
0444 ++bitpos;
0445 }
0446
0447 makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0448
0449
0450 std::set<std::string> usedWorkerLabels;
0451 for (auto const& worker : allWorkersLumisAndEvents()) {
0452 usedWorkerLabels.insert(worker->description()->moduleLabel());
0453 }
0454 std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0455 std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0456 std::vector<std::string> unusedLabels;
0457 set_difference(modulesInConfigSet.begin(),
0458 modulesInConfigSet.end(),
0459 usedWorkerLabels.begin(),
0460 usedWorkerLabels.end(),
0461 back_inserter(unusedLabels));
0462 std::set<std::string> unscheduledLabels;
0463 std::vector<std::string> shouldBeUsedLabels;
0464 if (!unusedLabels.empty()) {
0465
0466
0467
0468
0469 for (auto const& label : unusedLabels) {
0470 bool isTracked;
0471 ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0472 assert(isTracked);
0473 assert(modulePSet != nullptr);
0474 workerManagerLumisAndEvents_.addToUnscheduledWorkers(
0475 *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0476 }
0477 if (!shouldBeUsedLabels.empty()) {
0478 std::ostringstream unusedStream;
0479 unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0480 for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0481 itLabelEnd = shouldBeUsedLabels.end();
0482 itLabel != itLabelEnd;
0483 ++itLabel) {
0484 unusedStream << ",'" << *itLabel << "'";
0485 }
0486 LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0487 }
0488 }
0489 number_of_unscheduled_modules_ = unscheduledLabels.size();
0490
0491
0492 if (streamID.value() == 0 and not conditionalModules.empty()) {
0493
0494
0495
0496
0497 std::vector<std::string_view> labelsToPrint;
0498 std::copy_if(
0499 unscheduledLabels.begin(),
0500 unscheduledLabels.end(),
0501 std::back_inserter(labelsToPrint),
0502 [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
0503
0504 if (not labelsToPrint.empty()) {
0505 edm::LogWarning log("NonConsumedConditionalModules");
0506 log << "The following modules were part of some ConditionalTask, but were not\n"
0507 << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
0508 << "was associated. Perhaps they should be either removed from the\n"
0509 << "job, or moved to a Task to make it explicit they are unscheduled.\n";
0510 for (auto const& modLabel : labelsToPrint) {
0511 log.format("\n {}", modLabel);
0512 }
0513 }
0514 }
0515
0516 for (auto const& worker : allWorkersLumisAndEvents()) {
0517 std::string const& moduleLabel = worker->description()->moduleLabel();
0518
0519
0520
0521
0522
0523
0524 Worker* workerBeginEnd =
0525 getWorker(moduleLabel, proc_pset, workerManagerBeginEnd_, preg, &prealloc, processConfiguration);
0526 if (workerBeginEnd) {
0527 workerManagerBeginEnd_.addToAllWorkers(workerBeginEnd);
0528 }
0529
0530 Worker* workerRuns = getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
0531 if (workerRuns) {
0532 workerManagerRuns_.addToAllWorkers(workerRuns);
0533 }
0534 }
0535
0536 }
0537
0538 void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0539 std::vector<std::string> const& branchesToDeleteEarly,
0540 std::multimap<std::string, std::string> const& referencesToBranches,
0541 std::vector<std::string> const& modulesToSkip,
0542 edm::ProductRegistry const& preg) {
0543
0544 std::multimap<std::string, Worker*> branchToReadingWorker;
0545 initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0546
0547 const std::vector<std::string> kEmpty;
0548 std::map<Worker*, unsigned int> reserveSizeForWorker;
0549 unsigned int upperLimitOnReadingWorker = 0;
0550 unsigned int upperLimitOnIndicies = 0;
0551 unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0552
0553
0554 modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0555 auto comm = iHolder->createOutputModuleCommunicator();
0556 if (comm) {
0557 if (!branchToReadingWorker.empty()) {
0558
0559
0560 SelectedProductsForBranchType const& kept = comm->keptProducts();
0561 for (auto const& item : kept[InEvent]) {
0562 BranchDescription const& desc = *item.first;
0563 auto found = branchToReadingWorker.equal_range(desc.branchName());
0564 if (found.first != found.second) {
0565 --nUniqueBranchesToDelete;
0566 branchToReadingWorker.erase(found.first, found.second);
0567 }
0568 }
0569 }
0570 }
0571 });
0572
0573 if (branchToReadingWorker.empty()) {
0574 return;
0575 }
0576
0577 std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0578 for (auto w : allWorkersLumisAndEvents()) {
0579 if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0580 continue;
0581 }
0582
0583 auto consumes = w->consumesInfo();
0584 if (not consumes.empty()) {
0585 bool foundAtLeastOneMatchingBranch = false;
0586 for (auto const& product : consumes) {
0587 std::string branch = fmt::format("{}_{}_{}_{}",
0588 product.type().friendlyClassName(),
0589 product.label().data(),
0590 product.instance().data(),
0591 product.process().data());
0592 {
0593
0594 auto found = branchToReadingWorker.end();
0595 if (product.process().empty()) {
0596 auto startFound = branchToReadingWorker.lower_bound(branch);
0597 if (startFound != branchToReadingWorker.end()) {
0598 if (startFound->first.substr(0, branch.size()) == branch) {
0599
0600 found = startFound;
0601 }
0602 }
0603 } else {
0604 auto exactFound = branchToReadingWorker.equal_range(branch);
0605 if (exactFound.first != exactFound.second) {
0606 found = exactFound.first;
0607 }
0608 }
0609 if (found != branchToReadingWorker.end()) {
0610 if (not foundAtLeastOneMatchingBranch) {
0611 ++upperLimitOnReadingWorker;
0612 foundAtLeastOneMatchingBranch = true;
0613 }
0614 ++upperLimitOnIndicies;
0615 ++reserveSizeForWorker[w];
0616 if (nullptr == found->second) {
0617 found->second = w;
0618 } else {
0619 branchToReadingWorker.insert(make_pair(found->first, w));
0620 }
0621 }
0622 }
0623 {
0624
0625 auto found = referencesToBranches.end();
0626 if (product.process().empty()) {
0627 auto startFound = referencesToBranches.lower_bound(branch);
0628 if (startFound != referencesToBranches.end()) {
0629 if (startFound->first.substr(0, branch.size()) == branch) {
0630
0631 found = startFound;
0632 }
0633 }
0634 } else {
0635
0636 auto exactFound = referencesToBranches.equal_range(branch);
0637 if (exactFound.first != exactFound.second) {
0638 found = exactFound.first;
0639 }
0640 }
0641 if (found != referencesToBranches.end()) {
0642 for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0643 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0644 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0645 continue;
0646 }
0647 if (not foundAtLeastOneMatchingBranch) {
0648 ++upperLimitOnReadingWorker;
0649 foundAtLeastOneMatchingBranch = true;
0650 }
0651 ++upperLimitOnIndicies;
0652 ++reserveSizeForWorker[w];
0653 if (nullptr == foundInBranchToReadingWorker->second) {
0654 foundInBranchToReadingWorker->second = w;
0655 } else {
0656 branchToReadingWorker.insert(make_pair(itr->second, w));
0657 }
0658 }
0659 }
0660 }
0661 }
0662 }
0663 }
0664 {
0665 auto it = branchToReadingWorker.begin();
0666 std::vector<std::string> unusedBranches;
0667 while (it != branchToReadingWorker.end()) {
0668 if (it->second == nullptr) {
0669 unusedBranches.push_back(it->first);
0670
0671 auto temp = it;
0672 ++it;
0673 branchToReadingWorker.erase(temp);
0674 } else {
0675 ++it;
0676 }
0677 }
0678 if (not unusedBranches.empty()) {
0679 LogWarning l("UnusedProductsForCanDeleteEarly");
0680 l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0681 " If possible, remove the producer from the job.";
0682 for (auto const& n : unusedBranches) {
0683 l << "\n " << n;
0684 }
0685 }
0686 }
0687 if (!branchToReadingWorker.empty()) {
0688 earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0689 earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0690 earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0691 std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0692 std::string lastBranchName;
0693 size_t nextOpenIndex = 0;
0694 unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0695 for (auto& branchAndWorker : branchToReadingWorker) {
0696 if (lastBranchName != branchAndWorker.first) {
0697
0698 BranchID bid(branchAndWorker.first + ".");
0699 earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0700 lastBranchName = branchAndWorker.first;
0701 }
0702 auto found = alreadySeenWorkers.find(branchAndWorker.second);
0703 if (alreadySeenWorkers.end() == found) {
0704
0705
0706
0707
0708 size_t index = nextOpenIndex;
0709 size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0710 assert(index < earlyDeleteHelperToBranchIndicies_.size());
0711 earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0712 earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0713 branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0714 alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0715 nextOpenIndex += nIndices;
0716 } else {
0717 found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0718 }
0719 }
0720
0721
0722
0723 auto itLast = earlyDeleteHelpers_.begin();
0724 for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0725 if (itLast->end() != it->begin()) {
0726
0727 unsigned int delta = it->begin() - itLast->end();
0728 it->shiftIndexPointers(delta);
0729
0730 earlyDeleteHelperToBranchIndicies_.erase(
0731 earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0732 earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0733 }
0734 itLast = it;
0735 }
0736 earlyDeleteHelperToBranchIndicies_.erase(
0737 earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0738 earlyDeleteHelperToBranchIndicies_.end());
0739
0740
0741 for (auto& p : trig_paths_) {
0742 p.setEarlyDeleteHelpers(alreadySeenWorkers);
0743 }
0744 for (auto& p : end_paths_) {
0745 p.setEarlyDeleteHelpers(alreadySeenWorkers);
0746 }
0747 resetEarlyDelete();
0748 }
0749 }
0750
0751 std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0752 Worker* worker,
0753 std::unordered_set<std::string>& conditionalModules,
0754 std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0755 std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0756 ParameterSet& proc_pset,
0757 ProductRegistry& preg,
0758 PreallocationConfiguration const* prealloc,
0759 std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0760 std::vector<Worker*> returnValue;
0761 auto const& consumesInfo = worker->consumesInfo();
0762 auto moduleLabel = worker->description()->moduleLabel();
0763 using namespace productholderindexhelper;
0764 for (auto const& ci : consumesInfo) {
0765 if (not ci.skipCurrentProcess() and
0766 (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0767 auto productModuleLabel = std::string(ci.label());
0768 bool productFromConditionalModule = false;
0769 auto itFound = conditionalModules.find(productModuleLabel);
0770 if (itFound == conditionalModules.end()) {
0771
0772
0773 auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0774 if (foundAlias) {
0775 productModuleLabel = *foundAlias;
0776 productFromConditionalModule = true;
0777 itFound = conditionalModules.find(productModuleLabel);
0778
0779 if (itFound == conditionalModules.end()) {
0780 continue;
0781 }
0782 }
0783 } else {
0784
0785 auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0786 for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0787 if (itBranch->second->productInstanceName() == ci.instance()) {
0788 if (ci.kindOfType() == PRODUCT_TYPE) {
0789 if (ci.type() == itBranch->second->unwrappedTypeID()) {
0790 productFromConditionalModule = true;
0791 break;
0792 }
0793 } else {
0794
0795 if (typeIsViewCompatible(
0796 ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
0797 productFromConditionalModule = true;
0798 break;
0799 }
0800 }
0801 }
0802 }
0803 }
0804 if (productFromConditionalModule) {
0805 auto condWorker = getWorker(
0806 productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0807 assert(condWorker);
0808
0809 conditionalModules.erase(itFound);
0810
0811 auto dependents = tryToPlaceConditionalModules(condWorker,
0812 conditionalModules,
0813 conditionalModuleBranches,
0814 aliasMap,
0815 proc_pset,
0816 preg,
0817 prealloc,
0818 processConfiguration);
0819 returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0820 returnValue.push_back(condWorker);
0821 }
0822 }
0823 }
0824 return returnValue;
0825 }
0826
0827 void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
0828 ProductRegistry& preg,
0829 PreallocationConfiguration const* prealloc,
0830 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0831 std::string const& pathName,
0832 bool ignoreFilters,
0833 PathWorkers& out,
0834 std::vector<std::string> const& endPathNames,
0835 ConditionalTaskHelper const& conditionalTaskHelper,
0836 std::unordered_set<std::string>& allConditionalModules) {
0837 vstring modnames = proc_pset.getParameter<vstring>(pathName);
0838 PathWorkers tmpworkers;
0839
0840
0841 auto condRange = findConditionalTaskModulesRange(modnames);
0842
0843 std::unordered_set<std::string> conditionalmods;
0844
0845 std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
0846 std::unordered_map<std::string, unsigned int> conditionalModOrder;
0847 if (condRange.first != condRange.second) {
0848 for (auto it = condRange.first; it != condRange.second; ++it) {
0849
0850 conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0851 }
0852
0853 conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0854 std::make_move_iterator(condRange.second));
0855
0856 conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0857 modnames.erase(std::prev(condRange.first), modnames.end());
0858
0859
0860 allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
0861 }
0862
0863 unsigned int placeInPath = 0;
0864 for (auto const& name : modnames) {
0865
0866 bool doNotRunConcurrently = false;
0867 WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0868 if (name[0] == '!') {
0869 filterAction = WorkerInPath::Veto;
0870 } else if (name[0] == '-' or name[0] == '+') {
0871 filterAction = WorkerInPath::Ignore;
0872 }
0873 if (name[0] == '|' or name[0] == '+') {
0874
0875 doNotRunConcurrently = true;
0876 }
0877
0878 std::string moduleLabel = name;
0879 if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0880 moduleLabel.erase(0, 1);
0881 }
0882
0883 Worker* worker =
0884 getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0885 if (worker == nullptr) {
0886 std::string pathType("endpath");
0887 if (!search_all(endPathNames, pathName)) {
0888 pathType = std::string("path");
0889 }
0890 throw Exception(errors::Configuration)
0891 << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0892 << "\"\n please check spelling or remove that label from the path.";
0893 }
0894
0895 if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0896
0897
0898 std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0899 if (!search_all(allowed_filters, worker->description()->moduleName())) {
0900
0901 filterAction = WorkerInPath::Ignore;
0902 LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0903 << "' with module label '" << moduleLabel << "' appears on EndPath '"
0904 << pathName << "'.\n"
0905 << "The return value of the filter will be ignored.\n"
0906 << "To suppress this warning, either remove the filter from the endpath,\n"
0907 << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0908 }
0909 }
0910 bool runConcurrently = not doNotRunConcurrently;
0911 if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0912 runConcurrently = false;
0913 }
0914
0915 auto condModules = tryToPlaceConditionalModules(worker,
0916 conditionalmods,
0917 conditionalModsBranches,
0918 conditionalTaskHelper.aliasMap(),
0919 proc_pset,
0920 preg,
0921 prealloc,
0922 processConfiguration);
0923 for (auto condMod : condModules) {
0924 tmpworkers.emplace_back(
0925 condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0926 }
0927
0928 tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0929 ++placeInPath;
0930 }
0931
0932 out.swap(tmpworkers);
0933 }
0934
0935 void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0936 ProductRegistry& preg,
0937 PreallocationConfiguration const* prealloc,
0938 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0939 int bitpos,
0940 std::string const& name,
0941 TrigResPtr trptr,
0942 std::vector<std::string> const& endPathNames,
0943 ConditionalTaskHelper const& conditionalTaskHelper,
0944 std::unordered_set<std::string>& allConditionalModules) {
0945 PathWorkers tmpworkers;
0946 fillWorkers(proc_pset,
0947 preg,
0948 prealloc,
0949 processConfiguration,
0950 name,
0951 false,
0952 tmpworkers,
0953 endPathNames,
0954 conditionalTaskHelper,
0955 allConditionalModules);
0956
0957
0958 if (!tmpworkers.empty()) {
0959 trig_paths_.emplace_back(
0960 bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
0961 } else {
0962 empty_trig_paths_.push_back(bitpos);
0963 }
0964 for (WorkerInPath const& workerInPath : tmpworkers) {
0965 addToAllWorkers(workerInPath.getWorker());
0966 }
0967 }
0968
0969 void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0970 ProductRegistry& preg,
0971 PreallocationConfiguration const* prealloc,
0972 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0973 int bitpos,
0974 std::string const& name,
0975 std::vector<std::string> const& endPathNames,
0976 ConditionalTaskHelper const& conditionalTaskHelper,
0977 std::unordered_set<std::string>& allConditionalModules) {
0978 PathWorkers tmpworkers;
0979 fillWorkers(proc_pset,
0980 preg,
0981 prealloc,
0982 processConfiguration,
0983 name,
0984 true,
0985 tmpworkers,
0986 endPathNames,
0987 conditionalTaskHelper,
0988 allConditionalModules);
0989
0990 if (!tmpworkers.empty()) {
0991 end_paths_.emplace_back(bitpos,
0992 name,
0993 tmpworkers,
0994 TrigResPtr(),
0995 actionTable(),
0996 actReg_,
0997 &streamContext_,
0998 PathContext::PathType::kEndPath);
0999 } else {
1000 empty_end_paths_.push_back(bitpos);
1001 }
1002 for (WorkerInPath const& workerInPath : tmpworkers) {
1003 addToAllWorkers(workerInPath.getWorker());
1004 }
1005 }
1006
1007 void StreamSchedule::beginStream() {
1008 streamContext_.setTransition(StreamContext::Transition::kBeginStream);
1009 streamContext_.setEventID(EventID(0, 0, 0));
1010 streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1011 streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1012 streamContext_.setTimestamp(Timestamp());
1013
1014 std::exception_ptr exceptionInStream;
1015 CMS_SA_ALLOW try {
1016 preScheduleSignal<BeginStreamTraits>(&streamContext_);
1017 workerManagerBeginEnd_.beginStream(streamID_, streamContext_);
1018 } catch (...) {
1019 exceptionInStream = std::current_exception();
1020 }
1021
1022 postScheduleSignal<BeginStreamTraits>(&streamContext_, exceptionInStream);
1023
1024 if (exceptionInStream) {
1025 bool cleaningUpAfterException = false;
1026 handleException(streamContext_, cleaningUpAfterException, exceptionInStream);
1027 }
1028 streamContext_.setTransition(StreamContext::Transition::kInvalid);
1029
1030 if (exceptionInStream) {
1031 std::rethrow_exception(exceptionInStream);
1032 }
1033 }
1034
1035 void StreamSchedule::endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1036 streamContext_.setTransition(StreamContext::Transition::kEndStream);
1037 streamContext_.setEventID(EventID(0, 0, 0));
1038 streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1039 streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1040 streamContext_.setTimestamp(Timestamp());
1041
1042 std::exception_ptr exceptionInStream;
1043 CMS_SA_ALLOW try {
1044 preScheduleSignal<EndStreamTraits>(&streamContext_);
1045 workerManagerBeginEnd_.endStream(streamID_, streamContext_, collector, collectorMutex);
1046 } catch (...) {
1047 exceptionInStream = std::current_exception();
1048 }
1049
1050 postScheduleSignal<EndStreamTraits>(&streamContext_, exceptionInStream);
1051
1052 if (exceptionInStream) {
1053 std::lock_guard<std::mutex> collectorLock(collectorMutex);
1054 collector.call([&exceptionInStream]() { std::rethrow_exception(exceptionInStream); });
1055 }
1056 streamContext_.setTransition(StreamContext::Transition::kInvalid);
1057 }
1058
1059 void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
1060 for (auto const& worker : allWorkersBeginEnd()) {
1061 if (worker->description()->moduleLabel() == iLabel) {
1062 iMod->replaceModuleFor(worker);
1063
1064 streamContext_.setTransition(StreamContext::Transition::kBeginStream);
1065 streamContext_.setEventID(EventID(0, 0, 0));
1066 streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1067 streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1068 streamContext_.setTimestamp(Timestamp());
1069 try {
1070 worker->beginStream(streamID_, streamContext_);
1071 } catch (cms::Exception& ex) {
1072 streamContext_.setTransition(StreamContext::Transition::kInvalid);
1073 ex.addContext("Executing StreamSchedule::replaceModule");
1074 throw;
1075 }
1076 streamContext_.setTransition(StreamContext::Transition::kInvalid);
1077 break;
1078 }
1079 }
1080
1081 for (auto const& worker : allWorkersRuns()) {
1082 if (worker->description()->moduleLabel() == iLabel) {
1083 iMod->replaceModuleFor(worker);
1084 break;
1085 }
1086 }
1087
1088 for (auto const& worker : allWorkersLumisAndEvents()) {
1089 if (worker->description()->moduleLabel() == iLabel) {
1090 iMod->replaceModuleFor(worker);
1091 break;
1092 }
1093 }
1094 }
1095
1096 void StreamSchedule::deleteModule(std::string const& iLabel) {
1097 workerManagerBeginEnd_.deleteModuleIfExists(iLabel);
1098 workerManagerRuns_.deleteModuleIfExists(iLabel);
1099 workerManagerLumisAndEvents_.deleteModuleIfExists(iLabel);
1100 }
1101
1102 std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
1103 std::vector<ModuleDescription const*> result;
1104 result.reserve(allWorkersLumisAndEvents().size());
1105
1106 for (auto const& worker : allWorkersLumisAndEvents()) {
1107 ModuleDescription const* p = worker->description();
1108 result.push_back(p);
1109 }
1110 return result;
1111 }
1112
1113 void StreamSchedule::processOneEventAsync(
1114 WaitingTaskHolder iTask,
1115 EventTransitionInfo& info,
1116 ServiceToken const& serviceToken,
1117 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
1118 EventPrincipal& ep = info.principal();
1119
1120
1121 CMS_SA_ALLOW try {
1122 this->resetAll();
1123
1124 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1125
1126 Traits::setStreamContext(streamContext_, ep);
1127
1128 ServiceRegistry::Operate guard(serviceToken);
1129 Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1130
1131
1132
1133
1134 workerManagerLumisAndEvents_.setupResolvers(ep);
1135 workerManagerLumisAndEvents_.setupOnDemandSystem(info);
1136
1137 HLTPathStatus hltPathStatus(hlt::Pass, 0);
1138 for (int empty_trig_path : empty_trig_paths_) {
1139 results_->at(empty_trig_path) = hltPathStatus;
1140 pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1141 std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1142 ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1143 info, streamID_, ParentContext(&streamContext_), &streamContext_);
1144 if (except) {
1145 iTask.doneWaiting(except);
1146 return;
1147 }
1148 }
1149 if (not endPathStatusInserterWorkers_.empty()) {
1150 for (int empty_end_path : empty_end_paths_) {
1151 std::exception_ptr except =
1152 endPathStatusInserterWorkers_[empty_end_path]
1153 ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1154 info, streamID_, ParentContext(&streamContext_), &streamContext_);
1155 if (except) {
1156 iTask.doneWaiting(except);
1157 return;
1158 }
1159 }
1160 }
1161
1162 ++total_events_;
1163
1164
1165 auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1166 auto pathErrorPtr = pathErrorHolder.get();
1167 ServiceWeakToken weakToken = serviceToken;
1168 auto allPathsDone = make_waiting_task(
1169 [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1170 ServiceRegistry::Operate operate(weakToken.lock());
1171
1172 std::exception_ptr ptr;
1173 if (pathError->load()) {
1174 ptr = *pathError->load();
1175 delete pathError->load();
1176 }
1177 if ((not ptr) and iPtr) {
1178 ptr = *iPtr;
1179 }
1180 iTask.doneWaiting(finishProcessOneEvent(ptr));
1181 });
1182
1183
1184
1185 WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1186
1187 auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1188 std::exception_ptr const* iPtr) mutable {
1189 ServiceRegistry::Operate operate(weakToken.lock());
1190
1191 if (iPtr) {
1192
1193
1194 auto currentPtr = pathErrorPtr->exchange(new std::exception_ptr(*iPtr));
1195 assert(currentPtr == nullptr);
1196 }
1197 finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1198 });
1199
1200
1201
1202
1203 WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1204
1205
1206 WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1207 for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1208 it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1209 }
1210
1211 for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1212 it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1213 }
1214
1215 ParentContext parentContext(&streamContext_);
1216 workerManagerLumisAndEvents_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1217 hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1218 } catch (...) {
1219 iTask.doneWaiting(std::current_exception());
1220 }
1221 }
1222
1223 void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1224 WaitingTaskHolder iWait,
1225 EventTransitionInfo& info) {
1226 if (iExcept) {
1227
1228 CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1229 exception_actions::ActionCodes action = actionTable().find(e.category());
1230 assert(action != exception_actions::IgnoreCompletely);
1231 if (action == exception_actions::TryToContinue) {
1232 edm::printCmsExceptionWarning("TryToContinue", e);
1233 *(iExcept.load()) = std::exception_ptr();
1234 } else {
1235 *(iExcept.load()) = std::current_exception();
1236 }
1237 } catch (...) {
1238 *(iExcept.load()) = std::current_exception();
1239 }
1240 }
1241
1242 if ((not iExcept) and results_->accept()) {
1243 ++total_passed_;
1244 }
1245
1246 if (nullptr != results_inserter_.get()) {
1247
1248 CMS_SA_ALLOW try {
1249
1250
1251 ParentContext parentContext(&streamContext_);
1252 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1253
1254 auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1255 if (expt) {
1256 std::rethrow_exception(expt);
1257 }
1258 } catch (cms::Exception& ex) {
1259 if (not iExcept) {
1260 if (ex.context().empty()) {
1261 std::ostringstream ost;
1262 ost << "Processing Event " << info.principal().id();
1263 ex.addContext(ost.str());
1264 }
1265 iExcept.store(new std::exception_ptr(std::current_exception()));
1266 }
1267 } catch (...) {
1268 if (not iExcept) {
1269 iExcept.store(new std::exception_ptr(std::current_exception()));
1270 }
1271 }
1272 }
1273 std::exception_ptr ptr;
1274 if (iExcept) {
1275 ptr = *iExcept.load();
1276 }
1277 iWait.doneWaiting(ptr);
1278 }
1279
1280 std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1281 using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1282
1283 if (iExcept) {
1284
1285 try {
1286 convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1287 } catch (cms::Exception& ex) {
1288 bool const cleaningUpAfterException = false;
1289 if (ex.context().empty()) {
1290 addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1291 } else {
1292 addContextAndPrintException("", ex, cleaningUpAfterException);
1293 }
1294 iExcept = std::current_exception();
1295 }
1296
1297 actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1298 }
1299
1300 CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1301 if (not iExcept) {
1302 iExcept = std::current_exception();
1303 }
1304 }
1305 if (not iExcept) {
1306 resetEarlyDelete();
1307 }
1308
1309 return iExcept;
1310 }
1311
1312 void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1313 oLabelsToFill.reserve(trig_paths_.size());
1314 std::transform(trig_paths_.begin(),
1315 trig_paths_.end(),
1316 std::back_inserter(oLabelsToFill),
1317 std::bind(&Path::name, std::placeholders::_1));
1318 }
1319
1320 void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1321 TrigPaths::const_iterator itFound = std::find_if(
1322 trig_paths_.begin(),
1323 trig_paths_.end(),
1324 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1325 if (itFound != trig_paths_.end()) {
1326 oLabelsToFill.reserve(itFound->size());
1327 for (size_t i = 0; i < itFound->size(); ++i) {
1328 oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1329 }
1330 }
1331 }
1332
1333 void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1334 std::vector<ModuleDescription const*>& descriptions,
1335 unsigned int hint) const {
1336 descriptions.clear();
1337 bool found = false;
1338 TrigPaths::const_iterator itFound;
1339
1340 if (hint < trig_paths_.size()) {
1341 itFound = trig_paths_.begin() + hint;
1342 if (itFound->name() == iPathLabel)
1343 found = true;
1344 }
1345 if (!found) {
1346
1347 itFound = std::find_if(
1348 trig_paths_.begin(),
1349 trig_paths_.end(),
1350 std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1351 if (itFound != trig_paths_.end())
1352 found = true;
1353 }
1354 if (found) {
1355 descriptions.reserve(itFound->size());
1356 for (size_t i = 0; i < itFound->size(); ++i) {
1357 descriptions.push_back(itFound->getWorker(i)->description());
1358 }
1359 }
1360 }
1361
1362 void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1363 std::vector<ModuleDescription const*>& descriptions,
1364 unsigned int hint) const {
1365 descriptions.clear();
1366 bool found = false;
1367 TrigPaths::const_iterator itFound;
1368
1369 if (hint < end_paths_.size()) {
1370 itFound = end_paths_.begin() + hint;
1371 if (itFound->name() == iEndPathLabel)
1372 found = true;
1373 }
1374 if (!found) {
1375
1376 itFound = std::find_if(
1377 end_paths_.begin(),
1378 end_paths_.end(),
1379 std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1380 if (itFound != end_paths_.end())
1381 found = true;
1382 }
1383 if (found) {
1384 descriptions.reserve(itFound->size());
1385 for (size_t i = 0; i < itFound->size(); ++i) {
1386 descriptions.push_back(itFound->getWorker(i)->description());
1387 }
1388 }
1389 }
1390
1391 static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1392 sum.timesVisited += path.timesVisited(which);
1393 sum.timesPassed += path.timesPassed(which);
1394 sum.timesFailed += path.timesFailed(which);
1395 sum.timesExcept += path.timesExcept(which);
1396 sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1397 sum.bitPosition = path.bitPosition(which);
1398 }
1399
1400 static void fillPathSummary(Path const& path, PathSummary& sum) {
1401 sum.name = path.name();
1402 sum.bitPosition = path.bitPosition();
1403 sum.timesRun += path.timesRun();
1404 sum.timesPassed += path.timesPassed();
1405 sum.timesFailed += path.timesFailed();
1406 sum.timesExcept += path.timesExcept();
1407
1408 Path::size_type sz = path.size();
1409 if (sum.moduleInPathSummaries.empty()) {
1410 std::vector<ModuleInPathSummary> temp(sz);
1411 for (size_t i = 0; i != sz; ++i) {
1412 fillModuleInPathSummary(path, i, temp[i]);
1413 }
1414 sum.moduleInPathSummaries.swap(temp);
1415 } else {
1416 assert(sz == sum.moduleInPathSummaries.size());
1417 for (size_t i = 0; i != sz; ++i) {
1418 fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1419 }
1420 }
1421 }
1422
1423 static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1424 sum.timesVisited += w.timesVisited();
1425 sum.timesRun += w.timesRun();
1426 sum.timesPassed += w.timesPassed();
1427 sum.timesFailed += w.timesFailed();
1428 sum.timesExcept += w.timesExcept();
1429 sum.moduleLabel = w.description()->moduleLabel();
1430 }
1431
1432 static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1433
1434 void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1435 rep.eventSummary.totalEvents += totalEvents();
1436 rep.eventSummary.totalEventsPassed += totalEventsPassed();
1437 rep.eventSummary.totalEventsFailed += totalEventsFailed();
1438
1439 fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1440 fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1441 fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1442 }
1443
1444 void StreamSchedule::clearCounters() {
1445 using std::placeholders::_1;
1446 total_events_ = total_passed_ = 0;
1447 for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1448 for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1449 for_all(allWorkersLumisAndEvents(), std::bind(&Worker::clearCounters, _1));
1450 }
1451
1452 void StreamSchedule::resetAll() { results_->reset(); }
1453
1454 void StreamSchedule::addToAllWorkers(Worker* w) { workerManagerLumisAndEvents_.addToAllWorkers(w); }
1455
1456 void StreamSchedule::resetEarlyDelete() {
1457
1458 for (auto& count : earlyDeleteBranchToCount_) {
1459 count.count = 0;
1460 }
1461
1462 for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1463 ++(earlyDeleteBranchToCount_[index].count);
1464 }
1465 for (auto& helper : earlyDeleteHelpers_) {
1466 helper.reset();
1467 }
1468 }
1469
1470 void StreamSchedule::makePathStatusInserters(
1471 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1472 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1473 ExceptionToActionTable const& actions) {
1474 int bitpos = 0;
1475 unsigned int indexEmpty = 0;
1476 unsigned int indexOfPath = 0;
1477 for (auto& pathStatusInserter : pathStatusInserters) {
1478 std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1479 WorkerPtr workerPtr(
1480 new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1481 pathStatusInserterWorkers_.emplace_back(workerPtr);
1482 workerPtr->setActivityRegistry(actReg_);
1483 addToAllWorkers(workerPtr.get());
1484
1485
1486
1487
1488 if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1489 ++indexEmpty;
1490 } else {
1491 trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1492 ++indexOfPath;
1493 }
1494 ++bitpos;
1495 }
1496
1497 bitpos = 0;
1498 indexEmpty = 0;
1499 indexOfPath = 0;
1500 for (auto& endPathStatusInserter : endPathStatusInserters) {
1501 std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1502 WorkerPtr workerPtr(
1503 new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1504 endPathStatusInserterWorkers_.emplace_back(workerPtr);
1505 workerPtr->setActivityRegistry(actReg_);
1506 addToAllWorkers(workerPtr.get());
1507
1508
1509
1510
1511 if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1512 ++indexEmpty;
1513 } else {
1514 end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1515 ++indexOfPath;
1516 }
1517 ++bitpos;
1518 }
1519 }
1520
1521 void StreamSchedule::handleException(StreamContext const& streamContext,
1522 bool cleaningUpAfterException,
1523 std::exception_ptr& excpt) const noexcept {
1524
1525 try {
1526 convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1527 } catch (cms::Exception& ex) {
1528 std::ostringstream ost;
1529
1530
1531 if (ex.context().empty()) {
1532 exceptionContext(ost, streamContext);
1533 }
1534 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1535 excpt = std::current_exception();
1536 }
1537
1538
1539 CMS_SA_ALLOW try {
1540 actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1541 } catch (...) {
1542 }
1543 }
1544 }