Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-29 06:08:21

0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002 
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/EventID.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0008 #include "DataFormats/Provenance/interface/Timestamp.h"
0009 #include "FWCore/Framework/src/OutputModuleDescription.h"
0010 #include "FWCore/Framework/interface/TriggerNamesService.h"
0011 #include "FWCore/Framework/src/TriggerReport.h"
0012 #include "FWCore/Framework/src/TriggerTimingReport.h"
0013 #include "FWCore/Framework/src/Factory.h"
0014 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0015 #include "FWCore/Framework/src/TriggerResultInserter.h"
0016 #include "FWCore/Framework/src/PathStatusInserter.h"
0017 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0018 #include "FWCore/Framework/interface/WorkerInPath.h"
0019 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0020 #include "FWCore/Framework/interface/maker/WorkerT.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0024 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0025 #include "FWCore/ParameterSet/interface/Registry.h"
0026 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0027 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0028 #include "FWCore/Utilities/interface/Algorithms.h"
0029 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0030 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0031 #include "FWCore/Utilities/interface/RunIndex.h"
0032 
0033 #include "LuminosityBlockProcessingStatus.h"
0034 #include "processEDAliases.h"
0035 
0036 #include <algorithm>
0037 #include <cassert>
0038 #include <cstdlib>
0039 #include <functional>
0040 #include <iomanip>
0041 #include <limits>
0042 #include <list>
0043 #include <map>
0044 #include <fmt/format.h>
0045 
0046 namespace edm {
0047 
0048   namespace {
0049 
// Walks the input range [begin, end) and the output range that starts
// at 'out' in lock step, calling func(input, output) for every pair.
// 'func' receives the input element and a reference to the output
// element it is expected to fill. The output range must be at least
// as long as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
0059 
// Fills the sequence 'to' from the sequence 'from' by handing both to
// transform_into together with the callable 'func'.
// When the two sizes differ, the values are first computed into a
// temporary of the right size and swapped into 'to' afterwards, so
// 'to' is replaced only if every call succeeded. When the sizes
// already agree, 'to' is written in place.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }

  TO resized(from.size());
  transform_into(from.begin(), from.end(), resized.begin(), func);
  to.swap(resized);
}
0075 
0076     class BeginStreamTraits {
0077     public:
0078       static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0079         activityRegistry->preBeginStreamSignal_(*streamContext);
0080       }
0081       static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0082         activityRegistry->postBeginStreamSignal_(*streamContext);
0083       }
0084     };
0085 
0086     class EndStreamTraits {
0087     public:
0088       static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0089         activityRegistry->preEndStreamSignal_(*streamContext);
0090       }
0091       static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0092         activityRegistry->postEndStreamSignal_(*streamContext);
0093       }
0094     };
0095 
0096     // -----------------------------
0097 
0098     // Here we make the trigger results inserter directly.  This should
0099     // probably be a utility in the WorkerRegistry or elsewhere.
0100 
0101     StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
0102                                            std::shared_ptr<ActivityRegistry> areg,
0103                                            std::shared_ptr<TriggerResultInserter> inserter) {
0104       StreamSchedule::WorkerPtr ptr(
0105           new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
0106       ptr->setActivityRegistry(areg);
0107       return ptr;
0108     }
0109 
0110     void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0111                                          ProductRegistry const& preg,
0112                                          std::multimap<std::string, Worker*>& branchToReadingWorker) {
0113       auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0114       // Remove any duplicates
0115       std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0116       vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0117                                    vBranchesToDeleteEarly.end());
0118 
0119       // Are the requested items in the product registry?
0120       auto allBranchNames = preg.allBranchNames();
0121       //the branch names all end with a period, which we do not want to compare with
0122       for (auto& b : allBranchNames) {
0123         b.resize(b.size() - 1);
0124       }
0125       std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0126       std::vector<std::string> temp;
0127       temp.reserve(vBranchesToDeleteEarly.size());
0128 
0129       std::set_intersection(vBranchesToDeleteEarly.begin(),
0130                             vBranchesToDeleteEarly.end(),
0131                             allBranchNames.begin(),
0132                             allBranchNames.end(),
0133                             std::back_inserter(temp));
0134       vBranchesToDeleteEarly.swap(temp);
0135       if (temp.size() != vBranchesToDeleteEarly.size()) {
0136         std::vector<std::string> missingProducts;
0137         std::set_difference(temp.begin(),
0138                             temp.end(),
0139                             vBranchesToDeleteEarly.begin(),
0140                             vBranchesToDeleteEarly.end(),
0141                             std::back_inserter(missingProducts));
0142         LogInfo l("MissingProductsForCanDeleteEarly");
0143         l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0144         for (auto const& n : missingProducts) {
0145           l << "\n " << n;
0146         }
0147       }
0148       //set placeholder for the branch, we will remove the nullptr if a
0149       // module actually wants the branch.
0150       for (auto const& branch : vBranchesToDeleteEarly) {
0151         branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0152       }
0153     }
0154 
0155     Worker* getWorker(std::string const& moduleLabel,
0156                       ParameterSet& proc_pset,
0157                       WorkerManager& workerManager,
0158                       ProductRegistry& preg,
0159                       PreallocationConfiguration const* prealloc,
0160                       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0161       bool isTracked;
0162       ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0163       if (modpset == nullptr) {
0164         return nullptr;
0165       }
0166       assert(isTracked);
0167 
0168       return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
0169     }
0170 
// Locates the ConditionalTask modules inside a container of module
// names. The ConditionalTask section starts right after the special
// marker '#' and is terminated by the special marker '@', which is
// the last element of the container.
// Returns a std::pair of iterators delimiting the ConditionalTask
// modules, with both markers excluded. If there is no '#' marker,
// both iterators of the pair are the end iterator.
template <typename T>
auto findConditionalTaskModulesRange(T& modnames) {
  auto const last = modnames.end();
  auto const marker = std::find(modnames.begin(), last, "#");
  if (marker != last) {
    return std::pair(std::next(marker), std::prev(last));
  }
  return std::pair(last, last);
}
0185 
0186     std::optional<std::string> findBestMatchingAlias(
0187         std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0188         std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0189         std::string const& productModuleLabel,
0190         ConsumesInfo const& consumesInfo) {
0191       std::optional<std::string> best;
0192       int wildcardsInBest = std::numeric_limits<int>::max();
0193       bool bestIsAmbiguous = false;
0194 
0195       auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0196                             std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0197         int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0198         if (wildcards == 0) {
0199           bestIsAmbiguous = false;
0200           return true;
0201         }
0202         if (not best or wildcards < wildcardsInBest) {
0203           best = label;
0204           wildcardsInBest = wildcards;
0205           bestIsAmbiguous = false;
0206         } else if (best and *best != label and wildcardsInBest == wildcards) {
0207           bestIsAmbiguous = true;
0208         }
0209         return false;
0210       };
0211 
0212       auto findAlias = aliasMap.equal_range(productModuleLabel);
0213       for (auto it = findAlias.first; it != findAlias.second; ++it) {
0214         std::string const& aliasInstanceLabel =
0215             it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0216         bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0217         if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0218           bool const typeIsWildcard = it->second.friendlyClassName == "*";
0219           if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0220             if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0221               return it->second.originalModuleLabel;
0222             }
0223           } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0224             //consume is a View so need to do more intrusive search
0225             //find matching branches in module
0226             auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0227             for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0228               if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0229                 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0230                                                                    TypeID(itBranch->second->wrappedType().typeInfo()),
0231                                                                    itBranch->second->className())) {
0232                   if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0233                     return it->second.originalModuleLabel;
0234                   }
0235                 }
0236               }
0237             }
0238           }
0239         }
0240       }
0241       if (bestIsAmbiguous) {
0242         throw Exception(errors::UnimplementedFeature)
0243             << "Encountered ambiguity when trying to find a best-matching alias for\n"
0244             << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0245             << " module label " << productModuleLabel << "\n"
0246             << " product instance name " << consumesInfo.instance() << "\n"
0247             << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0248                "wildcards ("
0249             << wildcardsInBest << ")";
0250       }
0251       return best;
0252     }
0253   }  // namespace
0254 
0255   // -----------------------------
0256 
// Shorthand for a list of strings (e.g. the module names of a path).
using vstring = std::vector<std::string>;
0258 
0259   // -----------------------------
0260 
0261   class ConditionalTaskHelper {
0262   public:
0263     using AliasInfo = StreamSchedule::AliasInfo;
0264 
0265     ConditionalTaskHelper(ParameterSet& proc_pset,
0266                           ProductRegistry& preg,
0267                           PreallocationConfiguration const* prealloc,
0268                           std::shared_ptr<ProcessConfiguration const> processConfiguration,
0269                           WorkerManager& workerManagerLumisAndEvents,
0270                           std::vector<std::string> const& trigPathNames) {
0271       std::unordered_set<std::string> allConditionalMods;
0272       for (auto const& pathName : trigPathNames) {
0273         auto const modnames = proc_pset.getParameter<vstring>(pathName);
0274 
0275         //Pull out ConditionalTask modules
0276         auto condRange = findConditionalTaskModulesRange(modnames);
0277         if (condRange.first == condRange.second)
0278           continue;
0279 
0280         //the last entry should be ignored since it is required to be "@"
0281         allConditionalMods.insert(condRange.first, condRange.second);
0282       }
0283 
0284       for (auto const& cond : allConditionalMods) {
0285         //force the creation of the conditional modules so alias check can work
0286         (void)getWorker(cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration);
0287       }
0288 
0289       fillAliasMap(proc_pset, allConditionalMods);
0290       processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0291 
0292       //find branches created by the conditional modules
0293       for (auto const& prod : preg.productList()) {
0294         if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0295           conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0296         }
0297       }
0298     }
0299 
0300     std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0301 
0302     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
0303         std::unordered_set<std::string> const& conditionalmods) const {
0304       std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
0305       for (auto const& mod : conditionalmods) {
0306         auto range = conditionalModsBranches_.equal_range(mod);
0307         ret.insert(range.first, range.second);
0308       }
0309       return ret;
0310     }
0311 
0312   private:
0313     void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0314       auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0315       std::string const star("*");
0316       for (auto const& alias : aliases) {
0317         auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0318         auto aliasedToModuleLabels = info.getParameterNames();
0319         for (auto const& mod : aliasedToModuleLabels) {
0320           if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0321             auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0322             for (auto const& aliasPSet : aliasVPSet) {
0323               std::string type = star;
0324               std::string instance = star;
0325               std::string originalInstance = star;
0326               if (aliasPSet.exists("type")) {
0327                 type = aliasPSet.getParameter<std::string>("type");
0328               }
0329               if (aliasPSet.exists("toProductInstance")) {
0330                 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0331               }
0332               if (aliasPSet.exists("fromProductInstance")) {
0333                 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0334               }
0335 
0336               aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0337             }
0338           }
0339         }
0340       }
0341     }
0342 
0343     void processSwitchEDAliases(ParameterSet const& proc_pset,
0344                                 ProductRegistry& preg,
0345                                 ProcessConfiguration const& processConfiguration,
0346                                 std::unordered_set<std::string> const& allConditionalMods) {
0347       auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0348       std::vector<std::string> switchEDAliases;
0349       for (auto const& module : all_modules) {
0350         auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0351         if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0352           auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0353           for (auto const& case_label : all_cases) {
0354             auto range = aliasMap_.equal_range(case_label);
0355             if (range.first != range.second) {
0356               switchEDAliases.push_back(case_label);
0357             }
0358           }
0359         }
0360       }
0361       detail::processEDAliases(
0362           switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0363     }
0364 
0365     std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0366     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
0367   };
0368 
0369   // -----------------------------
0370 
0371   StreamSchedule::StreamSchedule(
0372       std::shared_ptr<TriggerResultInserter> inserter,
0373       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0374       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0375       std::shared_ptr<ModuleRegistry> modReg,
0376       ParameterSet& proc_pset,
0377       service::TriggerNamesService const& tns,
0378       PreallocationConfiguration const& prealloc,
0379       ProductRegistry& preg,
0380       ExceptionToActionTable const& actions,
0381       std::shared_ptr<ActivityRegistry> areg,
0382       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0383       StreamID streamID,
0384       ProcessContext const* processContext)
0385       : workerManagerBeginEnd_(modReg, areg, actions),
0386         workerManagerRuns_(modReg, areg, actions),
0387         workerManagerLumisAndEvents_(modReg, areg, actions),
0388         actReg_(areg),
0389         results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0390         results_inserter_(),
0391         trig_paths_(),
0392         end_paths_(),
0393         total_events_(),
0394         total_passed_(),
0395         number_of_unscheduled_modules_(0),
0396         streamID_(streamID),
0397         streamContext_(streamID_, processContext) {
0398     bool hasPath = false;
0399     std::vector<std::string> const& pathNames = tns.getTrigPaths();
0400     std::vector<std::string> const& endPathNames = tns.getEndPaths();
0401 
0402     ConditionalTaskHelper conditionalTaskHelper(
0403         proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
0404     std::unordered_set<std::string> conditionalModules;
0405 
0406     int trig_bitpos = 0;
0407     trig_paths_.reserve(pathNames.size());
0408     for (auto const& trig_name : pathNames) {
0409       fillTrigPath(proc_pset,
0410                    preg,
0411                    &prealloc,
0412                    processConfiguration,
0413                    trig_bitpos,
0414                    trig_name,
0415                    results(),
0416                    endPathNames,
0417                    conditionalTaskHelper,
0418                    conditionalModules);
0419       ++trig_bitpos;
0420       hasPath = true;
0421     }
0422 
0423     if (hasPath) {
0424       // the results inserter stands alone
0425       inserter->setTrigResultForStream(streamID.value(), results());
0426 
0427       results_inserter_ = makeInserter(actions, actReg_, inserter);
0428       addToAllWorkers(results_inserter_.get());
0429     }
0430 
0431     // fill normal endpaths
0432     int bitpos = 0;
0433     end_paths_.reserve(endPathNames.size());
0434     for (auto const& end_path_name : endPathNames) {
0435       fillEndPath(proc_pset,
0436                   preg,
0437                   &prealloc,
0438                   processConfiguration,
0439                   bitpos,
0440                   end_path_name,
0441                   endPathNames,
0442                   conditionalTaskHelper,
0443                   conditionalModules);
0444       ++bitpos;
0445     }
0446 
0447     makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0448 
0449     //See if all modules were used
0450     std::set<std::string> usedWorkerLabels;
0451     for (auto const& worker : allWorkersLumisAndEvents()) {
0452       usedWorkerLabels.insert(worker->description()->moduleLabel());
0453     }
0454     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0455     std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0456     std::vector<std::string> unusedLabels;
0457     set_difference(modulesInConfigSet.begin(),
0458                    modulesInConfigSet.end(),
0459                    usedWorkerLabels.begin(),
0460                    usedWorkerLabels.end(),
0461                    back_inserter(unusedLabels));
0462     std::set<std::string> unscheduledLabels;
0463     std::vector<std::string> shouldBeUsedLabels;
0464     if (!unusedLabels.empty()) {
0465       //Need to
0466       // 1) create worker
0467       // 2) if it is a WorkerT<EDProducer>, add it to our list
0468       // 3) hand list to our delayed reader
0469       for (auto const& label : unusedLabels) {
0470         bool isTracked;
0471         ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0472         assert(isTracked);
0473         assert(modulePSet != nullptr);
0474         workerManagerLumisAndEvents_.addToUnscheduledWorkers(
0475             *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0476       }
0477       if (!shouldBeUsedLabels.empty()) {
0478         std::ostringstream unusedStream;
0479         unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0480         for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0481                                                 itLabelEnd = shouldBeUsedLabels.end();
0482              itLabel != itLabelEnd;
0483              ++itLabel) {
0484           unusedStream << ",'" << *itLabel << "'";
0485         }
0486         LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0487       }
0488     }
0489     number_of_unscheduled_modules_ = unscheduledLabels.size();
0490 
0491     // Print conditional modules that were not consumed in any of their associated Paths
0492     if (streamID.value() == 0 and not conditionalModules.empty()) {
0493       // Intersection of unscheduled and ConditionalTask modules gives
0494       // directly the set of conditional modules that were not
0495       // consumed by anything in the Paths associated to the
0496       // corresponding ConditionalTask.
0497       std::vector<std::string_view> labelsToPrint;
0498       std::copy_if(
0499           unscheduledLabels.begin(),
0500           unscheduledLabels.end(),
0501           std::back_inserter(labelsToPrint),
0502           [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
0503 
0504       if (not labelsToPrint.empty()) {
0505         edm::LogWarning log("NonConsumedConditionalModules");
0506         log << "The following modules were part of some ConditionalTask, but were not\n"
0507             << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
0508             << "was associated. Perhaps they should be either removed from the\n"
0509             << "job, or moved to a Task to make it explicit they are unscheduled.\n";
0510         for (auto const& modLabel : labelsToPrint) {
0511           log.format("\n {}", modLabel);
0512         }
0513       }
0514     }
0515 
0516     for (auto const& worker : allWorkersLumisAndEvents()) {
0517       std::string const& moduleLabel = worker->description()->moduleLabel();
0518 
0519       // The new worker pointers will be null for the TriggerResultsInserter, PathStatusInserter, and
0520       // EndPathStatusInserter because there are no ParameterSets for those in the configuration.
0521       // We could add special code to create workers for those, but instead we skip them because they
0522       // do not have beginStream, endStream, or run/lumi begin/end stream transition functions.
0523 
0524       Worker* workerBeginEnd =
0525           getWorker(moduleLabel, proc_pset, workerManagerBeginEnd_, preg, &prealloc, processConfiguration);
0526       if (workerBeginEnd) {
0527         workerManagerBeginEnd_.addToAllWorkers(workerBeginEnd);
0528       }
0529 
0530       Worker* workerRuns = getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
0531       if (workerRuns) {
0532         workerManagerRuns_.addToAllWorkers(workerRuns);
0533       }
0534     }
0535 
0536   }  // StreamSchedule::StreamSchedule
0537 
0538   void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0539                                              std::vector<std::string> const& branchesToDeleteEarly,
0540                                              std::multimap<std::string, std::string> const& referencesToBranches,
0541                                              std::vector<std::string> const& modulesToSkip,
0542                                              edm::ProductRegistry const& preg) {
0543     // setup the list with those products actually registered for this job
0544     std::multimap<std::string, Worker*> branchToReadingWorker;
0545     initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0546 
0547     const std::vector<std::string> kEmpty;
0548     std::map<Worker*, unsigned int> reserveSizeForWorker;
0549     unsigned int upperLimitOnReadingWorker = 0;
0550     unsigned int upperLimitOnIndicies = 0;
0551     unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0552 
0553     //talk with output modules first
0554     modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0555       auto comm = iHolder->createOutputModuleCommunicator();
0556       if (comm) {
0557         if (!branchToReadingWorker.empty()) {
0558           //If an OutputModule needs a product, we can't delete it early
0559           // so we should remove it from our list
0560           SelectedProductsForBranchType const& kept = comm->keptProducts();
0561           for (auto const& item : kept[InEvent]) {
0562             BranchDescription const& desc = *item.first;
0563             auto found = branchToReadingWorker.equal_range(desc.branchName());
0564             if (found.first != found.second) {
0565               --nUniqueBranchesToDelete;
0566               branchToReadingWorker.erase(found.first, found.second);
0567             }
0568           }
0569         }
0570       }
0571     });
0572 
0573     if (branchToReadingWorker.empty()) {
0574       return;
0575     }
0576 
0577     std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0578     for (auto w : allWorkersLumisAndEvents()) {
0579       if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0580         continue;
0581       }
0582       //determine if this module could read a branch we want to delete early
0583       auto consumes = w->consumesInfo();
0584       if (not consumes.empty()) {
0585         bool foundAtLeastOneMatchingBranch = false;
0586         for (auto const& product : consumes) {
0587           std::string branch = fmt::format("{}_{}_{}_{}",
0588                                            product.type().friendlyClassName(),
0589                                            product.label().data(),
0590                                            product.instance().data(),
0591                                            product.process().data());
0592           {
0593             //Handle case where worker directly consumes product
0594             auto found = branchToReadingWorker.end();
0595             if (product.process().empty()) {
0596               auto startFound = branchToReadingWorker.lower_bound(branch);
0597               if (startFound != branchToReadingWorker.end()) {
0598                 if (startFound->first.substr(0, branch.size()) == branch) {
0599                   //match all processNames here, even if it means multiple matches will happen
0600                   found = startFound;
0601                 }
0602               }
0603             } else {
0604               auto exactFound = branchToReadingWorker.equal_range(branch);
0605               if (exactFound.first != exactFound.second) {
0606                 found = exactFound.first;
0607               }
0608             }
0609             if (found != branchToReadingWorker.end()) {
0610               if (not foundAtLeastOneMatchingBranch) {
0611                 ++upperLimitOnReadingWorker;
0612                 foundAtLeastOneMatchingBranch = true;
0613               }
0614               ++upperLimitOnIndicies;
0615               ++reserveSizeForWorker[w];
0616               if (nullptr == found->second) {
0617                 found->second = w;
0618               } else {
0619                 branchToReadingWorker.insert(make_pair(found->first, w));
0620               }
0621             }
0622           }
0623           {
0624             //Handle case where indirectly consumes product
0625             auto found = referencesToBranches.end();
0626             if (product.process().empty()) {
0627               auto startFound = referencesToBranches.lower_bound(branch);
0628               if (startFound != referencesToBranches.end()) {
0629                 if (startFound->first.substr(0, branch.size()) == branch) {
0630                   //match all processNames here, even if it means multiple matches will happen
0631                   found = startFound;
0632                 }
0633               }
0634             } else {
0635               //can match exactly
0636               auto exactFound = referencesToBranches.equal_range(branch);
0637               if (exactFound.first != exactFound.second) {
0638                 found = exactFound.first;
0639               }
0640             }
0641             if (found != referencesToBranches.end()) {
0642               for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0643                 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0644                 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0645                   continue;
0646                 }
0647                 if (not foundAtLeastOneMatchingBranch) {
0648                   ++upperLimitOnReadingWorker;
0649                   foundAtLeastOneMatchingBranch = true;
0650                 }
0651                 ++upperLimitOnIndicies;
0652                 ++reserveSizeForWorker[w];
0653                 if (nullptr == foundInBranchToReadingWorker->second) {
0654                   foundInBranchToReadingWorker->second = w;
0655                 } else {
0656                   branchToReadingWorker.insert(make_pair(itr->second, w));
0657                 }
0658               }
0659             }
0660           }
0661         }
0662       }
0663     }
0664     {
0665       auto it = branchToReadingWorker.begin();
0666       std::vector<std::string> unusedBranches;
0667       while (it != branchToReadingWorker.end()) {
0668         if (it->second == nullptr) {
0669           unusedBranches.push_back(it->first);
0670           //erasing the object invalidates the iterator so must advance it first
0671           auto temp = it;
0672           ++it;
0673           branchToReadingWorker.erase(temp);
0674         } else {
0675           ++it;
0676         }
0677       }
0678       if (not unusedBranches.empty()) {
0679         LogWarning l("UnusedProductsForCanDeleteEarly");
0680         l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0681              " If possible, remove the producer from the job.";
0682         for (auto const& n : unusedBranches) {
0683           l << "\n " << n;
0684         }
0685       }
0686     }
0687     if (!branchToReadingWorker.empty()) {
0688       earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0689       earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0690       earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0691       std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0692       std::string lastBranchName;
0693       size_t nextOpenIndex = 0;
0694       unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0695       for (auto& branchAndWorker : branchToReadingWorker) {
0696         if (lastBranchName != branchAndWorker.first) {
0697           //have to put back the period we removed earlier in order to get the proper name
0698           BranchID bid(branchAndWorker.first + ".");
0699           earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0700           lastBranchName = branchAndWorker.first;
0701         }
0702         auto found = alreadySeenWorkers.find(branchAndWorker.second);
0703         if (alreadySeenWorkers.end() == found) {
0704           //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
0705           // all the branches that might be read by this worker. However, initially we will only tell the
0706           // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
0707           // EarlyDeleteHelper will automatically advance its internal end pointer.
0708           size_t index = nextOpenIndex;
0709           size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0710           assert(index < earlyDeleteHelperToBranchIndicies_.size());
0711           earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0712           earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0713           branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0714           alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0715           nextOpenIndex += nIndices;
0716         } else {
0717           found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0718         }
0719       }
0720 
0721       //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
0722       // space needed for each module
0723       auto itLast = earlyDeleteHelpers_.begin();
0724       for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0725         if (itLast->end() != it->begin()) {
0726           //figure the offset for next Worker since it hasn't been moved yet so it has the original address
0727           unsigned int delta = it->begin() - itLast->end();
0728           it->shiftIndexPointers(delta);
0729 
0730           earlyDeleteHelperToBranchIndicies_.erase(
0731               earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0732               earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0733         }
0734         itLast = it;
0735       }
0736       earlyDeleteHelperToBranchIndicies_.erase(
0737           earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0738           earlyDeleteHelperToBranchIndicies_.end());
0739 
0740       //now tell the paths about the deleters
0741       for (auto& p : trig_paths_) {
0742         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0743       }
0744       for (auto& p : end_paths_) {
0745         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0746       }
0747       resetEarlyDelete();
0748     }
0749   }
0750 
0751   std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0752       Worker* worker,
0753       std::unordered_set<std::string>& conditionalModules,
0754       std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0755       std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0756       ParameterSet& proc_pset,
0757       ProductRegistry& preg,
0758       PreallocationConfiguration const* prealloc,
0759       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0760     std::vector<Worker*> returnValue;
0761     auto const& consumesInfo = worker->consumesInfo();
0762     auto moduleLabel = worker->description()->moduleLabel();
0763     using namespace productholderindexhelper;
0764     for (auto const& ci : consumesInfo) {
0765       if (not ci.skipCurrentProcess() and
0766           (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0767         auto productModuleLabel = std::string(ci.label());
0768         bool productFromConditionalModule = false;
0769         auto itFound = conditionalModules.find(productModuleLabel);
0770         if (itFound == conditionalModules.end()) {
0771           //Check to see if this was an alias
0772           //note that aliasMap was previously filtered so only the conditional modules remain there
0773           auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0774           if (foundAlias) {
0775             productModuleLabel = *foundAlias;
0776             productFromConditionalModule = true;
0777             itFound = conditionalModules.find(productModuleLabel);
0778             //check that the alias-for conditional module has not been used
0779             if (itFound == conditionalModules.end()) {
0780               continue;
0781             }
0782           }
0783         } else {
0784           //need to check the rest of the data product info
0785           auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0786           for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0787             if (itBranch->second->productInstanceName() == ci.instance()) {
0788               if (ci.kindOfType() == PRODUCT_TYPE) {
0789                 if (ci.type() == itBranch->second->unwrappedTypeID()) {
0790                   productFromConditionalModule = true;
0791                   break;
0792                 }
0793               } else {
0794                 //this is a view
0795                 if (typeIsViewCompatible(
0796                         ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
0797                   productFromConditionalModule = true;
0798                   break;
0799                 }
0800               }
0801             }
0802           }
0803         }
0804         if (productFromConditionalModule) {
0805           auto condWorker = getWorker(
0806               productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0807           assert(condWorker);
0808 
0809           conditionalModules.erase(itFound);
0810 
0811           auto dependents = tryToPlaceConditionalModules(condWorker,
0812                                                          conditionalModules,
0813                                                          conditionalModuleBranches,
0814                                                          aliasMap,
0815                                                          proc_pset,
0816                                                          preg,
0817                                                          prealloc,
0818                                                          processConfiguration);
0819           returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0820           returnValue.push_back(condWorker);
0821         }
0822       }
0823     }
0824     return returnValue;
0825   }
0826 
0827   void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
0828                                    ProductRegistry& preg,
0829                                    PreallocationConfiguration const* prealloc,
0830                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0831                                    std::string const& pathName,
0832                                    bool ignoreFilters,
0833                                    PathWorkers& out,
0834                                    std::vector<std::string> const& endPathNames,
0835                                    ConditionalTaskHelper const& conditionalTaskHelper,
0836                                    std::unordered_set<std::string>& allConditionalModules) {
0837     vstring modnames = proc_pset.getParameter<vstring>(pathName);
0838     PathWorkers tmpworkers;
0839 
0840     //Pull out ConditionalTask modules
0841     auto condRange = findConditionalTaskModulesRange(modnames);
0842 
0843     std::unordered_set<std::string> conditionalmods;
0844     //An EDAlias may be redirecting to a module on a ConditionalTask
0845     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
0846     std::unordered_map<std::string, unsigned int> conditionalModOrder;
0847     if (condRange.first != condRange.second) {
0848       for (auto it = condRange.first; it != condRange.second; ++it) {
0849         // ordering needs to skip the # token in the path list
0850         conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0851       }
0852       //the last entry should be ignored since it is required to be "@"
0853       conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0854                                                         std::make_move_iterator(condRange.second));
0855 
0856       conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0857       modnames.erase(std::prev(condRange.first), modnames.end());
0858 
0859       // Make a union of all conditional modules from all Paths
0860       allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
0861     }
0862 
0863     unsigned int placeInPath = 0;
0864     for (auto const& name : modnames) {
0865       //Modules except EDFilters are set to run concurrently by default
0866       bool doNotRunConcurrently = false;
0867       WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0868       if (name[0] == '!') {
0869         filterAction = WorkerInPath::Veto;
0870       } else if (name[0] == '-' or name[0] == '+') {
0871         filterAction = WorkerInPath::Ignore;
0872       }
0873       if (name[0] == '|' or name[0] == '+') {
0874         //cms.wait was specified so do not run concurrently
0875         doNotRunConcurrently = true;
0876       }
0877 
0878       std::string moduleLabel = name;
0879       if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0880         moduleLabel.erase(0, 1);
0881       }
0882 
0883       Worker* worker =
0884           getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0885       if (worker == nullptr) {
0886         std::string pathType("endpath");
0887         if (!search_all(endPathNames, pathName)) {
0888           pathType = std::string("path");
0889         }
0890         throw Exception(errors::Configuration)
0891             << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0892             << "\"\n please check spelling or remove that label from the path.";
0893       }
0894 
0895       if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0896         // We have a filter on an end path, and the filter is not explicitly ignored.
0897         // See if the filter is allowed.
0898         std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0899         if (!search_all(allowed_filters, worker->description()->moduleName())) {
0900           // Filter is not allowed. Ignore the result, and issue a warning.
0901           filterAction = WorkerInPath::Ignore;
0902           LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0903                                         << "' with module label '" << moduleLabel << "' appears on EndPath '"
0904                                         << pathName << "'.\n"
0905                                         << "The return value of the filter will be ignored.\n"
0906                                         << "To suppress this warning, either remove the filter from the endpath,\n"
0907                                         << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0908         }
0909       }
0910       bool runConcurrently = not doNotRunConcurrently;
0911       if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0912         runConcurrently = false;
0913       }
0914 
0915       auto condModules = tryToPlaceConditionalModules(worker,
0916                                                       conditionalmods,
0917                                                       conditionalModsBranches,
0918                                                       conditionalTaskHelper.aliasMap(),
0919                                                       proc_pset,
0920                                                       preg,
0921                                                       prealloc,
0922                                                       processConfiguration);
0923       for (auto condMod : condModules) {
0924         tmpworkers.emplace_back(
0925             condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0926       }
0927 
0928       tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0929       ++placeInPath;
0930     }
0931 
0932     out.swap(tmpworkers);
0933   }
0934 
0935   void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0936                                     ProductRegistry& preg,
0937                                     PreallocationConfiguration const* prealloc,
0938                                     std::shared_ptr<ProcessConfiguration const> processConfiguration,
0939                                     int bitpos,
0940                                     std::string const& name,
0941                                     TrigResPtr trptr,
0942                                     std::vector<std::string> const& endPathNames,
0943                                     ConditionalTaskHelper const& conditionalTaskHelper,
0944                                     std::unordered_set<std::string>& allConditionalModules) {
0945     PathWorkers tmpworkers;
0946     fillWorkers(proc_pset,
0947                 preg,
0948                 prealloc,
0949                 processConfiguration,
0950                 name,
0951                 false,
0952                 tmpworkers,
0953                 endPathNames,
0954                 conditionalTaskHelper,
0955                 allConditionalModules);
0956 
0957     // an empty path will cause an extra bit that is not used
0958     if (!tmpworkers.empty()) {
0959       trig_paths_.emplace_back(
0960           bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
0961     } else {
0962       empty_trig_paths_.push_back(bitpos);
0963     }
0964     for (WorkerInPath const& workerInPath : tmpworkers) {
0965       addToAllWorkers(workerInPath.getWorker());
0966     }
0967   }
0968 
0969   void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0970                                    ProductRegistry& preg,
0971                                    PreallocationConfiguration const* prealloc,
0972                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0973                                    int bitpos,
0974                                    std::string const& name,
0975                                    std::vector<std::string> const& endPathNames,
0976                                    ConditionalTaskHelper const& conditionalTaskHelper,
0977                                    std::unordered_set<std::string>& allConditionalModules) {
0978     PathWorkers tmpworkers;
0979     fillWorkers(proc_pset,
0980                 preg,
0981                 prealloc,
0982                 processConfiguration,
0983                 name,
0984                 true,
0985                 tmpworkers,
0986                 endPathNames,
0987                 conditionalTaskHelper,
0988                 allConditionalModules);
0989 
0990     if (!tmpworkers.empty()) {
0991       end_paths_.emplace_back(bitpos,
0992                               name,
0993                               tmpworkers,
0994                               TrigResPtr(),
0995                               actionTable(),
0996                               actReg_,
0997                               &streamContext_,
0998                               PathContext::PathType::kEndPath);
0999     } else {
1000       empty_end_paths_.push_back(bitpos);
1001     }
1002     for (WorkerInPath const& workerInPath : tmpworkers) {
1003       addToAllWorkers(workerInPath.getWorker());
1004     }
1005   }
1006 
1007   void StreamSchedule::beginStream() {
1008     streamContext_.setTransition(StreamContext::Transition::kBeginStream);
1009     streamContext_.setEventID(EventID(0, 0, 0));
1010     streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1011     streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1012     streamContext_.setTimestamp(Timestamp());
1013 
1014     std::exception_ptr exceptionInStream;
1015     CMS_SA_ALLOW try {
1016       preScheduleSignal<BeginStreamTraits>(&streamContext_);
1017       workerManagerBeginEnd_.beginStream(streamID_, streamContext_);
1018     } catch (...) {
1019       exceptionInStream = std::current_exception();
1020     }
1021 
1022     postScheduleSignal<BeginStreamTraits>(&streamContext_, exceptionInStream);
1023 
1024     if (exceptionInStream) {
1025       bool cleaningUpAfterException = false;
1026       handleException(streamContext_, cleaningUpAfterException, exceptionInStream);
1027     }
1028     streamContext_.setTransition(StreamContext::Transition::kInvalid);
1029 
1030     if (exceptionInStream) {
1031       std::rethrow_exception(exceptionInStream);
1032     }
1033   }
1034 
1035   void StreamSchedule::endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1036     streamContext_.setTransition(StreamContext::Transition::kEndStream);
1037     streamContext_.setEventID(EventID(0, 0, 0));
1038     streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1039     streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1040     streamContext_.setTimestamp(Timestamp());
1041 
1042     std::exception_ptr exceptionInStream;
1043     CMS_SA_ALLOW try {
1044       preScheduleSignal<EndStreamTraits>(&streamContext_);
1045       workerManagerBeginEnd_.endStream(streamID_, streamContext_, collector, collectorMutex);
1046     } catch (...) {
1047       exceptionInStream = std::current_exception();
1048     }
1049 
1050     postScheduleSignal<EndStreamTraits>(&streamContext_, exceptionInStream);
1051 
1052     if (exceptionInStream) {
1053       std::lock_guard<std::mutex> collectorLock(collectorMutex);
1054       collector.call([&exceptionInStream]() { std::rethrow_exception(exceptionInStream); });
1055     }
1056     streamContext_.setTransition(StreamContext::Transition::kInvalid);
1057   }
1058 
1059   void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
1060     for (auto const& worker : allWorkersBeginEnd()) {
1061       if (worker->description()->moduleLabel() == iLabel) {
1062         iMod->replaceModuleFor(worker);
1063 
1064         streamContext_.setTransition(StreamContext::Transition::kBeginStream);
1065         streamContext_.setEventID(EventID(0, 0, 0));
1066         streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1067         streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1068         streamContext_.setTimestamp(Timestamp());
1069         try {
1070           worker->beginStream(streamID_, streamContext_);
1071         } catch (cms::Exception& ex) {
1072           streamContext_.setTransition(StreamContext::Transition::kInvalid);
1073           ex.addContext("Executing StreamSchedule::replaceModule");
1074           throw;
1075         }
1076         streamContext_.setTransition(StreamContext::Transition::kInvalid);
1077         break;
1078       }
1079     }
1080 
1081     for (auto const& worker : allWorkersRuns()) {
1082       if (worker->description()->moduleLabel() == iLabel) {
1083         iMod->replaceModuleFor(worker);
1084         break;
1085       }
1086     }
1087 
1088     for (auto const& worker : allWorkersLumisAndEvents()) {
1089       if (worker->description()->moduleLabel() == iLabel) {
1090         iMod->replaceModuleFor(worker);
1091         break;
1092       }
1093     }
1094   }
1095 
1096   void StreamSchedule::deleteModule(std::string const& iLabel) {
1097     workerManagerBeginEnd_.deleteModuleIfExists(iLabel);
1098     workerManagerRuns_.deleteModuleIfExists(iLabel);
1099     workerManagerLumisAndEvents_.deleteModuleIfExists(iLabel);
1100   }
1101 
1102   std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
1103     std::vector<ModuleDescription const*> result;
1104     result.reserve(allWorkersLumisAndEvents().size());
1105 
1106     for (auto const& worker : allWorkersLumisAndEvents()) {
1107       ModuleDescription const* p = worker->description();
1108       result.push_back(p);
1109     }
1110     return result;
1111   }
1112 
  /**
   * Start the asynchronous processing of one event on this stream.
   *
   * After resetting the schedule and emitting the pre-schedule signal, the status of
   * empty paths is recorded and then all end paths, trigger paths and accumulators
   * are launched. Completion (or failure) is reported through `iTask`.
   *
   * \param iTask                signalled via doneWaiting() once the event is finished, or as
   *                             soon as a failure is detected while setting things up
   * \param info                 transition info of the event; provides the EventPrincipal
   * \param serviceToken         made the active service context while work is started and inside
   *                             the completion tasks
   * \param pathStatusInserters  indexed by trigger path bit; receives the status of empty paths
   */
  void StreamSchedule::processOneEventAsync(
      WaitingTaskHolder iTask,
      EventTransitionInfo& info,
      ServiceToken const& serviceToken,
      std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
    EventPrincipal& ep = info.principal();

    // Caught exception is propagated via WaitingTaskHolder
    CMS_SA_ALLOW try {
      this->resetAll();

      using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;

      Traits::setStreamContext(streamContext_, ep);
      //a service may want to communicate with another service
      ServiceRegistry::Operate guard(serviceToken);
      Traits::preScheduleSignal(actReg_.get(), &streamContext_);

      // Data dependencies need to be set up before marking empty
      // (End)Paths complete in case something consumes the status of
      // the empty (EndPath)
      workerManagerLumisAndEvents_.setupResolvers(ep);
      workerManagerLumisAndEvents_.setupOnDemandSystem(info);

      // Empty trigger paths are recorded as passed and their status inserter is run
      // right away; a failure there ends the processing of this event immediately.
      HLTPathStatus hltPathStatus(hlt::Pass, 0);
      for (int empty_trig_path : empty_trig_paths_) {
        results_->at(empty_trig_path) = hltPathStatus;
        pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
        std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
                                        ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
                                            info, streamID_, ParentContext(&streamContext_), &streamContext_);
        if (except) {
          iTask.doneWaiting(except);
          return;
        }
      }
      // Same for empty end paths, if end path status inserters exist at all
      if (not endPathStatusInserterWorkers_.empty()) {
        for (int empty_end_path : empty_end_paths_) {
          std::exception_ptr except =
              endPathStatusInserterWorkers_[empty_end_path]
                  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
                      info, streamID_, ParentContext(&streamContext_), &streamContext_);
          if (except) {
            iTask.doneWaiting(except);
            return;
          }
        }
      }

      ++total_events_;

      //use to give priorities on an error to ones from Paths
      // The atomic itself is owned by the allPathsDone lambda below (moved into it);
      // pathErrorPtr is a non-owning alias handed to the pathsDone lambda.
      auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
      auto pathErrorPtr = pathErrorHolder.get();
      ServiceWeakToken weakToken = serviceToken;
      // Final task: picks the exception to report (one stored in pathError takes
      // precedence over the one passed to the task), frees the stored exception_ptr
      // and hands the result of finishProcessOneEvent() to iTask.
      auto allPathsDone = make_waiting_task(
          [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
            ServiceRegistry::Operate operate(weakToken.lock());

            std::exception_ptr ptr;
            if (pathError->load()) {
              ptr = *pathError->load();
              delete pathError->load();
            }
            if ((not ptr) and iPtr) {
              ptr = *iPtr;
            }
            iTask.doneWaiting(finishProcessOneEvent(ptr));
          });
      //The holder guarantees that if the paths finish before the loop ends
      // that we do not start too soon. It also guarantees that the task will
      // run under that condition.
      WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);

      // Task run once the trigger paths are done: stores their exception (if any) in
      // the shared atomic and continues with finishedPaths(), which in turn signals
      // allPathsHolder.
      auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
                                             std::exception_ptr const* iPtr) mutable {
        ServiceRegistry::Operate operate(weakToken.lock());

        if (iPtr) {
          // free previous value of pathErrorPtr, if any;
          // prioritize this error over one that happens in EndPath or Accumulate
          auto currentPtr = pathErrorPtr->exchange(new std::exception_ptr(*iPtr));
          assert(currentPtr == nullptr);
        }
        finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
      });

      //The holder guarantees that if the paths finish before the loop ends
      // that we do not start too soon. It also guarantees that the task will
      // run under that condition.
      WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);

      //start end paths first so on single threaded the paths will run first
      // End paths (and the accumulators below) report directly to allPathsDone
      WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
      for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
        it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
      }

      // Trigger paths report to pathsDone
      for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
        it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
      }

      ParentContext parentContext(&streamContext_);
      workerManagerLumisAndEvents_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
          hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
    } catch (...) {
      iTask.doneWaiting(std::current_exception());
    }
  }
