Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-09-30 01:18:08

0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002 
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0005 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0007 #include "FWCore/Framework/src/OutputModuleDescription.h"
0008 #include "FWCore/Framework/interface/TriggerNamesService.h"
0009 #include "FWCore/Framework/src/TriggerReport.h"
0010 #include "FWCore/Framework/src/TriggerTimingReport.h"
0011 #include "FWCore/Framework/src/Factory.h"
0012 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0013 #include "FWCore/Framework/src/TriggerResultInserter.h"
0014 #include "FWCore/Framework/src/PathStatusInserter.h"
0015 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0016 #include "FWCore/Framework/interface/WorkerInPath.h"
0017 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0018 #include "FWCore/Framework/interface/maker/WorkerT.h"
0019 #include "FWCore/Framework/interface/ModuleRegistry.h"
0020 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/ParameterSet/interface/Registry.h"
0024 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0025 #include "FWCore/Utilities/interface/Algorithms.h"
0026 #include "FWCore/Utilities/interface/ConvertException.h"
0027 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 
0030 #include "LuminosityBlockProcessingStatus.h"
0031 #include "processEDAliases.h"
0032 
0033 #include <algorithm>
0034 #include <cassert>
0035 #include <cstdlib>
0036 #include <functional>
0037 #include <iomanip>
0038 #include <limits>
0039 #include <list>
0040 #include <map>
0041 #include <exception>
0042 #include <fmt/format.h>
0043 
0044 namespace edm {
0045 
0046   namespace {
0047 
0048     // Function template to transform each element in the input range to
0049     // a value placed into the output range. The supplied function
0050     // should take a const_reference to the 'input', and write to a
0051     // reference to the 'output'.
0052     template <typename InputIterator, typename ForwardIterator, typename Func>
0053     void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
0054       for (; begin != end; ++begin, ++out)
0055         func(*begin, *out);
0056     }
0057 
0058     // Function template that takes a sequence 'from', a sequence
0059     // 'to', and a callable object 'func'. It and applies
0060     // transform_into to fill the 'to' sequence with the values
0061     // calcuated by the callable object, taking care to fill the
0062     // outupt only if all calls succeed.
0063     template <typename FROM, typename TO, typename FUNC>
0064     void fill_summary(FROM const& from, TO& to, FUNC func) {
0065       if (to.size() != from.size()) {
0066         TO temp(from.size());
0067         transform_into(from.begin(), from.end(), temp.begin(), func);
0068         to.swap(temp);
0069       } else {
0070         transform_into(from.begin(), from.end(), to.begin(), func);
0071       }
0072     }
0073 
0074     // -----------------------------
0075 
0076     // Here we make the trigger results inserter directly.  This should
0077     // probably be a utility in the WorkerRegistry or elsewhere.
0078 
0079     StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
0080                                            std::shared_ptr<ActivityRegistry> areg,
0081                                            std::shared_ptr<TriggerResultInserter> inserter) {
0082       StreamSchedule::WorkerPtr ptr(
0083           new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
0084       ptr->setActivityRegistry(areg);
0085       return ptr;
0086     }
0087 
0088     void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0089                                          ProductRegistry const& preg,
0090                                          std::multimap<std::string, Worker*>& branchToReadingWorker) {
0091       auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0092       // Remove any duplicates
0093       std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0094       vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0095                                    vBranchesToDeleteEarly.end());
0096 
0097       // Are the requested items in the product registry?
0098       auto allBranchNames = preg.allBranchNames();
0099       //the branch names all end with a period, which we do not want to compare with
0100       for (auto& b : allBranchNames) {
0101         b.resize(b.size() - 1);
0102       }
0103       std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0104       std::vector<std::string> temp;
0105       temp.reserve(vBranchesToDeleteEarly.size());
0106 
0107       std::set_intersection(vBranchesToDeleteEarly.begin(),
0108                             vBranchesToDeleteEarly.end(),
0109                             allBranchNames.begin(),
0110                             allBranchNames.end(),
0111                             std::back_inserter(temp));
0112       vBranchesToDeleteEarly.swap(temp);
0113       if (temp.size() != vBranchesToDeleteEarly.size()) {
0114         std::vector<std::string> missingProducts;
0115         std::set_difference(temp.begin(),
0116                             temp.end(),
0117                             vBranchesToDeleteEarly.begin(),
0118                             vBranchesToDeleteEarly.end(),
0119                             std::back_inserter(missingProducts));
0120         LogInfo l("MissingProductsForCanDeleteEarly");
0121         l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0122         for (auto const& n : missingProducts) {
0123           l << "\n " << n;
0124         }
0125       }
0126       //set placeholder for the branch, we will remove the nullptr if a
0127       // module actually wants the branch.
0128       for (auto const& branch : vBranchesToDeleteEarly) {
0129         branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0130       }
0131     }
0132 
0133     Worker* getWorker(std::string const& moduleLabel,
0134                       ParameterSet& proc_pset,
0135                       WorkerManager& workerManager,
0136                       ProductRegistry& preg,
0137                       PreallocationConfiguration const* prealloc,
0138                       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0139       bool isTracked;
0140       ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0141       if (modpset == nullptr) {
0142         return nullptr;
0143       }
0144       assert(isTracked);
0145 
0146       return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
0147     }
0148 
0149     // If ConditionalTask modules exist in the container of module
0150     // names, returns the range (std::pair) for the modules. The range
0151     // excludes the special markers '#' (right before the
0152     // ConditionalTask modules) and '@' (last element).
0153     // If the module name container does not contain ConditionalTask
0154     // modules, returns std::pair of end iterators.
0155     template <typename T>
0156     auto findConditionalTaskModulesRange(T& modnames) {
0157       auto beg = std::find(modnames.begin(), modnames.end(), "#");
0158       if (beg == modnames.end()) {
0159         return std::pair(modnames.end(), modnames.end());
0160       }
0161       return std::pair(beg + 1, std::prev(modnames.end()));
0162     }
0163 
0164     std::optional<std::string> findBestMatchingAlias(
0165         std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0166         std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0167         std::string const& productModuleLabel,
0168         ConsumesInfo const& consumesInfo) {
0169       std::optional<std::string> best;
0170       int wildcardsInBest = std::numeric_limits<int>::max();
0171       bool bestIsAmbiguous = false;
0172 
0173       auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0174                             std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0175         int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0176         if (wildcards == 0) {
0177           bestIsAmbiguous = false;
0178           return true;
0179         }
0180         if (not best or wildcards < wildcardsInBest) {
0181           best = label;
0182           wildcardsInBest = wildcards;
0183           bestIsAmbiguous = false;
0184         } else if (best and *best != label and wildcardsInBest == wildcards) {
0185           bestIsAmbiguous = true;
0186         }
0187         return false;
0188       };
0189 
0190       auto findAlias = aliasMap.equal_range(productModuleLabel);
0191       for (auto it = findAlias.first; it != findAlias.second; ++it) {
0192         std::string const& aliasInstanceLabel =
0193             it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0194         bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0195         if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0196           bool const typeIsWildcard = it->second.friendlyClassName == "*";
0197           if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0198             if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0199               return it->second.originalModuleLabel;
0200             }
0201           } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0202             //consume is a View so need to do more intrusive search
0203             //find matching branches in module
0204             auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0205             for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0206               if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0207                 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0208                                                                    TypeID(itBranch->second->wrappedType().typeInfo()),
0209                                                                    itBranch->second->className())) {
0210                   if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0211                     return it->second.originalModuleLabel;
0212                   }
0213                 }
0214               }
0215             }
0216           }
0217         }
0218       }
0219       if (bestIsAmbiguous) {
0220         throw Exception(errors::UnimplementedFeature)
0221             << "Encountered ambiguity when trying to find a best-matching alias for\n"
0222             << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0223             << " module label " << productModuleLabel << "\n"
0224             << " product instance name " << consumesInfo.instance() << "\n"
0225             << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0226                "wildcards ("
0227             << wildcardsInBest << ")";
0228       }
0229       return best;
0230     }
0231   }  // namespace
0232 
0233   // -----------------------------
0234 
0235   typedef std::vector<std::string> vstring;
0236 
0237   // -----------------------------
0238 
0239   class ConditionalTaskHelper {
0240   public:
0241     using AliasInfo = StreamSchedule::AliasInfo;
0242 
0243     ConditionalTaskHelper(ParameterSet& proc_pset,
0244                           ProductRegistry& preg,
0245                           PreallocationConfiguration const* prealloc,
0246                           std::shared_ptr<ProcessConfiguration const> processConfiguration,
0247                           WorkerManager& workerManager,
0248                           std::vector<std::string> const& trigPathNames) {
0249       std::unordered_set<std::string> allConditionalMods;
0250       for (auto const& pathName : trigPathNames) {
0251         auto const modnames = proc_pset.getParameter<vstring>(pathName);
0252 
0253         //Pull out ConditionalTask modules
0254         auto condRange = findConditionalTaskModulesRange(modnames);
0255         if (condRange.first == condRange.second)
0256           continue;
0257 
0258         //the last entry should be ignored since it is required to be "@"
0259         allConditionalMods.insert(condRange.first, condRange.second);
0260       }
0261 
0262       for (auto const& cond : allConditionalMods) {
0263         //force the creation of the conditional modules so alias check can work
0264         (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
0265       }
0266 
0267       fillAliasMap(proc_pset, allConditionalMods);
0268       processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0269 
0270       //find branches created by the conditional modules
0271       for (auto const& prod : preg.productList()) {
0272         if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0273           conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0274         }
0275       }
0276     }
0277 
0278     std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0279 
0280     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
0281         std::unordered_set<std::string> const& conditionalmods) const {
0282       std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
0283       for (auto const& mod : conditionalmods) {
0284         auto range = conditionalModsBranches_.equal_range(mod);
0285         ret.insert(range.first, range.second);
0286       }
0287       return ret;
0288     }
0289 
0290   private:
0291     void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0292       auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0293       std::string const star("*");
0294       for (auto const& alias : aliases) {
0295         auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0296         auto aliasedToModuleLabels = info.getParameterNames();
0297         for (auto const& mod : aliasedToModuleLabels) {
0298           if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0299             auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0300             for (auto const& aliasPSet : aliasVPSet) {
0301               std::string type = star;
0302               std::string instance = star;
0303               std::string originalInstance = star;
0304               if (aliasPSet.exists("type")) {
0305                 type = aliasPSet.getParameter<std::string>("type");
0306               }
0307               if (aliasPSet.exists("toProductInstance")) {
0308                 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0309               }
0310               if (aliasPSet.exists("fromProductInstance")) {
0311                 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0312               }
0313 
0314               aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0315             }
0316           }
0317         }
0318       }
0319     }
0320 
0321     void processSwitchEDAliases(ParameterSet const& proc_pset,
0322                                 ProductRegistry& preg,
0323                                 ProcessConfiguration const& processConfiguration,
0324                                 std::unordered_set<std::string> const& allConditionalMods) {
0325       auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0326       std::vector<std::string> switchEDAliases;
0327       for (auto const& module : all_modules) {
0328         auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0329         if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0330           auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0331           for (auto const& case_label : all_cases) {
0332             auto range = aliasMap_.equal_range(case_label);
0333             if (range.first != range.second) {
0334               switchEDAliases.push_back(case_label);
0335             }
0336           }
0337         }
0338       }
0339       detail::processEDAliases(
0340           switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0341     }
0342 
0343     std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0344     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
0345   };
0346 
0347   // -----------------------------
0348 
0349   StreamSchedule::StreamSchedule(
0350       std::shared_ptr<TriggerResultInserter> inserter,
0351       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0352       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0353       std::shared_ptr<ModuleRegistry> modReg,
0354       ParameterSet& proc_pset,
0355       service::TriggerNamesService const& tns,
0356       PreallocationConfiguration const& prealloc,
0357       ProductRegistry& preg,
0358       ExceptionToActionTable const& actions,
0359       std::shared_ptr<ActivityRegistry> areg,
0360       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0361       StreamID streamID,
0362       ProcessContext const* processContext)
0363       : workerManager_(modReg, areg, actions),
0364         actReg_(areg),
0365         results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0366         results_inserter_(),
0367         trig_paths_(),
0368         end_paths_(),
0369         total_events_(),
0370         total_passed_(),
0371         number_of_unscheduled_modules_(0),
0372         streamID_(streamID),
0373         streamContext_(streamID_, processContext),
0374         skippingEvent_(false) {
0375     bool hasPath = false;
0376     std::vector<std::string> const& pathNames = tns.getTrigPaths();
0377     std::vector<std::string> const& endPathNames = tns.getEndPaths();
0378 
0379     ConditionalTaskHelper conditionalTaskHelper(
0380         proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
0381 
0382     int trig_bitpos = 0;
0383     trig_paths_.reserve(pathNames.size());
0384     for (auto const& trig_name : pathNames) {
0385       fillTrigPath(proc_pset,
0386                    preg,
0387                    &prealloc,
0388                    processConfiguration,
0389                    trig_bitpos,
0390                    trig_name,
0391                    results(),
0392                    endPathNames,
0393                    conditionalTaskHelper);
0394       ++trig_bitpos;
0395       hasPath = true;
0396     }
0397 
0398     if (hasPath) {
0399       // the results inserter stands alone
0400       inserter->setTrigResultForStream(streamID.value(), results());
0401 
0402       results_inserter_ = makeInserter(actions, actReg_, inserter);
0403       addToAllWorkers(results_inserter_.get());
0404     }
0405 
0406     // fill normal endpaths
0407     int bitpos = 0;
0408     end_paths_.reserve(endPathNames.size());
0409     for (auto const& end_path_name : endPathNames) {
0410       fillEndPath(
0411           proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
0412       ++bitpos;
0413     }
0414 
0415     makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0416 
0417     //See if all modules were used
0418     std::set<std::string> usedWorkerLabels;
0419     for (auto const& worker : allWorkers()) {
0420       usedWorkerLabels.insert(worker->description()->moduleLabel());
0421     }
0422     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0423     std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0424     std::vector<std::string> unusedLabels;
0425     set_difference(modulesInConfigSet.begin(),
0426                    modulesInConfigSet.end(),
0427                    usedWorkerLabels.begin(),
0428                    usedWorkerLabels.end(),
0429                    back_inserter(unusedLabels));
0430     std::set<std::string> unscheduledLabels;
0431     std::vector<std::string> shouldBeUsedLabels;
0432     if (!unusedLabels.empty()) {
0433       //Need to
0434       // 1) create worker
0435       // 2) if it is a WorkerT<EDProducer>, add it to our list
0436       // 3) hand list to our delayed reader
0437       for (auto const& label : unusedLabels) {
0438         bool isTracked;
0439         ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0440         assert(isTracked);
0441         assert(modulePSet != nullptr);
0442         workerManager_.addToUnscheduledWorkers(
0443             *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0444       }
0445       if (!shouldBeUsedLabels.empty()) {
0446         std::ostringstream unusedStream;
0447         unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0448         for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0449                                                 itLabelEnd = shouldBeUsedLabels.end();
0450              itLabel != itLabelEnd;
0451              ++itLabel) {
0452           unusedStream << ",'" << *itLabel << "'";
0453         }
0454         LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0455       }
0456     }
0457     number_of_unscheduled_modules_ = unscheduledLabels.size();
0458   }  // StreamSchedule::StreamSchedule
0459 
0460   void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0461                                              std::vector<std::string> const& branchesToDeleteEarly,
0462                                              std::multimap<std::string, std::string> const& referencesToBranches,
0463                                              std::vector<std::string> const& modulesToSkip,
0464                                              edm::ProductRegistry const& preg) {
0465     // setup the list with those products actually registered for this job
0466     std::multimap<std::string, Worker*> branchToReadingWorker;
0467     initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0468 
0469     const std::vector<std::string> kEmpty;
0470     std::map<Worker*, unsigned int> reserveSizeForWorker;
0471     unsigned int upperLimitOnReadingWorker = 0;
0472     unsigned int upperLimitOnIndicies = 0;
0473     unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0474 
0475     //talk with output modules first
0476     modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0477       auto comm = iHolder->createOutputModuleCommunicator();
0478       if (comm) {
0479         if (!branchToReadingWorker.empty()) {
0480           //If an OutputModule needs a product, we can't delete it early
0481           // so we should remove it from our list
0482           SelectedProductsForBranchType const& kept = comm->keptProducts();
0483           for (auto const& item : kept[InEvent]) {
0484             BranchDescription const& desc = *item.first;
0485             auto found = branchToReadingWorker.equal_range(desc.branchName());
0486             if (found.first != found.second) {
0487               --nUniqueBranchesToDelete;
0488               branchToReadingWorker.erase(found.first, found.second);
0489             }
0490           }
0491         }
0492       }
0493     });
0494 
0495     if (branchToReadingWorker.empty()) {
0496       return;
0497     }
0498 
0499     std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0500     for (auto w : allWorkers()) {
0501       if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0502         continue;
0503       }
0504       //determine if this module could read a branch we want to delete early
0505       auto consumes = w->consumesInfo();
0506       if (not consumes.empty()) {
0507         bool foundAtLeastOneMatchingBranch = false;
0508         for (auto const& product : consumes) {
0509           std::string branch = fmt::format("{}_{}_{}_{}",
0510                                            product.type().friendlyClassName(),
0511                                            product.label().data(),
0512                                            product.instance().data(),
0513                                            product.process().data());
0514           {
0515             //Handle case where worker directly consumes product
0516             auto found = branchToReadingWorker.end();
0517             if (product.process().empty()) {
0518               auto startFound = branchToReadingWorker.lower_bound(branch);
0519               if (startFound != branchToReadingWorker.end()) {
0520                 if (startFound->first.substr(0, branch.size()) == branch) {
0521                   //match all processNames here, even if it means multiple matches will happen
0522                   found = startFound;
0523                 }
0524               }
0525             } else {
0526               auto exactFound = branchToReadingWorker.equal_range(branch);
0527               if (exactFound.first != exactFound.second) {
0528                 found = exactFound.first;
0529               }
0530             }
0531             if (found != branchToReadingWorker.end()) {
0532               if (not foundAtLeastOneMatchingBranch) {
0533                 ++upperLimitOnReadingWorker;
0534                 foundAtLeastOneMatchingBranch = true;
0535               }
0536               ++upperLimitOnIndicies;
0537               ++reserveSizeForWorker[w];
0538               if (nullptr == found->second) {
0539                 found->second = w;
0540               } else {
0541                 branchToReadingWorker.insert(make_pair(found->first, w));
0542               }
0543             }
0544           }
0545           {
0546             //Handle case where indirectly consumes product
0547             auto found = referencesToBranches.end();
0548             if (product.process().empty()) {
0549               auto startFound = referencesToBranches.lower_bound(branch);
0550               if (startFound != referencesToBranches.end()) {
0551                 if (startFound->first.substr(0, branch.size()) == branch) {
0552                   //match all processNames here, even if it means multiple matches will happen
0553                   found = startFound;
0554                 }
0555               }
0556             } else {
0557               //can match exactly
0558               auto exactFound = referencesToBranches.equal_range(branch);
0559               if (exactFound.first != exactFound.second) {
0560                 found = exactFound.first;
0561               }
0562             }
0563             if (found != referencesToBranches.end()) {
0564               for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0565                 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0566                 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0567                   continue;
0568                 }
0569                 if (not foundAtLeastOneMatchingBranch) {
0570                   ++upperLimitOnReadingWorker;
0571                   foundAtLeastOneMatchingBranch = true;
0572                 }
0573                 ++upperLimitOnIndicies;
0574                 ++reserveSizeForWorker[w];
0575                 if (nullptr == foundInBranchToReadingWorker->second) {
0576                   foundInBranchToReadingWorker->second = w;
0577                 } else {
0578                   branchToReadingWorker.insert(make_pair(itr->second, w));
0579                 }
0580               }
0581             }
0582           }
0583         }
0584       }
0585     }
0586     {
0587       auto it = branchToReadingWorker.begin();
0588       std::vector<std::string> unusedBranches;
0589       while (it != branchToReadingWorker.end()) {
0590         if (it->second == nullptr) {
0591           unusedBranches.push_back(it->first);
0592           //erasing the object invalidates the iterator so must advance it first
0593           auto temp = it;
0594           ++it;
0595           branchToReadingWorker.erase(temp);
0596         } else {
0597           ++it;
0598         }
0599       }
0600       if (not unusedBranches.empty()) {
0601         LogWarning l("UnusedProductsForCanDeleteEarly");
0602         l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0603              " If possible, remove the producer from the job.";
0604         for (auto const& n : unusedBranches) {
0605           l << "\n " << n;
0606         }
0607       }
0608     }
0609     if (!branchToReadingWorker.empty()) {
0610       earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0611       earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0612       earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0613       std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0614       std::string lastBranchName;
0615       size_t nextOpenIndex = 0;
0616       unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0617       for (auto& branchAndWorker : branchToReadingWorker) {
0618         if (lastBranchName != branchAndWorker.first) {
0619           //have to put back the period we removed earlier in order to get the proper name
0620           BranchID bid(branchAndWorker.first + ".");
0621           earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0622           lastBranchName = branchAndWorker.first;
0623         }
0624         auto found = alreadySeenWorkers.find(branchAndWorker.second);
0625         if (alreadySeenWorkers.end() == found) {
0626           //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
0627           // all the branches that might be read by this worker. However, initially we will only tell the
0628           // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
0629           // EarlyDeleteHelper will automatically advance its internal end pointer.
0630           size_t index = nextOpenIndex;
0631           size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0632           assert(index < earlyDeleteHelperToBranchIndicies_.size());
0633           earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0634           earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0635           branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0636           alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0637           nextOpenIndex += nIndices;
0638         } else {
0639           found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0640         }
0641       }
0642 
0643       //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
0644       // space needed for each module
0645       auto itLast = earlyDeleteHelpers_.begin();
0646       for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0647         if (itLast->end() != it->begin()) {
0648           //figure the offset for next Worker since it hasn't been moved yet so it has the original address
0649           unsigned int delta = it->begin() - itLast->end();
0650           it->shiftIndexPointers(delta);
0651 
0652           earlyDeleteHelperToBranchIndicies_.erase(
0653               earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0654               earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0655         }
0656         itLast = it;
0657       }
0658       earlyDeleteHelperToBranchIndicies_.erase(
0659           earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0660           earlyDeleteHelperToBranchIndicies_.end());
0661 
0662       //now tell the paths about the deleters
0663       for (auto& p : trig_paths_) {
0664         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0665       }
0666       for (auto& p : end_paths_) {
0667         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0668       }
0669       resetEarlyDelete();
0670     }
0671   }
0672 
0673   std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0674       Worker* worker,
0675       std::unordered_set<std::string>& conditionalModules,
0676       std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0677       std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0678       ParameterSet& proc_pset,
0679       ProductRegistry& preg,
0680       PreallocationConfiguration const* prealloc,
0681       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0682     std::vector<Worker*> returnValue;
0683     auto const& consumesInfo = worker->consumesInfo();
0684     auto moduleLabel = worker->description()->moduleLabel();
0685     using namespace productholderindexhelper;
0686     for (auto const& ci : consumesInfo) {
0687       if (not ci.skipCurrentProcess() and
0688           (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0689         auto productModuleLabel = std::string(ci.label());
0690         if (productModuleLabel.empty()) {
0691           //this is a consumesMany request
0692           for (auto const& branch : conditionalModuleBranches) {
0693             //check that the conditional module has not been used
0694             if (conditionalModules.find(branch.first) == conditionalModules.end()) {
0695               continue;
0696             }
0697             if (ci.kindOfType() == edm::PRODUCT_TYPE) {
0698               if (branch.second->unwrappedTypeID() != ci.type()) {
0699                 continue;
0700               }
0701             } else {
0702               if (not typeIsViewCompatible(
0703                       ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
0704                 continue;
0705               }
0706             }
0707 
0708             auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0709             assert(condWorker);
0710 
0711             conditionalModules.erase(branch.first);
0712 
0713             auto dependents = tryToPlaceConditionalModules(condWorker,
0714                                                            conditionalModules,
0715                                                            conditionalModuleBranches,
0716                                                            aliasMap,
0717                                                            proc_pset,
0718                                                            preg,
0719                                                            prealloc,
0720                                                            processConfiguration);
0721             returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0722             returnValue.push_back(condWorker);
0723           }
0724         } else {
0725           //just a regular consumes
0726           bool productFromConditionalModule = false;
0727           auto itFound = conditionalModules.find(productModuleLabel);
0728           if (itFound == conditionalModules.end()) {
0729             //Check to see if this was an alias
0730             //note that aliasMap was previously filtered so only the conditional modules remain there
0731             auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0732             if (foundAlias) {
0733               productModuleLabel = *foundAlias;
0734               productFromConditionalModule = true;
0735               itFound = conditionalModules.find(productModuleLabel);
0736               //check that the alias-for conditional module has not been used
0737               if (itFound == conditionalModules.end()) {
0738                 continue;
0739               }
0740             }
0741           } else {
0742             //need to check the rest of the data product info
0743             auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0744             for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0745               if (itBranch->second->productInstanceName() == ci.instance()) {
0746                 if (ci.kindOfType() == PRODUCT_TYPE) {
0747                   if (ci.type() == itBranch->second->unwrappedTypeID()) {
0748                     productFromConditionalModule = true;
0749                     break;
0750                   }
0751                 } else {
0752                   //this is a view
0753                   if (typeIsViewCompatible(ci.type(),
0754                                            TypeID(itBranch->second->wrappedType().typeInfo()),
0755                                            itBranch->second->className())) {
0756                     productFromConditionalModule = true;
0757                     break;
0758                   }
0759                 }
0760               }
0761             }
0762           }
0763           if (productFromConditionalModule) {
0764             auto condWorker =
0765                 getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0766             assert(condWorker);
0767 
0768             conditionalModules.erase(itFound);
0769 
0770             auto dependents = tryToPlaceConditionalModules(condWorker,
0771                                                            conditionalModules,
0772                                                            conditionalModuleBranches,
0773                                                            aliasMap,
0774                                                            proc_pset,
0775                                                            preg,
0776                                                            prealloc,
0777                                                            processConfiguration);
0778             returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0779             returnValue.push_back(condWorker);
0780           }
0781         }
0782       }
0783     }
0784     return returnValue;
0785   }
0786 
0787   void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
0788                                    ProductRegistry& preg,
0789                                    PreallocationConfiguration const* prealloc,
0790                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0791                                    std::string const& pathName,
0792                                    bool ignoreFilters,
0793                                    PathWorkers& out,
0794                                    std::vector<std::string> const& endPathNames,
0795                                    ConditionalTaskHelper const& conditionalTaskHelper) {
0796     vstring modnames = proc_pset.getParameter<vstring>(pathName);
0797     PathWorkers tmpworkers;
0798 
0799     //Pull out ConditionalTask modules
0800     auto condRange = findConditionalTaskModulesRange(modnames);
0801 
0802     std::unordered_set<std::string> conditionalmods;
0803     //An EDAlias may be redirecting to a module on a ConditionalTask
0804     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
0805     std::unordered_map<std::string, unsigned int> conditionalModOrder;
0806     if (condRange.first != condRange.second) {
0807       for (auto it = condRange.first; it != condRange.second; ++it) {
0808         // ordering needs to skip the # token in the path list
0809         conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0810       }
0811       //the last entry should be ignored since it is required to be "@"
0812       conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0813                                                         std::make_move_iterator(condRange.second));
0814 
0815       conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0816       modnames.erase(std::prev(condRange.first), modnames.end());
0817     }
0818 
0819     unsigned int placeInPath = 0;
0820     for (auto const& name : modnames) {
0821       //Modules except EDFilters are set to run concurrently by default
0822       bool doNotRunConcurrently = false;
0823       WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0824       if (name[0] == '!') {
0825         filterAction = WorkerInPath::Veto;
0826       } else if (name[0] == '-' or name[0] == '+') {
0827         filterAction = WorkerInPath::Ignore;
0828       }
0829       if (name[0] == '|' or name[0] == '+') {
0830         //cms.wait was specified so do not run concurrently
0831         doNotRunConcurrently = true;
0832       }
0833 
0834       std::string moduleLabel = name;
0835       if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0836         moduleLabel.erase(0, 1);
0837       }
0838 
0839       Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0840       if (worker == nullptr) {
0841         std::string pathType("endpath");
0842         if (!search_all(endPathNames, pathName)) {
0843           pathType = std::string("path");
0844         }
0845         throw Exception(errors::Configuration)
0846             << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0847             << "\"\n please check spelling or remove that label from the path.";
0848       }
0849 
0850       if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0851         // We have a filter on an end path, and the filter is not explicitly ignored.
0852         // See if the filter is allowed.
0853         std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0854         if (!search_all(allowed_filters, worker->description()->moduleName())) {
0855           // Filter is not allowed. Ignore the result, and issue a warning.
0856           filterAction = WorkerInPath::Ignore;
0857           LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0858                                         << "' with module label '" << moduleLabel << "' appears on EndPath '"
0859                                         << pathName << "'.\n"
0860                                         << "The return value of the filter will be ignored.\n"
0861                                         << "To suppress this warning, either remove the filter from the endpath,\n"
0862                                         << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0863         }
0864       }
0865       bool runConcurrently = not doNotRunConcurrently;
0866       if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0867         runConcurrently = false;
0868       }
0869 
0870       auto condModules = tryToPlaceConditionalModules(worker,
0871                                                       conditionalmods,
0872                                                       conditionalModsBranches,
0873                                                       conditionalTaskHelper.aliasMap(),
0874                                                       proc_pset,
0875                                                       preg,
0876                                                       prealloc,
0877                                                       processConfiguration);
0878       for (auto condMod : condModules) {
0879         tmpworkers.emplace_back(
0880             condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0881       }
0882 
0883       tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0884       ++placeInPath;
0885     }
0886 
0887     out.swap(tmpworkers);
0888   }
0889 
0890   void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0891                                     ProductRegistry& preg,
0892                                     PreallocationConfiguration const* prealloc,
0893                                     std::shared_ptr<ProcessConfiguration const> processConfiguration,
0894                                     int bitpos,
0895                                     std::string const& name,
0896                                     TrigResPtr trptr,
0897                                     std::vector<std::string> const& endPathNames,
0898                                     ConditionalTaskHelper const& conditionalTaskHelper) {
0899     PathWorkers tmpworkers;
0900     fillWorkers(
0901         proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
0902 
0903     // an empty path will cause an extra bit that is not used
0904     if (!tmpworkers.empty()) {
0905       trig_paths_.emplace_back(bitpos,
0906                                name,
0907                                tmpworkers,
0908                                trptr,
0909                                actionTable(),
0910                                actReg_,
0911                                &streamContext_,
0912                                &skippingEvent_,
0913                                PathContext::PathType::kPath);
0914     } else {
0915       empty_trig_paths_.push_back(bitpos);
0916     }
0917     for (WorkerInPath const& workerInPath : tmpworkers) {
0918       addToAllWorkers(workerInPath.getWorker());
0919     }
0920   }
0921 
0922   void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0923                                    ProductRegistry& preg,
0924                                    PreallocationConfiguration const* prealloc,
0925                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0926                                    int bitpos,
0927                                    std::string const& name,
0928                                    std::vector<std::string> const& endPathNames,
0929                                    ConditionalTaskHelper const& conditionalTaskHelper) {
0930     PathWorkers tmpworkers;
0931     fillWorkers(
0932         proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
0933 
0934     if (!tmpworkers.empty()) {
0935       //EndPaths are not supposed to stop if SkipEvent type exception happens
0936       end_paths_.emplace_back(bitpos,
0937                               name,
0938                               tmpworkers,
0939                               TrigResPtr(),
0940                               actionTable(),
0941                               actReg_,
0942                               &streamContext_,
0943                               nullptr,
0944                               PathContext::PathType::kEndPath);
0945     } else {
0946       empty_end_paths_.push_back(bitpos);
0947     }
0948     for (WorkerInPath const& workerInPath : tmpworkers) {
0949       addToAllWorkers(workerInPath.getWorker());
0950     }
0951   }
0952 
0953   void StreamSchedule::beginStream() { workerManager_.beginStream(streamID_, streamContext_); }
0954 
0955   void StreamSchedule::endStream() { workerManager_.endStream(streamID_, streamContext_); }
0956 
0957   void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0958     Worker* found = nullptr;
0959     for (auto const& worker : allWorkers()) {
0960       if (worker->description()->moduleLabel() == iLabel) {
0961         found = worker;
0962         break;
0963       }
0964     }
0965     if (nullptr == found) {
0966       return;
0967     }
0968 
0969     iMod->replaceModuleFor(found);
0970     found->beginStream(streamID_, streamContext_);
0971   }
0972 
0973   void StreamSchedule::deleteModule(std::string const& iLabel) { workerManager_.deleteModuleIfExists(iLabel); }
0974 
0975   std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
0976     std::vector<ModuleDescription const*> result;
0977     result.reserve(allWorkers().size());
0978 
0979     for (auto const& worker : allWorkers()) {
0980       ModuleDescription const* p = worker->description();
0981       result.push_back(p);
0982     }
0983     return result;
0984   }
0985 
0986   void StreamSchedule::processOneEventAsync(
0987       WaitingTaskHolder iTask,
0988       EventTransitionInfo& info,
0989       ServiceToken const& serviceToken,
0990       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
0991     EventPrincipal& ep = info.principal();
0992 
0993     // Caught exception is propagated via WaitingTaskHolder
0994     CMS_SA_ALLOW try {
0995       this->resetAll();
0996 
0997       using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
0998 
0999       Traits::setStreamContext(streamContext_, ep);
1000       //a service may want to communicate with another service
1001       ServiceRegistry::Operate guard(serviceToken);
1002       Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1003 
1004       // Data dependencies need to be set up before marking empty
1005       // (End)Paths complete in case something consumes the status of
1006       // the empty (EndPath)
1007       workerManager_.setupResolvers(ep);
1008       workerManager_.setupOnDemandSystem(info);
1009 
1010       HLTPathStatus hltPathStatus(hlt::Pass, 0);
1011       for (int empty_trig_path : empty_trig_paths_) {
1012         results_->at(empty_trig_path) = hltPathStatus;
1013         pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1014         std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1015                                         ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1016                                             info, streamID_, ParentContext(&streamContext_), &streamContext_);
1017         if (except) {
1018           iTask.doneWaiting(except);
1019           return;
1020         }
1021       }
1022       for (int empty_end_path : empty_end_paths_) {
1023         std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
1024                                         ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1025                                             info, streamID_, ParentContext(&streamContext_), &streamContext_);
1026         if (except) {
1027           iTask.doneWaiting(except);
1028           return;
1029         }
1030       }
1031 
1032       ++total_events_;
1033 
1034       //use to give priorities on an error to ones from Paths
1035       auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1036       auto pathErrorPtr = pathErrorHolder.get();
1037       ServiceWeakToken weakToken = serviceToken;
1038       auto allPathsDone = make_waiting_task(
1039           [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1040             ServiceRegistry::Operate operate(weakToken.lock());
1041 
1042             std::exception_ptr ptr;
1043             if (pathError->load()) {
1044               ptr = *pathError->load();
1045               delete pathError->load();
1046             }
1047             if ((not ptr) and iPtr) {
1048               ptr = *iPtr;
1049             }
1050             iTask.doneWaiting(finishProcessOneEvent(ptr));
1051           });
1052       //The holder guarantees that if the paths finish before the loop ends
1053       // that we do not start too soon. It also guarantees that the task will
1054       // run under that condition.
1055       WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1056 
1057       auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1058                                              std::exception_ptr const* iPtr) mutable {
1059         ServiceRegistry::Operate operate(weakToken.lock());
1060 
1061         if (iPtr) {
1062           //this is used to prioritize this error over one
1063           // that happens in EndPath or Accumulate
1064           pathErrorPtr->store(new std::exception_ptr(*iPtr));
1065         }
1066         finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1067       });
1068 
1069       //The holder guarantees that if the paths finish before the loop ends
1070       // that we do not start too soon. It also guarantees that the task will
1071       // run under that condition.
1072       WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1073 
1074       //start end paths first so on single threaded the paths will run first
1075       WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1076       for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1077         it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1078       }
1079 
1080       for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1081         it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1082       }
1083 
1084       ParentContext parentContext(&streamContext_);
1085       workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1086           hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1087     } catch (...) {
1088       iTask.doneWaiting(std::current_exception());
1089     }
1090   }
1091 
1092   void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1093                                      WaitingTaskHolder iWait,
1094                                      EventTransitionInfo& info) {
1095     if (iExcept) {
1096       // Caught exception is propagated via WaitingTaskHolder
1097       CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1098         exception_actions::ActionCodes action = actionTable().find(e.category());
1099         assert(action != exception_actions::IgnoreCompletely);
1100         assert(action != exception_actions::FailPath);
1101         if (action == exception_actions::SkipEvent) {
1102           edm::printCmsExceptionWarning("SkipEvent", e);
1103           *(iExcept.load()) = std::exception_ptr();
1104         } else {
1105           *(iExcept.load()) = std::current_exception();
1106         }
1107       } catch (...) {
1108         *(iExcept.load()) = std::current_exception();
1109       }
1110     }
1111 
1112     if ((not iExcept) and results_->accept()) {
1113       ++total_passed_;
1114     }
1115 
1116     if (nullptr != results_inserter_.get()) {
1117       // Caught exception is propagated to the caller
1118       CMS_SA_ALLOW try {
1119         //Even if there was an exception, we need to allow results inserter
1120         // to run since some module may be waiting on its results.
1121         ParentContext parentContext(&streamContext_);
1122         using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1123 
1124         auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1125         if (expt) {
1126           std::rethrow_exception(expt);
1127         }
1128       } catch (cms::Exception& ex) {
1129         if (not iExcept) {
1130           if (ex.context().empty()) {
1131             std::ostringstream ost;
1132             ost << "Processing Event " << info.principal().id();
1133             ex.addContext(ost.str());
1134           }
1135           iExcept.store(new std::exception_ptr(std::current_exception()));
1136         }
1137       } catch (...) {
1138         if (not iExcept) {
1139           iExcept.store(new std::exception_ptr(std::current_exception()));
1140         }
1141       }
1142     }
1143     std::exception_ptr ptr;
1144     if (iExcept) {
1145       ptr = *iExcept.load();
1146     }
1147     iWait.doneWaiting(ptr);
1148   }
1149 
1150   std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1151     using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1152 
1153     if (iExcept) {
1154       //add context information to the exception and print message
1155       try {
1156         convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1157       } catch (cms::Exception& ex) {
1158         bool const cleaningUpAfterException = false;
1159         if (ex.context().empty()) {
1160           addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1161         } else {
1162           addContextAndPrintException("", ex, cleaningUpAfterException);
1163         }
1164         iExcept = std::current_exception();
1165       }
1166 
1167       actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1168     }
1169     // Caught exception is propagated to the caller
1170     CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1171       if (not iExcept) {
1172         iExcept = std::current_exception();
1173       }
1174     }
1175     if (not iExcept) {
1176       resetEarlyDelete();
1177     }
1178 
1179     return iExcept;
1180   }
1181 
1182   void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1183     oLabelsToFill.reserve(trig_paths_.size());
1184     std::transform(trig_paths_.begin(),
1185                    trig_paths_.end(),
1186                    std::back_inserter(oLabelsToFill),
1187                    std::bind(&Path::name, std::placeholders::_1));
1188   }
1189 
1190   void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1191     TrigPaths::const_iterator itFound = std::find_if(
1192         trig_paths_.begin(),
1193         trig_paths_.end(),
1194         std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1195     if (itFound != trig_paths_.end()) {
1196       oLabelsToFill.reserve(itFound->size());
1197       for (size_t i = 0; i < itFound->size(); ++i) {
1198         oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1199       }
1200     }
1201   }
1202 
1203   void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1204                                                 std::vector<ModuleDescription const*>& descriptions,
1205                                                 unsigned int hint) const {
1206     descriptions.clear();
1207     bool found = false;
1208     TrigPaths::const_iterator itFound;
1209 
1210     if (hint < trig_paths_.size()) {
1211       itFound = trig_paths_.begin() + hint;
1212       if (itFound->name() == iPathLabel)
1213         found = true;
1214     }
1215     if (!found) {
1216       // if the hint did not work, do it the slow way
1217       itFound = std::find_if(
1218           trig_paths_.begin(),
1219           trig_paths_.end(),
1220           std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1221       if (itFound != trig_paths_.end())
1222         found = true;
1223     }
1224     if (found) {
1225       descriptions.reserve(itFound->size());
1226       for (size_t i = 0; i < itFound->size(); ++i) {
1227         descriptions.push_back(itFound->getWorker(i)->description());
1228       }
1229     }
1230   }
1231 
1232   void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1233                                                    std::vector<ModuleDescription const*>& descriptions,
1234                                                    unsigned int hint) const {
1235     descriptions.clear();
1236     bool found = false;
1237     TrigPaths::const_iterator itFound;
1238 
1239     if (hint < end_paths_.size()) {
1240       itFound = end_paths_.begin() + hint;
1241       if (itFound->name() == iEndPathLabel)
1242         found = true;
1243     }
1244     if (!found) {
1245       // if the hint did not work, do it the slow way
1246       itFound = std::find_if(
1247           end_paths_.begin(),
1248           end_paths_.end(),
1249           std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1250       if (itFound != end_paths_.end())
1251         found = true;
1252     }
1253     if (found) {
1254       descriptions.reserve(itFound->size());
1255       for (size_t i = 0; i < itFound->size(); ++i) {
1256         descriptions.push_back(itFound->getWorker(i)->description());
1257       }
1258     }
1259   }
1260 
1261   static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1262     sum.timesVisited += path.timesVisited(which);
1263     sum.timesPassed += path.timesPassed(which);
1264     sum.timesFailed += path.timesFailed(which);
1265     sum.timesExcept += path.timesExcept(which);
1266     sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1267     sum.bitPosition = path.bitPosition(which);
1268   }
1269 
1270   static void fillPathSummary(Path const& path, PathSummary& sum) {
1271     sum.name = path.name();
1272     sum.bitPosition = path.bitPosition();
1273     sum.timesRun += path.timesRun();
1274     sum.timesPassed += path.timesPassed();
1275     sum.timesFailed += path.timesFailed();
1276     sum.timesExcept += path.timesExcept();
1277 
1278     Path::size_type sz = path.size();
1279     if (sum.moduleInPathSummaries.empty()) {
1280       std::vector<ModuleInPathSummary> temp(sz);
1281       for (size_t i = 0; i != sz; ++i) {
1282         fillModuleInPathSummary(path, i, temp[i]);
1283       }
1284       sum.moduleInPathSummaries.swap(temp);
1285     } else {
1286       assert(sz == sum.moduleInPathSummaries.size());
1287       for (size_t i = 0; i != sz; ++i) {
1288         fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1289       }
1290     }
1291   }
1292 
1293   static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1294     sum.timesVisited += w.timesVisited();
1295     sum.timesRun += w.timesRun();
1296     sum.timesPassed += w.timesPassed();
1297     sum.timesFailed += w.timesFailed();
1298     sum.timesExcept += w.timesExcept();
1299     sum.moduleLabel = w.description()->moduleLabel();
1300   }
1301 
1302   static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1303 
1304   void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1305     rep.eventSummary.totalEvents += totalEvents();
1306     rep.eventSummary.totalEventsPassed += totalEventsPassed();
1307     rep.eventSummary.totalEventsFailed += totalEventsFailed();
1308 
1309     fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1310     fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1311     fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1312   }
1313 
1314   void StreamSchedule::clearCounters() {
1315     using std::placeholders::_1;
1316     total_events_ = total_passed_ = 0;
1317     for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1318     for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1319     for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1320   }
1321 
1322   void StreamSchedule::resetAll() {
1323     skippingEvent_ = false;
1324     results_->reset();
1325   }
1326 
1327   void StreamSchedule::addToAllWorkers(Worker* w) { workerManager_.addToAllWorkers(w); }
1328 
1329   void StreamSchedule::resetEarlyDelete() {
1330     //must be sure we have cleared the count first
1331     for (auto& count : earlyDeleteBranchToCount_) {
1332       count.count = 0;
1333     }
1334     //now reset based on how many helpers use that branch
1335     for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1336       ++(earlyDeleteBranchToCount_[index].count);
1337     }
1338     for (auto& helper : earlyDeleteHelpers_) {
1339       helper.reset();
1340     }
1341   }
1342 
1343   void StreamSchedule::makePathStatusInserters(
1344       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1345       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1346       ExceptionToActionTable const& actions) {
1347     int bitpos = 0;
1348     unsigned int indexEmpty = 0;
1349     unsigned int indexOfPath = 0;
1350     for (auto& pathStatusInserter : pathStatusInserters) {
1351       std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1352       WorkerPtr workerPtr(
1353           new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1354       pathStatusInserterWorkers_.emplace_back(workerPtr);
1355       workerPtr->setActivityRegistry(actReg_);
1356       addToAllWorkers(workerPtr.get());
1357 
1358       // A little complexity here because a C++ Path object is not
1359       // instantiated and put into end_paths if there are no modules
1360       // on the configured path.
1361       if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1362         ++indexEmpty;
1363       } else {
1364         trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1365         ++indexOfPath;
1366       }
1367       ++bitpos;
1368     }
1369 
1370     bitpos = 0;
1371     indexEmpty = 0;
1372     indexOfPath = 0;
1373     for (auto& endPathStatusInserter : endPathStatusInserters) {
1374       std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1375       WorkerPtr workerPtr(
1376           new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1377       endPathStatusInserterWorkers_.emplace_back(workerPtr);
1378       workerPtr->setActivityRegistry(actReg_);
1379       addToAllWorkers(workerPtr.get());
1380 
1381       // A little complexity here because a C++ Path object is not
1382       // instantiated and put into end_paths if there are no modules
1383       // on the configured path.
1384       if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1385         ++indexEmpty;
1386       } else {
1387         end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1388         ++indexOfPath;
1389       }
1390       ++bitpos;
1391     }
1392   }
1393 }  // namespace edm