Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-01-13 02:35:04

0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002 
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0005 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0007 #include "FWCore/Framework/src/OutputModuleDescription.h"
0008 #include "FWCore/Framework/interface/TriggerNamesService.h"
0009 #include "FWCore/Framework/src/TriggerReport.h"
0010 #include "FWCore/Framework/src/TriggerTimingReport.h"
0011 #include "FWCore/Framework/src/Factory.h"
0012 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0013 #include "FWCore/Framework/src/TriggerResultInserter.h"
0014 #include "FWCore/Framework/src/PathStatusInserter.h"
0015 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0016 #include "FWCore/Framework/interface/WorkerInPath.h"
0017 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0018 #include "FWCore/Framework/interface/maker/WorkerT.h"
0019 #include "FWCore/Framework/interface/ModuleRegistry.h"
0020 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/ParameterSet/interface/Registry.h"
0024 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0025 #include "FWCore/Utilities/interface/Algorithms.h"
0026 #include "FWCore/Utilities/interface/ConvertException.h"
0027 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 
0030 #include "LuminosityBlockProcessingStatus.h"
0031 #include "processEDAliases.h"
0032 
0033 #include <algorithm>
0034 #include <cassert>
0035 #include <cstdlib>
0036 #include <functional>
0037 #include <iomanip>
0038 #include <limits>
0039 #include <list>
0040 #include <map>
0041 #include <exception>
0042 #include <fmt/format.h>
0043 
0044 namespace edm {
0045 
0046   namespace {
0047 
0048     // Function template to transform each element in the input range to
0049     // a value placed into the output range. The supplied function
0050     // should take a const_reference to the 'input', and write to a
0051     // reference to the 'output'.
0052     template <typename InputIterator, typename ForwardIterator, typename Func>
0053     void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
0054       for (; begin != end; ++begin, ++out)
0055         func(*begin, *out);
0056     }
0057 
0058     // Function template that takes a sequence 'from', a sequence
0059     // 'to', and a callable object 'func'. It and applies
0060     // transform_into to fill the 'to' sequence with the values
0061     // calcuated by the callable object, taking care to fill the
0062     // outupt only if all calls succeed.
0063     template <typename FROM, typename TO, typename FUNC>
0064     void fill_summary(FROM const& from, TO& to, FUNC func) {
0065       if (to.size() != from.size()) {
0066         TO temp(from.size());
0067         transform_into(from.begin(), from.end(), temp.begin(), func);
0068         to.swap(temp);
0069       } else {
0070         transform_into(from.begin(), from.end(), to.begin(), func);
0071       }
0072     }
0073 
0074     // -----------------------------
0075 
0076     // Here we make the trigger results inserter directly.  This should
0077     // probably be a utility in the WorkerRegistry or elsewhere.
0078 
0079     StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
0080                                            std::shared_ptr<ActivityRegistry> areg,
0081                                            std::shared_ptr<TriggerResultInserter> inserter) {
0082       StreamSchedule::WorkerPtr ptr(
0083           new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
0084       ptr->setActivityRegistry(areg);
0085       return ptr;
0086     }
0087 
0088     void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0089                                          ProductRegistry const& preg,
0090                                          std::multimap<std::string, Worker*>& branchToReadingWorker) {
0091       auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0092       // Remove any duplicates
0093       std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0094       vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0095                                    vBranchesToDeleteEarly.end());
0096 
0097       // Are the requested items in the product registry?
0098       auto allBranchNames = preg.allBranchNames();
0099       //the branch names all end with a period, which we do not want to compare with
0100       for (auto& b : allBranchNames) {
0101         b.resize(b.size() - 1);
0102       }
0103       std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0104       std::vector<std::string> temp;
0105       temp.reserve(vBranchesToDeleteEarly.size());
0106 
0107       std::set_intersection(vBranchesToDeleteEarly.begin(),
0108                             vBranchesToDeleteEarly.end(),
0109                             allBranchNames.begin(),
0110                             allBranchNames.end(),
0111                             std::back_inserter(temp));
0112       vBranchesToDeleteEarly.swap(temp);
0113       if (temp.size() != vBranchesToDeleteEarly.size()) {
0114         std::vector<std::string> missingProducts;
0115         std::set_difference(temp.begin(),
0116                             temp.end(),
0117                             vBranchesToDeleteEarly.begin(),
0118                             vBranchesToDeleteEarly.end(),
0119                             std::back_inserter(missingProducts));
0120         LogInfo l("MissingProductsForCanDeleteEarly");
0121         l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0122         for (auto const& n : missingProducts) {
0123           l << "\n " << n;
0124         }
0125       }
0126       //set placeholder for the branch, we will remove the nullptr if a
0127       // module actually wants the branch.
0128       for (auto const& branch : vBranchesToDeleteEarly) {
0129         branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0130       }
0131     }
0132 
0133     Worker* getWorker(std::string const& moduleLabel,
0134                       ParameterSet& proc_pset,
0135                       WorkerManager& workerManager,
0136                       ProductRegistry& preg,
0137                       PreallocationConfiguration const* prealloc,
0138                       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0139       bool isTracked;
0140       ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0141       if (modpset == nullptr) {
0142         return nullptr;
0143       }
0144       assert(isTracked);
0145 
0146       return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
0147     }
0148 
0149     // If ConditionalTask modules exist in the container of module
0150     // names, returns the range (std::pair) for the modules. The range
0151     // excludes the special markers '#' (right before the
0152     // ConditionalTask modules) and '@' (last element).
0153     // If the module name container does not contain ConditionalTask
0154     // modules, returns std::pair of end iterators.
0155     template <typename T>
0156     auto findConditionalTaskModulesRange(T& modnames) {
0157       auto beg = std::find(modnames.begin(), modnames.end(), "#");
0158       if (beg == modnames.end()) {
0159         return std::pair(modnames.end(), modnames.end());
0160       }
0161       return std::pair(beg + 1, std::prev(modnames.end()));
0162     }
0163 
0164     std::optional<std::string> findBestMatchingAlias(
0165         std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0166         std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0167         std::string const& productModuleLabel,
0168         ConsumesInfo const& consumesInfo) {
0169       std::optional<std::string> best;
0170       int wildcardsInBest = std::numeric_limits<int>::max();
0171       bool bestIsAmbiguous = false;
0172 
0173       auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0174                             std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0175         int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0176         if (wildcards == 0) {
0177           bestIsAmbiguous = false;
0178           return true;
0179         }
0180         if (not best or wildcards < wildcardsInBest) {
0181           best = label;
0182           wildcardsInBest = wildcards;
0183           bestIsAmbiguous = false;
0184         } else if (best and *best != label and wildcardsInBest == wildcards) {
0185           bestIsAmbiguous = true;
0186         }
0187         return false;
0188       };
0189 
0190       auto findAlias = aliasMap.equal_range(productModuleLabel);
0191       for (auto it = findAlias.first; it != findAlias.second; ++it) {
0192         std::string const& aliasInstanceLabel =
0193             it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0194         bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0195         if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0196           bool const typeIsWildcard = it->second.friendlyClassName == "*";
0197           if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0198             if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0199               return it->second.originalModuleLabel;
0200             }
0201           } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0202             //consume is a View so need to do more intrusive search
0203             //find matching branches in module
0204             auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0205             for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0206               if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0207                 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0208                                                                    TypeID(itBranch->second->wrappedType().typeInfo()),
0209                                                                    itBranch->second->className())) {
0210                   if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0211                     return it->second.originalModuleLabel;
0212                   }
0213                 }
0214               }
0215             }
0216           }
0217         }
0218       }
0219       if (bestIsAmbiguous) {
0220         throw Exception(errors::UnimplementedFeature)
0221             << "Encountered ambiguity when trying to find a best-matching alias for\n"
0222             << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0223             << " module label " << productModuleLabel << "\n"
0224             << " product instance name " << consumesInfo.instance() << "\n"
0225             << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0226                "wildcards ("
0227             << wildcardsInBest << ")";
0228       }
0229       return best;
0230     }
0231   }  // namespace
0232 
0233   // -----------------------------
0234 
0235   typedef std::vector<std::string> vstring;
0236 
0237   // -----------------------------
0238 
0239   class ConditionalTaskHelper {
0240   public:
0241     using AliasInfo = StreamSchedule::AliasInfo;
0242 
0243     ConditionalTaskHelper(ParameterSet& proc_pset,
0244                           ProductRegistry& preg,
0245                           PreallocationConfiguration const* prealloc,
0246                           std::shared_ptr<ProcessConfiguration const> processConfiguration,
0247                           WorkerManager& workerManager,
0248                           std::vector<std::string> const& trigPathNames) {
0249       std::unordered_set<std::string> allConditionalMods;
0250       for (auto const& pathName : trigPathNames) {
0251         auto const modnames = proc_pset.getParameter<vstring>(pathName);
0252 
0253         //Pull out ConditionalTask modules
0254         auto condRange = findConditionalTaskModulesRange(modnames);
0255         if (condRange.first == condRange.second)
0256           continue;
0257 
0258         //the last entry should be ignored since it is required to be "@"
0259         allConditionalMods.insert(condRange.first, condRange.second);
0260       }
0261 
0262       for (auto const& cond : allConditionalMods) {
0263         //force the creation of the conditional modules so alias check can work
0264         (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
0265       }
0266 
0267       fillAliasMap(proc_pset, allConditionalMods);
0268       processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0269 
0270       //find branches created by the conditional modules
0271       for (auto const& prod : preg.productList()) {
0272         if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0273           conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0274         }
0275       }
0276     }
0277 
0278     std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0279 
0280     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
0281         std::unordered_set<std::string> const& conditionalmods) const {
0282       std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
0283       for (auto const& mod : conditionalmods) {
0284         auto range = conditionalModsBranches_.equal_range(mod);
0285         ret.insert(range.first, range.second);
0286       }
0287       return ret;
0288     }
0289 
0290   private:
0291     void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0292       auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0293       std::string const star("*");
0294       for (auto const& alias : aliases) {
0295         auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0296         auto aliasedToModuleLabels = info.getParameterNames();
0297         for (auto const& mod : aliasedToModuleLabels) {
0298           if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0299             auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0300             for (auto const& aliasPSet : aliasVPSet) {
0301               std::string type = star;
0302               std::string instance = star;
0303               std::string originalInstance = star;
0304               if (aliasPSet.exists("type")) {
0305                 type = aliasPSet.getParameter<std::string>("type");
0306               }
0307               if (aliasPSet.exists("toProductInstance")) {
0308                 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0309               }
0310               if (aliasPSet.exists("fromProductInstance")) {
0311                 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0312               }
0313 
0314               aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0315             }
0316           }
0317         }
0318       }
0319     }
0320 
0321     void processSwitchEDAliases(ParameterSet const& proc_pset,
0322                                 ProductRegistry& preg,
0323                                 ProcessConfiguration const& processConfiguration,
0324                                 std::unordered_set<std::string> const& allConditionalMods) {
0325       auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0326       std::vector<std::string> switchEDAliases;
0327       for (auto const& module : all_modules) {
0328         auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0329         if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0330           auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0331           for (auto const& case_label : all_cases) {
0332             auto range = aliasMap_.equal_range(case_label);
0333             if (range.first != range.second) {
0334               switchEDAliases.push_back(case_label);
0335             }
0336           }
0337         }
0338       }
0339       detail::processEDAliases(
0340           switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0341     }
0342 
0343     std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0344     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
0345   };
0346 
0347   // -----------------------------
0348 
0349   StreamSchedule::StreamSchedule(
0350       std::shared_ptr<TriggerResultInserter> inserter,
0351       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0352       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0353       std::shared_ptr<ModuleRegistry> modReg,
0354       ParameterSet& proc_pset,
0355       service::TriggerNamesService const& tns,
0356       PreallocationConfiguration const& prealloc,
0357       ProductRegistry& preg,
0358       ExceptionToActionTable const& actions,
0359       std::shared_ptr<ActivityRegistry> areg,
0360       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0361       StreamID streamID,
0362       ProcessContext const* processContext)
0363       : workerManager_(modReg, areg, actions),
0364         actReg_(areg),
0365         results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0366         results_inserter_(),
0367         trig_paths_(),
0368         end_paths_(),
0369         total_events_(),
0370         total_passed_(),
0371         number_of_unscheduled_modules_(0),
0372         streamID_(streamID),
0373         streamContext_(streamID_, processContext) {
0374     bool hasPath = false;
0375     std::vector<std::string> const& pathNames = tns.getTrigPaths();
0376     std::vector<std::string> const& endPathNames = tns.getEndPaths();
0377 
0378     ConditionalTaskHelper conditionalTaskHelper(
0379         proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
0380     std::unordered_set<std::string> conditionalModules;
0381 
0382     int trig_bitpos = 0;
0383     trig_paths_.reserve(pathNames.size());
0384     for (auto const& trig_name : pathNames) {
0385       fillTrigPath(proc_pset,
0386                    preg,
0387                    &prealloc,
0388                    processConfiguration,
0389                    trig_bitpos,
0390                    trig_name,
0391                    results(),
0392                    endPathNames,
0393                    conditionalTaskHelper,
0394                    conditionalModules);
0395       ++trig_bitpos;
0396       hasPath = true;
0397     }
0398 
0399     if (hasPath) {
0400       // the results inserter stands alone
0401       inserter->setTrigResultForStream(streamID.value(), results());
0402 
0403       results_inserter_ = makeInserter(actions, actReg_, inserter);
0404       addToAllWorkers(results_inserter_.get());
0405     }
0406 
0407     // fill normal endpaths
0408     int bitpos = 0;
0409     end_paths_.reserve(endPathNames.size());
0410     for (auto const& end_path_name : endPathNames) {
0411       fillEndPath(proc_pset,
0412                   preg,
0413                   &prealloc,
0414                   processConfiguration,
0415                   bitpos,
0416                   end_path_name,
0417                   endPathNames,
0418                   conditionalTaskHelper,
0419                   conditionalModules);
0420       ++bitpos;
0421     }
0422 
0423     makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0424 
0425     //See if all modules were used
0426     std::set<std::string> usedWorkerLabels;
0427     for (auto const& worker : allWorkers()) {
0428       usedWorkerLabels.insert(worker->description()->moduleLabel());
0429     }
0430     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0431     std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0432     std::vector<std::string> unusedLabels;
0433     set_difference(modulesInConfigSet.begin(),
0434                    modulesInConfigSet.end(),
0435                    usedWorkerLabels.begin(),
0436                    usedWorkerLabels.end(),
0437                    back_inserter(unusedLabels));
0438     std::set<std::string> unscheduledLabels;
0439     std::vector<std::string> shouldBeUsedLabels;
0440     if (!unusedLabels.empty()) {
0441       //Need to
0442       // 1) create worker
0443       // 2) if it is a WorkerT<EDProducer>, add it to our list
0444       // 3) hand list to our delayed reader
0445       for (auto const& label : unusedLabels) {
0446         bool isTracked;
0447         ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0448         assert(isTracked);
0449         assert(modulePSet != nullptr);
0450         workerManager_.addToUnscheduledWorkers(
0451             *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0452       }
0453       if (!shouldBeUsedLabels.empty()) {
0454         std::ostringstream unusedStream;
0455         unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0456         for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0457                                                 itLabelEnd = shouldBeUsedLabels.end();
0458              itLabel != itLabelEnd;
0459              ++itLabel) {
0460           unusedStream << ",'" << *itLabel << "'";
0461         }
0462         LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0463       }
0464     }
0465     number_of_unscheduled_modules_ = unscheduledLabels.size();
0466 
0467     // Print conditional modules that were not consumed in any of their associated Paths
0468     if (streamID.value() == 0 and not conditionalModules.empty()) {
0469       // Intersection of unscheduled and ConditionalTask modules gives
0470       // directly the set of conditional modules that were not
0471       // consumed by anything in the Paths associated to the
0472       // corresponding ConditionalTask.
0473       std::vector<std::string_view> labelsToPrint;
0474       std::copy_if(
0475           unscheduledLabels.begin(),
0476           unscheduledLabels.end(),
0477           std::back_inserter(labelsToPrint),
0478           [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
0479 
0480       if (not labelsToPrint.empty()) {
0481         edm::LogWarning log("NonConsumedConditionalModules");
0482         log << "The following modules were part of some ConditionalTask, but were not\n"
0483             << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
0484             << "was associated. Perhaps they should be either removed from the\n"
0485             << "job, or moved to a Task to make it explicit they are unscheduled.\n";
0486         for (auto const& modLabel : labelsToPrint) {
0487           log.format("\n {}", modLabel);
0488         }
0489       }
0490     }
0491   }  // StreamSchedule::StreamSchedule
0492 
0493   void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0494                                              std::vector<std::string> const& branchesToDeleteEarly,
0495                                              std::multimap<std::string, std::string> const& referencesToBranches,
0496                                              std::vector<std::string> const& modulesToSkip,
0497                                              edm::ProductRegistry const& preg) {
0498     // setup the list with those products actually registered for this job
0499     std::multimap<std::string, Worker*> branchToReadingWorker;
0500     initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0501 
0502     const std::vector<std::string> kEmpty;
0503     std::map<Worker*, unsigned int> reserveSizeForWorker;
0504     unsigned int upperLimitOnReadingWorker = 0;
0505     unsigned int upperLimitOnIndicies = 0;
0506     unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0507 
0508     //talk with output modules first
0509     modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0510       auto comm = iHolder->createOutputModuleCommunicator();
0511       if (comm) {
0512         if (!branchToReadingWorker.empty()) {
0513           //If an OutputModule needs a product, we can't delete it early
0514           // so we should remove it from our list
0515           SelectedProductsForBranchType const& kept = comm->keptProducts();
0516           for (auto const& item : kept[InEvent]) {
0517             BranchDescription const& desc = *item.first;
0518             auto found = branchToReadingWorker.equal_range(desc.branchName());
0519             if (found.first != found.second) {
0520               --nUniqueBranchesToDelete;
0521               branchToReadingWorker.erase(found.first, found.second);
0522             }
0523           }
0524         }
0525       }
0526     });
0527 
0528     if (branchToReadingWorker.empty()) {
0529       return;
0530     }
0531 
0532     std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0533     for (auto w : allWorkers()) {
0534       if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0535         continue;
0536       }
0537       //determine if this module could read a branch we want to delete early
0538       auto consumes = w->consumesInfo();
0539       if (not consumes.empty()) {
0540         bool foundAtLeastOneMatchingBranch = false;
0541         for (auto const& product : consumes) {
0542           std::string branch = fmt::format("{}_{}_{}_{}",
0543                                            product.type().friendlyClassName(),
0544                                            product.label().data(),
0545                                            product.instance().data(),
0546                                            product.process().data());
0547           {
0548             //Handle case where worker directly consumes product
0549             auto found = branchToReadingWorker.end();
0550             if (product.process().empty()) {
0551               auto startFound = branchToReadingWorker.lower_bound(branch);
0552               if (startFound != branchToReadingWorker.end()) {
0553                 if (startFound->first.substr(0, branch.size()) == branch) {
0554                   //match all processNames here, even if it means multiple matches will happen
0555                   found = startFound;
0556                 }
0557               }
0558             } else {
0559               auto exactFound = branchToReadingWorker.equal_range(branch);
0560               if (exactFound.first != exactFound.second) {
0561                 found = exactFound.first;
0562               }
0563             }
0564             if (found != branchToReadingWorker.end()) {
0565               if (not foundAtLeastOneMatchingBranch) {
0566                 ++upperLimitOnReadingWorker;
0567                 foundAtLeastOneMatchingBranch = true;
0568               }
0569               ++upperLimitOnIndicies;
0570               ++reserveSizeForWorker[w];
0571               if (nullptr == found->second) {
0572                 found->second = w;
0573               } else {
0574                 branchToReadingWorker.insert(make_pair(found->first, w));
0575               }
0576             }
0577           }
0578           {
0579             //Handle case where indirectly consumes product
0580             auto found = referencesToBranches.end();
0581             if (product.process().empty()) {
0582               auto startFound = referencesToBranches.lower_bound(branch);
0583               if (startFound != referencesToBranches.end()) {
0584                 if (startFound->first.substr(0, branch.size()) == branch) {
0585                   //match all processNames here, even if it means multiple matches will happen
0586                   found = startFound;
0587                 }
0588               }
0589             } else {
0590               //can match exactly
0591               auto exactFound = referencesToBranches.equal_range(branch);
0592               if (exactFound.first != exactFound.second) {
0593                 found = exactFound.first;
0594               }
0595             }
0596             if (found != referencesToBranches.end()) {
0597               for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0598                 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0599                 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0600                   continue;
0601                 }
0602                 if (not foundAtLeastOneMatchingBranch) {
0603                   ++upperLimitOnReadingWorker;
0604                   foundAtLeastOneMatchingBranch = true;
0605                 }
0606                 ++upperLimitOnIndicies;
0607                 ++reserveSizeForWorker[w];
0608                 if (nullptr == foundInBranchToReadingWorker->second) {
0609                   foundInBranchToReadingWorker->second = w;
0610                 } else {
0611                   branchToReadingWorker.insert(make_pair(itr->second, w));
0612                 }
0613               }
0614             }
0615           }
0616         }
0617       }
0618     }
0619     {
0620       auto it = branchToReadingWorker.begin();
0621       std::vector<std::string> unusedBranches;
0622       while (it != branchToReadingWorker.end()) {
0623         if (it->second == nullptr) {
0624           unusedBranches.push_back(it->first);
0625           //erasing the object invalidates the iterator so must advance it first
0626           auto temp = it;
0627           ++it;
0628           branchToReadingWorker.erase(temp);
0629         } else {
0630           ++it;
0631         }
0632       }
0633       if (not unusedBranches.empty()) {
0634         LogWarning l("UnusedProductsForCanDeleteEarly");
0635         l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0636              " If possible, remove the producer from the job.";
0637         for (auto const& n : unusedBranches) {
0638           l << "\n " << n;
0639         }
0640       }
0641     }
0642     if (!branchToReadingWorker.empty()) {
0643       earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0644       earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0645       earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0646       std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0647       std::string lastBranchName;
0648       size_t nextOpenIndex = 0;
0649       unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0650       for (auto& branchAndWorker : branchToReadingWorker) {
0651         if (lastBranchName != branchAndWorker.first) {
0652           //have to put back the period we removed earlier in order to get the proper name
0653           BranchID bid(branchAndWorker.first + ".");
0654           earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0655           lastBranchName = branchAndWorker.first;
0656         }
0657         auto found = alreadySeenWorkers.find(branchAndWorker.second);
0658         if (alreadySeenWorkers.end() == found) {
0659           //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
0660           // all the branches that might be read by this worker. However, initially we will only tell the
0661           // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
0662           // EarlyDeleteHelper will automatically advance its internal end pointer.
0663           size_t index = nextOpenIndex;
0664           size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0665           assert(index < earlyDeleteHelperToBranchIndicies_.size());
0666           earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0667           earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0668           branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0669           alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0670           nextOpenIndex += nIndices;
0671         } else {
0672           found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0673         }
0674       }
0675 
0676       //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
0677       // space needed for each module
0678       auto itLast = earlyDeleteHelpers_.begin();
0679       for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0680         if (itLast->end() != it->begin()) {
0681           //figure the offset for next Worker since it hasn't been moved yet so it has the original address
0682           unsigned int delta = it->begin() - itLast->end();
0683           it->shiftIndexPointers(delta);
0684 
0685           earlyDeleteHelperToBranchIndicies_.erase(
0686               earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0687               earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0688         }
0689         itLast = it;
0690       }
0691       earlyDeleteHelperToBranchIndicies_.erase(
0692           earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0693           earlyDeleteHelperToBranchIndicies_.end());
0694 
0695       //now tell the paths about the deleters
0696       for (auto& p : trig_paths_) {
0697         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0698       }
0699       for (auto& p : end_paths_) {
0700         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0701       }
0702       resetEarlyDelete();
0703     }
0704   }
0705 
0706   std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0707       Worker* worker,
0708       std::unordered_set<std::string>& conditionalModules,
0709       std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0710       std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0711       ParameterSet& proc_pset,
0712       ProductRegistry& preg,
0713       PreallocationConfiguration const* prealloc,
0714       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0715     std::vector<Worker*> returnValue;
0716     auto const& consumesInfo = worker->consumesInfo();
0717     auto moduleLabel = worker->description()->moduleLabel();
0718     using namespace productholderindexhelper;
0719     for (auto const& ci : consumesInfo) {
0720       if (not ci.skipCurrentProcess() and
0721           (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0722         auto productModuleLabel = std::string(ci.label());
0723         bool productFromConditionalModule = false;
0724         auto itFound = conditionalModules.find(productModuleLabel);
0725         if (itFound == conditionalModules.end()) {
0726           //Check to see if this was an alias
0727           //note that aliasMap was previously filtered so only the conditional modules remain there
0728           auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0729           if (foundAlias) {
0730             productModuleLabel = *foundAlias;
0731             productFromConditionalModule = true;
0732             itFound = conditionalModules.find(productModuleLabel);
0733             //check that the alias-for conditional module has not been used
0734             if (itFound == conditionalModules.end()) {
0735               continue;
0736             }
0737           }
0738         } else {
0739           //need to check the rest of the data product info
0740           auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0741           for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0742             if (itBranch->second->productInstanceName() == ci.instance()) {
0743               if (ci.kindOfType() == PRODUCT_TYPE) {
0744                 if (ci.type() == itBranch->second->unwrappedTypeID()) {
0745                   productFromConditionalModule = true;
0746                   break;
0747                 }
0748               } else {
0749                 //this is a view
0750                 if (typeIsViewCompatible(
0751                         ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
0752                   productFromConditionalModule = true;
0753                   break;
0754                 }
0755               }
0756             }
0757           }
0758         }
0759         if (productFromConditionalModule) {
0760           auto condWorker =
0761               getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0762           assert(condWorker);
0763 
0764           conditionalModules.erase(itFound);
0765 
0766           auto dependents = tryToPlaceConditionalModules(condWorker,
0767                                                          conditionalModules,
0768                                                          conditionalModuleBranches,
0769                                                          aliasMap,
0770                                                          proc_pset,
0771                                                          preg,
0772                                                          prealloc,
0773                                                          processConfiguration);
0774           returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0775           returnValue.push_back(condWorker);
0776         }
0777       }
0778     }
0779     return returnValue;
0780   }
0781 
0782   void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
0783                                    ProductRegistry& preg,
0784                                    PreallocationConfiguration const* prealloc,
0785                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0786                                    std::string const& pathName,
0787                                    bool ignoreFilters,
0788                                    PathWorkers& out,
0789                                    std::vector<std::string> const& endPathNames,
0790                                    ConditionalTaskHelper const& conditionalTaskHelper,
0791                                    std::unordered_set<std::string>& allConditionalModules) {
0792     vstring modnames = proc_pset.getParameter<vstring>(pathName);
0793     PathWorkers tmpworkers;
0794 
0795     //Pull out ConditionalTask modules
0796     auto condRange = findConditionalTaskModulesRange(modnames);
0797 
0798     std::unordered_set<std::string> conditionalmods;
0799     //An EDAlias may be redirecting to a module on a ConditionalTask
0800     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
0801     std::unordered_map<std::string, unsigned int> conditionalModOrder;
0802     if (condRange.first != condRange.second) {
0803       for (auto it = condRange.first; it != condRange.second; ++it) {
0804         // ordering needs to skip the # token in the path list
0805         conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0806       }
0807       //the last entry should be ignored since it is required to be "@"
0808       conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0809                                                         std::make_move_iterator(condRange.second));
0810 
0811       conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0812       modnames.erase(std::prev(condRange.first), modnames.end());
0813 
0814       // Make a union of all conditional modules from all Paths
0815       allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
0816     }
0817 
0818     unsigned int placeInPath = 0;
0819     for (auto const& name : modnames) {
0820       //Modules except EDFilters are set to run concurrently by default
0821       bool doNotRunConcurrently = false;
0822       WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0823       if (name[0] == '!') {
0824         filterAction = WorkerInPath::Veto;
0825       } else if (name[0] == '-' or name[0] == '+') {
0826         filterAction = WorkerInPath::Ignore;
0827       }
0828       if (name[0] == '|' or name[0] == '+') {
0829         //cms.wait was specified so do not run concurrently
0830         doNotRunConcurrently = true;
0831       }
0832 
0833       std::string moduleLabel = name;
0834       if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0835         moduleLabel.erase(0, 1);
0836       }
0837 
0838       Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0839       if (worker == nullptr) {
0840         std::string pathType("endpath");
0841         if (!search_all(endPathNames, pathName)) {
0842           pathType = std::string("path");
0843         }
0844         throw Exception(errors::Configuration)
0845             << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0846             << "\"\n please check spelling or remove that label from the path.";
0847       }
0848 
0849       if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0850         // We have a filter on an end path, and the filter is not explicitly ignored.
0851         // See if the filter is allowed.
0852         std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0853         if (!search_all(allowed_filters, worker->description()->moduleName())) {
0854           // Filter is not allowed. Ignore the result, and issue a warning.
0855           filterAction = WorkerInPath::Ignore;
0856           LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0857                                         << "' with module label '" << moduleLabel << "' appears on EndPath '"
0858                                         << pathName << "'.\n"
0859                                         << "The return value of the filter will be ignored.\n"
0860                                         << "To suppress this warning, either remove the filter from the endpath,\n"
0861                                         << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0862         }
0863       }
0864       bool runConcurrently = not doNotRunConcurrently;
0865       if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0866         runConcurrently = false;
0867       }
0868 
0869       auto condModules = tryToPlaceConditionalModules(worker,
0870                                                       conditionalmods,
0871                                                       conditionalModsBranches,
0872                                                       conditionalTaskHelper.aliasMap(),
0873                                                       proc_pset,
0874                                                       preg,
0875                                                       prealloc,
0876                                                       processConfiguration);
0877       for (auto condMod : condModules) {
0878         tmpworkers.emplace_back(
0879             condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0880       }
0881 
0882       tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0883       ++placeInPath;
0884     }
0885 
0886     out.swap(tmpworkers);
0887   }
0888 
0889   void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0890                                     ProductRegistry& preg,
0891                                     PreallocationConfiguration const* prealloc,
0892                                     std::shared_ptr<ProcessConfiguration const> processConfiguration,
0893                                     int bitpos,
0894                                     std::string const& name,
0895                                     TrigResPtr trptr,
0896                                     std::vector<std::string> const& endPathNames,
0897                                     ConditionalTaskHelper const& conditionalTaskHelper,
0898                                     std::unordered_set<std::string>& allConditionalModules) {
0899     PathWorkers tmpworkers;
0900     fillWorkers(proc_pset,
0901                 preg,
0902                 prealloc,
0903                 processConfiguration,
0904                 name,
0905                 false,
0906                 tmpworkers,
0907                 endPathNames,
0908                 conditionalTaskHelper,
0909                 allConditionalModules);
0910 
0911     // an empty path will cause an extra bit that is not used
0912     if (!tmpworkers.empty()) {
0913       trig_paths_.emplace_back(
0914           bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
0915     } else {
0916       empty_trig_paths_.push_back(bitpos);
0917     }
0918     for (WorkerInPath const& workerInPath : tmpworkers) {
0919       addToAllWorkers(workerInPath.getWorker());
0920     }
0921   }
0922 
0923   void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0924                                    ProductRegistry& preg,
0925                                    PreallocationConfiguration const* prealloc,
0926                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0927                                    int bitpos,
0928                                    std::string const& name,
0929                                    std::vector<std::string> const& endPathNames,
0930                                    ConditionalTaskHelper const& conditionalTaskHelper,
0931                                    std::unordered_set<std::string>& allConditionalModules) {
0932     PathWorkers tmpworkers;
0933     fillWorkers(proc_pset,
0934                 preg,
0935                 prealloc,
0936                 processConfiguration,
0937                 name,
0938                 true,
0939                 tmpworkers,
0940                 endPathNames,
0941                 conditionalTaskHelper,
0942                 allConditionalModules);
0943 
0944     if (!tmpworkers.empty()) {
0945       end_paths_.emplace_back(bitpos,
0946                               name,
0947                               tmpworkers,
0948                               TrigResPtr(),
0949                               actionTable(),
0950                               actReg_,
0951                               &streamContext_,
0952                               PathContext::PathType::kEndPath);
0953     } else {
0954       empty_end_paths_.push_back(bitpos);
0955     }
0956     for (WorkerInPath const& workerInPath : tmpworkers) {
0957       addToAllWorkers(workerInPath.getWorker());
0958     }
0959   }
0960 
0961   void StreamSchedule::beginStream() { workerManager_.beginStream(streamID_, streamContext_); }
0962 
0963   void StreamSchedule::endStream() { workerManager_.endStream(streamID_, streamContext_); }
0964 
0965   void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0966     Worker* found = nullptr;
0967     for (auto const& worker : allWorkers()) {
0968       if (worker->description()->moduleLabel() == iLabel) {
0969         found = worker;
0970         break;
0971       }
0972     }
0973     if (nullptr == found) {
0974       return;
0975     }
0976 
0977     iMod->replaceModuleFor(found);
0978     found->beginStream(streamID_, streamContext_);
0979   }
0980 
0981   void StreamSchedule::deleteModule(std::string const& iLabel) { workerManager_.deleteModuleIfExists(iLabel); }
0982 
0983   std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
0984     std::vector<ModuleDescription const*> result;
0985     result.reserve(allWorkers().size());
0986 
0987     for (auto const& worker : allWorkers()) {
0988       ModuleDescription const* p = worker->description();
0989       result.push_back(p);
0990     }
0991     return result;
0992   }
0993 
0994   void StreamSchedule::processOneEventAsync(
0995       WaitingTaskHolder iTask,
0996       EventTransitionInfo& info,
0997       ServiceToken const& serviceToken,
0998       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
0999     EventPrincipal& ep = info.principal();
1000 
1001     // Caught exception is propagated via WaitingTaskHolder
1002     CMS_SA_ALLOW try {
1003       this->resetAll();
1004 
1005       using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1006 
1007       Traits::setStreamContext(streamContext_, ep);
1008       //a service may want to communicate with another service
1009       ServiceRegistry::Operate guard(serviceToken);
1010       Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1011 
1012       // Data dependencies need to be set up before marking empty
1013       // (End)Paths complete in case something consumes the status of
1014       // the empty (EndPath)
1015       workerManager_.setupResolvers(ep);
1016       workerManager_.setupOnDemandSystem(info);
1017 
1018       HLTPathStatus hltPathStatus(hlt::Pass, 0);
1019       for (int empty_trig_path : empty_trig_paths_) {
1020         results_->at(empty_trig_path) = hltPathStatus;
1021         pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1022         std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1023                                         ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1024                                             info, streamID_, ParentContext(&streamContext_), &streamContext_);
1025         if (except) {
1026           iTask.doneWaiting(except);
1027           return;
1028         }
1029       }
1030       if (not endPathStatusInserterWorkers_.empty()) {
1031         for (int empty_end_path : empty_end_paths_) {
1032           std::exception_ptr except =
1033               endPathStatusInserterWorkers_[empty_end_path]
1034                   ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1035                       info, streamID_, ParentContext(&streamContext_), &streamContext_);
1036           if (except) {
1037             iTask.doneWaiting(except);
1038             return;
1039           }
1040         }
1041       }
1042 
1043       ++total_events_;
1044 
1045       //use to give priorities on an error to ones from Paths
1046       auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1047       auto pathErrorPtr = pathErrorHolder.get();
1048       ServiceWeakToken weakToken = serviceToken;
1049       auto allPathsDone = make_waiting_task(
1050           [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1051             ServiceRegistry::Operate operate(weakToken.lock());
1052 
1053             std::exception_ptr ptr;
1054             if (pathError->load()) {
1055               ptr = *pathError->load();
1056               delete pathError->load();
1057             }
1058             if ((not ptr) and iPtr) {
1059               ptr = *iPtr;
1060             }
1061             iTask.doneWaiting(finishProcessOneEvent(ptr));
1062           });
1063       //The holder guarantees that if the paths finish before the loop ends
1064       // that we do not start too soon. It also guarantees that the task will
1065       // run under that condition.
1066       WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1067 
1068       auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1069                                              std::exception_ptr const* iPtr) mutable {
1070         ServiceRegistry::Operate operate(weakToken.lock());
1071 
1072         if (iPtr) {
1073           //this is used to prioritize this error over one
1074           // that happens in EndPath or Accumulate
1075           pathErrorPtr->store(new std::exception_ptr(*iPtr));
1076         }
1077         finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1078       });
1079 
1080       //The holder guarantees that if the paths finish before the loop ends
1081       // that we do not start too soon. It also guarantees that the task will
1082       // run under that condition.
1083       WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1084 
1085       //start end paths first so on single threaded the paths will run first
1086       WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1087       for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1088         it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1089       }
1090 
1091       for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1092         it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1093       }
1094 
1095       ParentContext parentContext(&streamContext_);
1096       workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1097           hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1098     } catch (...) {
1099       iTask.doneWaiting(std::current_exception());
1100     }
1101   }
1102 
1103   void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1104                                      WaitingTaskHolder iWait,
1105                                      EventTransitionInfo& info) {
1106     if (iExcept) {
1107       // Caught exception is propagated via WaitingTaskHolder
1108       CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1109         exception_actions::ActionCodes action = actionTable().find(e.category());
1110         assert(action != exception_actions::IgnoreCompletely);
1111         if (action == exception_actions::TryToContinue) {
1112           edm::printCmsExceptionWarning("TryToContinue", e);
1113           *(iExcept.load()) = std::exception_ptr();
1114         } else {
1115           *(iExcept.load()) = std::current_exception();
1116         }
1117       } catch (...) {
1118         *(iExcept.load()) = std::current_exception();
1119       }
1120     }
1121 
1122     if ((not iExcept) and results_->accept()) {
1123       ++total_passed_;
1124     }
1125 
1126     if (nullptr != results_inserter_.get()) {
1127       // Caught exception is propagated to the caller
1128       CMS_SA_ALLOW try {
1129         //Even if there was an exception, we need to allow results inserter
1130         // to run since some module may be waiting on its results.
1131         ParentContext parentContext(&streamContext_);
1132         using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1133 
1134         auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1135         if (expt) {
1136           std::rethrow_exception(expt);
1137         }
1138       } catch (cms::Exception& ex) {
1139         if (not iExcept) {
1140           if (ex.context().empty()) {
1141             std::ostringstream ost;
1142             ost << "Processing Event " << info.principal().id();
1143             ex.addContext(ost.str());
1144           }
1145           iExcept.store(new std::exception_ptr(std::current_exception()));
1146         }
1147       } catch (...) {
1148         if (not iExcept) {
1149           iExcept.store(new std::exception_ptr(std::current_exception()));
1150         }
1151       }
1152     }
1153     std::exception_ptr ptr;
1154     if (iExcept) {
1155       ptr = *iExcept.load();
1156     }
1157     iWait.doneWaiting(ptr);
1158   }
1159 
1160   std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1161     using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1162 
1163     if (iExcept) {
1164       //add context information to the exception and print message
1165       try {
1166         convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1167       } catch (cms::Exception& ex) {
1168         bool const cleaningUpAfterException = false;
1169         if (ex.context().empty()) {
1170           addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1171         } else {
1172           addContextAndPrintException("", ex, cleaningUpAfterException);
1173         }
1174         iExcept = std::current_exception();
1175       }
1176 
1177       actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1178     }
1179     // Caught exception is propagated to the caller
1180     CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1181       if (not iExcept) {
1182         iExcept = std::current_exception();
1183       }
1184     }
1185     if (not iExcept) {
1186       resetEarlyDelete();
1187     }
1188 
1189     return iExcept;
1190   }
1191 
1192   void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1193     oLabelsToFill.reserve(trig_paths_.size());
1194     std::transform(trig_paths_.begin(),
1195                    trig_paths_.end(),
1196                    std::back_inserter(oLabelsToFill),
1197                    std::bind(&Path::name, std::placeholders::_1));
1198   }
1199 
1200   void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1201     TrigPaths::const_iterator itFound = std::find_if(
1202         trig_paths_.begin(),
1203         trig_paths_.end(),
1204         std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1205     if (itFound != trig_paths_.end()) {
1206       oLabelsToFill.reserve(itFound->size());
1207       for (size_t i = 0; i < itFound->size(); ++i) {
1208         oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1209       }
1210     }
1211   }
1212 
1213   void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1214                                                 std::vector<ModuleDescription const*>& descriptions,
1215                                                 unsigned int hint) const {
1216     descriptions.clear();
1217     bool found = false;
1218     TrigPaths::const_iterator itFound;
1219 
1220     if (hint < trig_paths_.size()) {
1221       itFound = trig_paths_.begin() + hint;
1222       if (itFound->name() == iPathLabel)
1223         found = true;
1224     }
1225     if (!found) {
1226       // if the hint did not work, do it the slow way
1227       itFound = std::find_if(
1228           trig_paths_.begin(),
1229           trig_paths_.end(),
1230           std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1231       if (itFound != trig_paths_.end())
1232         found = true;
1233     }
1234     if (found) {
1235       descriptions.reserve(itFound->size());
1236       for (size_t i = 0; i < itFound->size(); ++i) {
1237         descriptions.push_back(itFound->getWorker(i)->description());
1238       }
1239     }
1240   }
1241 
1242   void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1243                                                    std::vector<ModuleDescription const*>& descriptions,
1244                                                    unsigned int hint) const {
1245     descriptions.clear();
1246     bool found = false;
1247     TrigPaths::const_iterator itFound;
1248 
1249     if (hint < end_paths_.size()) {
1250       itFound = end_paths_.begin() + hint;
1251       if (itFound->name() == iEndPathLabel)
1252         found = true;
1253     }
1254     if (!found) {
1255       // if the hint did not work, do it the slow way
1256       itFound = std::find_if(
1257           end_paths_.begin(),
1258           end_paths_.end(),
1259           std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1260       if (itFound != end_paths_.end())
1261         found = true;
1262     }
1263     if (found) {
1264       descriptions.reserve(itFound->size());
1265       for (size_t i = 0; i < itFound->size(); ++i) {
1266         descriptions.push_back(itFound->getWorker(i)->description());
1267       }
1268     }
1269   }
1270 
1271   static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1272     sum.timesVisited += path.timesVisited(which);
1273     sum.timesPassed += path.timesPassed(which);
1274     sum.timesFailed += path.timesFailed(which);
1275     sum.timesExcept += path.timesExcept(which);
1276     sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1277     sum.bitPosition = path.bitPosition(which);
1278   }
1279 
1280   static void fillPathSummary(Path const& path, PathSummary& sum) {
1281     sum.name = path.name();
1282     sum.bitPosition = path.bitPosition();
1283     sum.timesRun += path.timesRun();
1284     sum.timesPassed += path.timesPassed();
1285     sum.timesFailed += path.timesFailed();
1286     sum.timesExcept += path.timesExcept();
1287 
1288     Path::size_type sz = path.size();
1289     if (sum.moduleInPathSummaries.empty()) {
1290       std::vector<ModuleInPathSummary> temp(sz);
1291       for (size_t i = 0; i != sz; ++i) {
1292         fillModuleInPathSummary(path, i, temp[i]);
1293       }
1294       sum.moduleInPathSummaries.swap(temp);
1295     } else {
1296       assert(sz == sum.moduleInPathSummaries.size());
1297       for (size_t i = 0; i != sz; ++i) {
1298         fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1299       }
1300     }
1301   }
1302 
1303   static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1304     sum.timesVisited += w.timesVisited();
1305     sum.timesRun += w.timesRun();
1306     sum.timesPassed += w.timesPassed();
1307     sum.timesFailed += w.timesFailed();
1308     sum.timesExcept += w.timesExcept();
1309     sum.moduleLabel = w.description()->moduleLabel();
1310   }
1311 
1312   static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1313 
1314   void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1315     rep.eventSummary.totalEvents += totalEvents();
1316     rep.eventSummary.totalEventsPassed += totalEventsPassed();
1317     rep.eventSummary.totalEventsFailed += totalEventsFailed();
1318 
1319     fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1320     fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1321     fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1322   }
1323 
1324   void StreamSchedule::clearCounters() {
1325     using std::placeholders::_1;
1326     total_events_ = total_passed_ = 0;
1327     for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1328     for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1329     for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1330   }
1331 
1332   void StreamSchedule::resetAll() { results_->reset(); }
1333 
1334   void StreamSchedule::addToAllWorkers(Worker* w) { workerManager_.addToAllWorkers(w); }
1335 
1336   void StreamSchedule::resetEarlyDelete() {
1337     //must be sure we have cleared the count first
1338     for (auto& count : earlyDeleteBranchToCount_) {
1339       count.count = 0;
1340     }
1341     //now reset based on how many helpers use that branch
1342     for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1343       ++(earlyDeleteBranchToCount_[index].count);
1344     }
1345     for (auto& helper : earlyDeleteHelpers_) {
1346       helper.reset();
1347     }
1348   }
1349 
1350   void StreamSchedule::makePathStatusInserters(
1351       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1352       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1353       ExceptionToActionTable const& actions) {
1354     int bitpos = 0;
1355     unsigned int indexEmpty = 0;
1356     unsigned int indexOfPath = 0;
1357     for (auto& pathStatusInserter : pathStatusInserters) {
1358       std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1359       WorkerPtr workerPtr(
1360           new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1361       pathStatusInserterWorkers_.emplace_back(workerPtr);
1362       workerPtr->setActivityRegistry(actReg_);
1363       addToAllWorkers(workerPtr.get());
1364 
1365       // A little complexity here because a C++ Path object is not
1366       // instantiated and put into end_paths if there are no modules
1367       // on the configured path.
1368       if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1369         ++indexEmpty;
1370       } else {
1371         trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1372         ++indexOfPath;
1373       }
1374       ++bitpos;
1375     }
1376 
1377     bitpos = 0;
1378     indexEmpty = 0;
1379     indexOfPath = 0;
1380     for (auto& endPathStatusInserter : endPathStatusInserters) {
1381       std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1382       WorkerPtr workerPtr(
1383           new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1384       endPathStatusInserterWorkers_.emplace_back(workerPtr);
1385       workerPtr->setActivityRegistry(actReg_);
1386       addToAllWorkers(workerPtr.get());
1387 
1388       // A little complexity here because a C++ Path object is not
1389       // instantiated and put into end_paths if there are no modules
1390       // on the configured path.
1391       if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1392         ++indexEmpty;
1393       } else {
1394         end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1395         ++indexOfPath;
1396       }
1397       ++bitpos;
1398     }
1399   }
1400 }  // namespace edm