1222 
  /**
   * Called once the trigger paths of an event are done: classifies a pending
   * exception, updates the pass counter, runs the trigger results inserter (if
   * there is one) and finally signals `iWait`.
   *
   * \param iExcept  atomic pointer to the exception recorded so far; a null pointer means
   *                 no exception has been recorded. May be set here if the results inserter fails.
   * \param iWait    signalled with the resulting exception (possibly empty) at the end
   * \param info     transition info of the event being processed
   */
  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
                                     WaitingTaskHolder iWait,
                                     EventTransitionInfo& info) {
    if (iExcept) {
      // Rethrow the recorded exception to find out which action is configured for it
      // Caught exception is propagated via WaitingTaskHolder
      CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
        exception_actions::ActionCodes action = actionTable().find(e.category());
        assert(action != exception_actions::IgnoreCompletely);
        if (action == exception_actions::TryToContinue) {
          // Warn and clear the stored exception; the pointer itself stays non-null.
          // NOTE(review): because only the pointee is cleared, the `not iExcept` checks
          // below remain false in this case (no ++total_passed_, and an exception from
          // the results inserter is not recorded) — confirm this is intended.
          edm::printCmsExceptionWarning("TryToContinue", e);
          *(iExcept.load()) = std::exception_ptr();
        } else {
          *(iExcept.load()) = std::current_exception();
        }
      } catch (...) {
        *(iExcept.load()) = std::current_exception();
      }
    }

    // Count the event as passed only if no exception pointer is set and the results accept it
    if ((not iExcept) and results_->accept()) {
      ++total_passed_;
    }

    if (nullptr != results_inserter_.get()) {
      // Caught exception is propagated to the caller
      CMS_SA_ALLOW try {
        //Even if there was an exception, we need to allow results inserter
        // to run since some module may be waiting on its results.
        ParentContext parentContext(&streamContext_);
        using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;

        auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
        if (expt) {
          std::rethrow_exception(expt);
        }
      } catch (cms::Exception& ex) {
        // An exception recorded earlier takes precedence over one from the inserter
        if (not iExcept) {
          if (ex.context().empty()) {
            std::ostringstream ost;
            ost << "Processing Event " << info.principal().id();
            ex.addContext(ost.str());
          }
          // Allocated here; presumably released by whoever owns the atomic — verify against caller
          iExcept.store(new std::exception_ptr(std::current_exception()));
        }
      } catch (...) {
        if (not iExcept) {
          iExcept.store(new std::exception_ptr(std::current_exception()));
        }
      }
    }
    // Hand the final exception (empty if none) to the waiting task
    std::exception_ptr ptr;
    if (iExcept) {
      ptr = *iExcept.load();
    }
    iWait.doneWaiting(ptr);
  }
1279 
1280   std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1281     using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1282 
1283     if (iExcept) {
1284       //add context information to the exception and print message
1285       try {
1286         convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1287       } catch (cms::Exception& ex) {
1288         bool const cleaningUpAfterException = false;
1289         if (ex.context().empty()) {
1290           addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1291         } else {
1292           addContextAndPrintException("", ex, cleaningUpAfterException);
1293         }
1294         iExcept = std::current_exception();
1295       }
1296 
1297       actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1298     }
1299     // Caught exception is propagated to the caller
1300     CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1301       if (not iExcept) {
1302         iExcept = std::current_exception();
1303       }
1304     }
1305     if (not iExcept) {
1306       resetEarlyDelete();
1307     }
1308 
1309     return iExcept;
1310   }
1311 
1312   void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1313     oLabelsToFill.reserve(trig_paths_.size());
1314     std::transform(trig_paths_.begin(),
1315                    trig_paths_.end(),
1316                    std::back_inserter(oLabelsToFill),
1317                    std::bind(&Path::name, std::placeholders::_1));
1318   }
1319 
1320   void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1321     TrigPaths::const_iterator itFound = std::find_if(
1322         trig_paths_.begin(),
1323         trig_paths_.end(),
1324         std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1325     if (itFound != trig_paths_.end()) {
1326       oLabelsToFill.reserve(itFound->size());
1327       for (size_t i = 0; i < itFound->size(); ++i) {
1328         oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1329       }
1330     }
1331   }
1332 
1333   void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1334                                                 std::vector<ModuleDescription const*>& descriptions,
1335                                                 unsigned int hint) const {
1336     descriptions.clear();
1337     bool found = false;
1338     TrigPaths::const_iterator itFound;
1339 
1340     if (hint < trig_paths_.size()) {
1341       itFound = trig_paths_.begin() + hint;
1342       if (itFound->name() == iPathLabel)
1343         found = true;
1344     }
1345     if (!found) {
1346       // if the hint did not work, do it the slow way
1347       itFound = std::find_if(
1348           trig_paths_.begin(),
1349           trig_paths_.end(),
1350           std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1351       if (itFound != trig_paths_.end())
1352         found = true;
1353     }
1354     if (found) {
1355       descriptions.reserve(itFound->size());
1356       for (size_t i = 0; i < itFound->size(); ++i) {
1357         descriptions.push_back(itFound->getWorker(i)->description());
1358       }
1359     }
1360   }
1361 
1362   void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1363                                                    std::vector<ModuleDescription const*>& descriptions,
1364                                                    unsigned int hint) const {
1365     descriptions.clear();
1366     bool found = false;
1367     TrigPaths::const_iterator itFound;
1368 
1369     if (hint < end_paths_.size()) {
1370       itFound = end_paths_.begin() + hint;
1371       if (itFound->name() == iEndPathLabel)
1372         found = true;
1373     }
1374     if (!found) {
1375       // if the hint did not work, do it the slow way
1376       itFound = std::find_if(
1377           end_paths_.begin(),
1378           end_paths_.end(),
1379           std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1380       if (itFound != end_paths_.end())
1381         found = true;
1382     }
1383     if (found) {
1384       descriptions.reserve(itFound->size());
1385       for (size_t i = 0; i < itFound->size(); ++i) {
1386         descriptions.push_back(itFound->getWorker(i)->description());
1387       }
1388     }
1389   }
1390 
1391   static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1392     sum.timesVisited += path.timesVisited(which);
1393     sum.timesPassed += path.timesPassed(which);
1394     sum.timesFailed += path.timesFailed(which);
1395     sum.timesExcept += path.timesExcept(which);
1396     sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1397     sum.bitPosition = path.bitPosition(which);
1398   }
1399 
1400   static void fillPathSummary(Path const& path, PathSummary& sum) {
1401     sum.name = path.name();
1402     sum.bitPosition = path.bitPosition();
1403     sum.timesRun += path.timesRun();
1404     sum.timesPassed += path.timesPassed();
1405     sum.timesFailed += path.timesFailed();
1406     sum.timesExcept += path.timesExcept();
1407 
1408     Path::size_type sz = path.size();
1409     if (sum.moduleInPathSummaries.empty()) {
1410       std::vector<ModuleInPathSummary> temp(sz);
1411       for (size_t i = 0; i != sz; ++i) {
1412         fillModuleInPathSummary(path, i, temp[i]);
1413       }
1414       sum.moduleInPathSummaries.swap(temp);
1415     } else {
1416       assert(sz == sum.moduleInPathSummaries.size());
1417       for (size_t i = 0; i != sz; ++i) {
1418         fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1419       }
1420     }
1421   }
1422 
1423   static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1424     sum.timesVisited += w.timesVisited();
1425     sum.timesRun += w.timesRun();
1426     sum.timesPassed += w.timesPassed();
1427     sum.timesFailed += w.timesFailed();
1428     sum.timesExcept += w.timesExcept();
1429     sum.moduleLabel = w.description()->moduleLabel();
1430   }
1431 
1432   static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1433 
1434   void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1435     rep.eventSummary.totalEvents += totalEvents();
1436     rep.eventSummary.totalEventsPassed += totalEventsPassed();
1437     rep.eventSummary.totalEventsFailed += totalEventsFailed();
1438 
1439     fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1440     fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1441     fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1442   }
1443 
1444   void StreamSchedule::clearCounters() {
1445     using std::placeholders::_1;
1446     total_events_ = total_passed_ = 0;
1447     for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1448     for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1449     for_all(allWorkersLumisAndEvents(), std::bind(&Worker::clearCounters, _1));
1450   }
1451 
1452   void StreamSchedule::resetAll() { results_->reset(); }
1453 
1454   void StreamSchedule::addToAllWorkers(Worker* w) { workerManagerLumisAndEvents_.addToAllWorkers(w); }
1455 
1456   void StreamSchedule::resetEarlyDelete() {
1457     //must be sure we have cleared the count first
1458     for (auto& count : earlyDeleteBranchToCount_) {
1459       count.count = 0;
1460     }
1461     //now reset based on how many helpers use that branch
1462     for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1463       ++(earlyDeleteBranchToCount_[index].count);
1464     }
1465     for (auto& helper : earlyDeleteHelpers_) {
1466       helper.reset();
1467     }
1468   }
1469 
1470   void StreamSchedule::makePathStatusInserters(
1471       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1472       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1473       ExceptionToActionTable const& actions) {
1474     int bitpos = 0;
1475     unsigned int indexEmpty = 0;
1476     unsigned int indexOfPath = 0;
1477     for (auto& pathStatusInserter : pathStatusInserters) {
1478       std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1479       WorkerPtr workerPtr(
1480           new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1481       pathStatusInserterWorkers_.emplace_back(workerPtr);
1482       workerPtr->setActivityRegistry(actReg_);
1483       addToAllWorkers(workerPtr.get());
1484 
1485       // A little complexity here because a C++ Path object is not
1486       // instantiated and put into end_paths if there are no modules
1487       // on the configured path.
1488       if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1489         ++indexEmpty;
1490       } else {
1491         trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1492         ++indexOfPath;
1493       }
1494       ++bitpos;
1495     }
1496 
1497     bitpos = 0;
1498     indexEmpty = 0;
1499     indexOfPath = 0;
1500     for (auto& endPathStatusInserter : endPathStatusInserters) {
1501       std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1502       WorkerPtr workerPtr(
1503           new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1504       endPathStatusInserterWorkers_.emplace_back(workerPtr);
1505       workerPtr->setActivityRegistry(actReg_);
1506       addToAllWorkers(workerPtr.get());
1507 
1508       // A little complexity here because a C++ Path object is not
1509       // instantiated and put into end_paths if there are no modules
1510       // on the configured path.
1511       if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1512         ++indexEmpty;
1513       } else {
1514         end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1515         ++indexOfPath;
1516       }
1517       ++bitpos;
1518     }
1519   }
1520 
1521   void StreamSchedule::handleException(StreamContext const& streamContext,
1522                                        bool cleaningUpAfterException,
1523                                        std::exception_ptr& excpt) const noexcept {
1524     //add context information to the exception and print message
1525     try {
1526       convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1527     } catch (cms::Exception& ex) {
1528       std::ostringstream ost;
1529       // In most cases the exception will already have context at this point,
1530       // but add some context here in those rare cases where it does not.
1531       if (ex.context().empty()) {
1532         exceptionContext(ost, streamContext);
1533       }
1534       addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1535       excpt = std::current_exception();
1536     }
1537     // We are already handling an earlier exception, so ignore it
1538     // if this signal results in another exception being thrown.
1539     CMS_SA_ALLOW try {
1540       actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1541     } catch (...) {
1542     }
1543   }
1544 }  // namespace edm