Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-04 04:34:55

0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002 
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0005 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0007 #include "FWCore/Framework/src/OutputModuleDescription.h"
0008 #include "FWCore/Framework/interface/TriggerNamesService.h"
0009 #include "FWCore/Framework/src/TriggerReport.h"
0010 #include "FWCore/Framework/src/TriggerTimingReport.h"
0011 #include "FWCore/Framework/src/Factory.h"
0012 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0013 #include "FWCore/Framework/src/TriggerResultInserter.h"
0014 #include "FWCore/Framework/src/PathStatusInserter.h"
0015 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0016 #include "FWCore/Framework/interface/WorkerInPath.h"
0017 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0018 #include "FWCore/Framework/interface/maker/WorkerT.h"
0019 #include "FWCore/Framework/interface/ModuleRegistry.h"
0020 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/ParameterSet/interface/Registry.h"
0024 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0025 #include "FWCore/Utilities/interface/Algorithms.h"
0026 #include "FWCore/Utilities/interface/ConvertException.h"
0027 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 
0030 #include "LuminosityBlockProcessingStatus.h"
0031 #include "processEDAliases.h"
0032 
0033 #include <algorithm>
0034 #include <cassert>
0035 #include <cstdlib>
0036 #include <functional>
0037 #include <iomanip>
0038 #include <limits>
0039 #include <list>
0040 #include <map>
0041 #include <exception>
0042 #include <fmt/format.h>
0043 
0044 namespace edm {
0045 
0046   namespace {
0047 
0048     // Function template to transform each element in the input range to
0049     // a value placed into the output range. The supplied function
0050     // should take a const_reference to the 'input', and write to a
0051     // reference to the 'output'.
0052     template <typename InputIterator, typename ForwardIterator, typename Func>
0053     void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
0054       for (; begin != end; ++begin, ++out)
0055         func(*begin, *out);
0056     }
0057 
0058     // Function template that takes a sequence 'from', a sequence
0059     // 'to', and a callable object 'func'. It and applies
0060     // transform_into to fill the 'to' sequence with the values
0061     // calcuated by the callable object, taking care to fill the
0062     // outupt only if all calls succeed.
0063     template <typename FROM, typename TO, typename FUNC>
0064     void fill_summary(FROM const& from, TO& to, FUNC func) {
0065       if (to.size() != from.size()) {
0066         TO temp(from.size());
0067         transform_into(from.begin(), from.end(), temp.begin(), func);
0068         to.swap(temp);
0069       } else {
0070         transform_into(from.begin(), from.end(), to.begin(), func);
0071       }
0072     }
0073 
0074     // -----------------------------
0075 
0076     // Here we make the trigger results inserter directly.  This should
0077     // probably be a utility in the WorkerRegistry or elsewhere.
0078 
0079     StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
0080                                            std::shared_ptr<ActivityRegistry> areg,
0081                                            std::shared_ptr<TriggerResultInserter> inserter) {
0082       StreamSchedule::WorkerPtr ptr(
0083           new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
0084       ptr->setActivityRegistry(areg);
0085       return ptr;
0086     }
0087 
0088     void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0089                                          ProductRegistry const& preg,
0090                                          std::multimap<std::string, Worker*>& branchToReadingWorker) {
0091       auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0092       // Remove any duplicates
0093       std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0094       vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0095                                    vBranchesToDeleteEarly.end());
0096 
0097       // Are the requested items in the product registry?
0098       auto allBranchNames = preg.allBranchNames();
0099       //the branch names all end with a period, which we do not want to compare with
0100       for (auto& b : allBranchNames) {
0101         b.resize(b.size() - 1);
0102       }
0103       std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0104       std::vector<std::string> temp;
0105       temp.reserve(vBranchesToDeleteEarly.size());
0106 
0107       std::set_intersection(vBranchesToDeleteEarly.begin(),
0108                             vBranchesToDeleteEarly.end(),
0109                             allBranchNames.begin(),
0110                             allBranchNames.end(),
0111                             std::back_inserter(temp));
0112       vBranchesToDeleteEarly.swap(temp);
0113       if (temp.size() != vBranchesToDeleteEarly.size()) {
0114         std::vector<std::string> missingProducts;
0115         std::set_difference(temp.begin(),
0116                             temp.end(),
0117                             vBranchesToDeleteEarly.begin(),
0118                             vBranchesToDeleteEarly.end(),
0119                             std::back_inserter(missingProducts));
0120         LogInfo l("MissingProductsForCanDeleteEarly");
0121         l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0122         for (auto const& n : missingProducts) {
0123           l << "\n " << n;
0124         }
0125       }
0126       //set placeholder for the branch, we will remove the nullptr if a
0127       // module actually wants the branch.
0128       for (auto const& branch : vBranchesToDeleteEarly) {
0129         branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0130       }
0131     }
0132 
0133     Worker* getWorker(std::string const& moduleLabel,
0134                       ParameterSet& proc_pset,
0135                       WorkerManager& workerManager,
0136                       ProductRegistry& preg,
0137                       PreallocationConfiguration const* prealloc,
0138                       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0139       bool isTracked;
0140       ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0141       if (modpset == nullptr) {
0142         return nullptr;
0143       }
0144       assert(isTracked);
0145 
0146       return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
0147     }
0148 
0149     // If ConditionalTask modules exist in the container of module
0150     // names, returns the range (std::pair) for the modules. The range
0151     // excludes the special markers '#' (right before the
0152     // ConditionalTask modules) and '@' (last element).
0153     // If the module name container does not contain ConditionalTask
0154     // modules, returns std::pair of end iterators.
0155     template <typename T>
0156     auto findConditionalTaskModulesRange(T& modnames) {
0157       auto beg = std::find(modnames.begin(), modnames.end(), "#");
0158       if (beg == modnames.end()) {
0159         return std::pair(modnames.end(), modnames.end());
0160       }
0161       return std::pair(beg + 1, std::prev(modnames.end()));
0162     }
0163 
0164     std::optional<std::string> findBestMatchingAlias(
0165         std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0166         std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0167         std::string const& productModuleLabel,
0168         ConsumesInfo const& consumesInfo) {
0169       std::optional<std::string> best;
0170       int wildcardsInBest = std::numeric_limits<int>::max();
0171       bool bestIsAmbiguous = false;
0172 
0173       auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0174                             std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0175         int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0176         if (wildcards == 0) {
0177           bestIsAmbiguous = false;
0178           return true;
0179         }
0180         if (not best or wildcards < wildcardsInBest) {
0181           best = label;
0182           wildcardsInBest = wildcards;
0183           bestIsAmbiguous = false;
0184         } else if (best and *best != label and wildcardsInBest == wildcards) {
0185           bestIsAmbiguous = true;
0186         }
0187         return false;
0188       };
0189 
0190       auto findAlias = aliasMap.equal_range(productModuleLabel);
0191       for (auto it = findAlias.first; it != findAlias.second; ++it) {
0192         std::string const& aliasInstanceLabel =
0193             it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0194         bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0195         if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0196           bool const typeIsWildcard = it->second.friendlyClassName == "*";
0197           if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0198             if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0199               return it->second.originalModuleLabel;
0200             }
0201           } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0202             //consume is a View so need to do more intrusive search
0203             //find matching branches in module
0204             auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0205             for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0206               if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0207                 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0208                                                                    TypeID(itBranch->second->wrappedType().typeInfo()),
0209                                                                    itBranch->second->className())) {
0210                   if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0211                     return it->second.originalModuleLabel;
0212                   }
0213                 }
0214               }
0215             }
0216           }
0217         }
0218       }
0219       if (bestIsAmbiguous) {
0220         throw Exception(errors::UnimplementedFeature)
0221             << "Encountered ambiguity when trying to find a best-matching alias for\n"
0222             << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0223             << " module label " << productModuleLabel << "\n"
0224             << " product instance name " << consumesInfo.instance() << "\n"
0225             << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0226                "wildcards ("
0227             << wildcardsInBest << ")";
0228       }
0229       return best;
0230     }
0231   }  // namespace
0232 
0233   // -----------------------------
0234 
0235   typedef std::vector<std::string> vstring;
0236 
0237   // -----------------------------
0238 
0239   class ConditionalTaskHelper {
0240   public:
0241     using AliasInfo = StreamSchedule::AliasInfo;
0242 
0243     ConditionalTaskHelper(ParameterSet& proc_pset,
0244                           ProductRegistry& preg,
0245                           PreallocationConfiguration const* prealloc,
0246                           std::shared_ptr<ProcessConfiguration const> processConfiguration,
0247                           WorkerManager& workerManagerLumisAndEvents,
0248                           std::vector<std::string> const& trigPathNames) {
0249       std::unordered_set<std::string> allConditionalMods;
0250       for (auto const& pathName : trigPathNames) {
0251         auto const modnames = proc_pset.getParameter<vstring>(pathName);
0252 
0253         //Pull out ConditionalTask modules
0254         auto condRange = findConditionalTaskModulesRange(modnames);
0255         if (condRange.first == condRange.second)
0256           continue;
0257 
0258         //the last entry should be ignored since it is required to be "@"
0259         allConditionalMods.insert(condRange.first, condRange.second);
0260       }
0261 
0262       for (auto const& cond : allConditionalMods) {
0263         //force the creation of the conditional modules so alias check can work
0264         (void)getWorker(cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration);
0265       }
0266 
0267       fillAliasMap(proc_pset, allConditionalMods);
0268       processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0269 
0270       //find branches created by the conditional modules
0271       for (auto const& prod : preg.productList()) {
0272         if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0273           conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0274         }
0275       }
0276     }
0277 
0278     std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0279 
0280     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
0281         std::unordered_set<std::string> const& conditionalmods) const {
0282       std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
0283       for (auto const& mod : conditionalmods) {
0284         auto range = conditionalModsBranches_.equal_range(mod);
0285         ret.insert(range.first, range.second);
0286       }
0287       return ret;
0288     }
0289 
0290   private:
0291     void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0292       auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0293       std::string const star("*");
0294       for (auto const& alias : aliases) {
0295         auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0296         auto aliasedToModuleLabels = info.getParameterNames();
0297         for (auto const& mod : aliasedToModuleLabels) {
0298           if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0299             auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0300             for (auto const& aliasPSet : aliasVPSet) {
0301               std::string type = star;
0302               std::string instance = star;
0303               std::string originalInstance = star;
0304               if (aliasPSet.exists("type")) {
0305                 type = aliasPSet.getParameter<std::string>("type");
0306               }
0307               if (aliasPSet.exists("toProductInstance")) {
0308                 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0309               }
0310               if (aliasPSet.exists("fromProductInstance")) {
0311                 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0312               }
0313 
0314               aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0315             }
0316           }
0317         }
0318       }
0319     }
0320 
0321     void processSwitchEDAliases(ParameterSet const& proc_pset,
0322                                 ProductRegistry& preg,
0323                                 ProcessConfiguration const& processConfiguration,
0324                                 std::unordered_set<std::string> const& allConditionalMods) {
0325       auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0326       std::vector<std::string> switchEDAliases;
0327       for (auto const& module : all_modules) {
0328         auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0329         if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0330           auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0331           for (auto const& case_label : all_cases) {
0332             auto range = aliasMap_.equal_range(case_label);
0333             if (range.first != range.second) {
0334               switchEDAliases.push_back(case_label);
0335             }
0336           }
0337         }
0338       }
0339       detail::processEDAliases(
0340           switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0341     }
0342 
0343     std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0344     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
0345   };
0346 
0347   // -----------------------------
0348 
0349   StreamSchedule::StreamSchedule(
0350       std::shared_ptr<TriggerResultInserter> inserter,
0351       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0352       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0353       std::shared_ptr<ModuleRegistry> modReg,
0354       ParameterSet& proc_pset,
0355       service::TriggerNamesService const& tns,
0356       PreallocationConfiguration const& prealloc,
0357       ProductRegistry& preg,
0358       ExceptionToActionTable const& actions,
0359       std::shared_ptr<ActivityRegistry> areg,
0360       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0361       StreamID streamID,
0362       ProcessContext const* processContext)
0363       : workerManagerBeginEnd_(modReg, areg, actions),
0364         workerManagerRuns_(modReg, areg, actions),
0365         workerManagerLumisAndEvents_(modReg, areg, actions),
0366         actReg_(areg),
0367         results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0368         results_inserter_(),
0369         trig_paths_(),
0370         end_paths_(),
0371         total_events_(),
0372         total_passed_(),
0373         number_of_unscheduled_modules_(0),
0374         streamID_(streamID),
0375         streamContext_(streamID_, processContext) {
0376     bool hasPath = false;
0377     std::vector<std::string> const& pathNames = tns.getTrigPaths();
0378     std::vector<std::string> const& endPathNames = tns.getEndPaths();
0379 
0380     ConditionalTaskHelper conditionalTaskHelper(
0381         proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
0382     std::unordered_set<std::string> conditionalModules;
0383 
0384     int trig_bitpos = 0;
0385     trig_paths_.reserve(pathNames.size());
0386     for (auto const& trig_name : pathNames) {
0387       fillTrigPath(proc_pset,
0388                    preg,
0389                    &prealloc,
0390                    processConfiguration,
0391                    trig_bitpos,
0392                    trig_name,
0393                    results(),
0394                    endPathNames,
0395                    conditionalTaskHelper,
0396                    conditionalModules);
0397       ++trig_bitpos;
0398       hasPath = true;
0399     }
0400 
0401     if (hasPath) {
0402       // the results inserter stands alone
0403       inserter->setTrigResultForStream(streamID.value(), results());
0404 
0405       results_inserter_ = makeInserter(actions, actReg_, inserter);
0406       addToAllWorkers(results_inserter_.get());
0407     }
0408 
0409     // fill normal endpaths
0410     int bitpos = 0;
0411     end_paths_.reserve(endPathNames.size());
0412     for (auto const& end_path_name : endPathNames) {
0413       fillEndPath(proc_pset,
0414                   preg,
0415                   &prealloc,
0416                   processConfiguration,
0417                   bitpos,
0418                   end_path_name,
0419                   endPathNames,
0420                   conditionalTaskHelper,
0421                   conditionalModules);
0422       ++bitpos;
0423     }
0424 
0425     makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0426 
0427     //See if all modules were used
0428     std::set<std::string> usedWorkerLabels;
0429     for (auto const& worker : allWorkersLumisAndEvents()) {
0430       usedWorkerLabels.insert(worker->description()->moduleLabel());
0431     }
0432     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0433     std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0434     std::vector<std::string> unusedLabels;
0435     set_difference(modulesInConfigSet.begin(),
0436                    modulesInConfigSet.end(),
0437                    usedWorkerLabels.begin(),
0438                    usedWorkerLabels.end(),
0439                    back_inserter(unusedLabels));
0440     std::set<std::string> unscheduledLabels;
0441     std::vector<std::string> shouldBeUsedLabels;
0442     if (!unusedLabels.empty()) {
0443       //Need to
0444       // 1) create worker
0445       // 2) if it is a WorkerT<EDProducer>, add it to our list
0446       // 3) hand list to our delayed reader
0447       for (auto const& label : unusedLabels) {
0448         bool isTracked;
0449         ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0450         assert(isTracked);
0451         assert(modulePSet != nullptr);
0452         workerManagerLumisAndEvents_.addToUnscheduledWorkers(
0453             *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0454       }
0455       if (!shouldBeUsedLabels.empty()) {
0456         std::ostringstream unusedStream;
0457         unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0458         for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0459                                                 itLabelEnd = shouldBeUsedLabels.end();
0460              itLabel != itLabelEnd;
0461              ++itLabel) {
0462           unusedStream << ",'" << *itLabel << "'";
0463         }
0464         LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0465       }
0466     }
0467     number_of_unscheduled_modules_ = unscheduledLabels.size();
0468 
0469     // Print conditional modules that were not consumed in any of their associated Paths
0470     if (streamID.value() == 0 and not conditionalModules.empty()) {
0471       // Intersection of unscheduled and ConditionalTask modules gives
0472       // directly the set of conditional modules that were not
0473       // consumed by anything in the Paths associated to the
0474       // corresponding ConditionalTask.
0475       std::vector<std::string_view> labelsToPrint;
0476       std::copy_if(
0477           unscheduledLabels.begin(),
0478           unscheduledLabels.end(),
0479           std::back_inserter(labelsToPrint),
0480           [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
0481 
0482       if (not labelsToPrint.empty()) {
0483         edm::LogWarning log("NonConsumedConditionalModules");
0484         log << "The following modules were part of some ConditionalTask, but were not\n"
0485             << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
0486             << "was associated. Perhaps they should be either removed from the\n"
0487             << "job, or moved to a Task to make it explicit they are unscheduled.\n";
0488         for (auto const& modLabel : labelsToPrint) {
0489           log.format("\n {}", modLabel);
0490         }
0491       }
0492     }
0493 
0494     for (auto const& worker : allWorkersLumisAndEvents()) {
0495       std::string const& moduleLabel = worker->description()->moduleLabel();
0496 
0497       // The new worker pointers will be null for the TriggerResultsInserter, PathStatusInserter, and
0498       // EndPathStatusInserter because there are no ParameterSets for those in the configuration.
0499       // We could add special code to create workers for those, but instead we skip them because they
0500       // do not have beginStream, endStream, or run/lumi begin/end stream transition functions.
0501 
0502       Worker* workerBeginEnd =
0503           getWorker(moduleLabel, proc_pset, workerManagerBeginEnd_, preg, &prealloc, processConfiguration);
0504       if (workerBeginEnd) {
0505         workerManagerBeginEnd_.addToAllWorkers(workerBeginEnd);
0506       }
0507 
0508       Worker* workerRuns = getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
0509       if (workerRuns) {
0510         workerManagerRuns_.addToAllWorkers(workerRuns);
0511       }
0512     }
0513 
0514   }  // StreamSchedule::StreamSchedule
0515 
0516   void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0517                                              std::vector<std::string> const& branchesToDeleteEarly,
0518                                              std::multimap<std::string, std::string> const& referencesToBranches,
0519                                              std::vector<std::string> const& modulesToSkip,
0520                                              edm::ProductRegistry const& preg) {
0521     // setup the list with those products actually registered for this job
0522     std::multimap<std::string, Worker*> branchToReadingWorker;
0523     initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0524 
0525     const std::vector<std::string> kEmpty;
0526     std::map<Worker*, unsigned int> reserveSizeForWorker;
0527     unsigned int upperLimitOnReadingWorker = 0;
0528     unsigned int upperLimitOnIndicies = 0;
0529     unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0530 
0531     //talk with output modules first
0532     modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0533       auto comm = iHolder->createOutputModuleCommunicator();
0534       if (comm) {
0535         if (!branchToReadingWorker.empty()) {
0536           //If an OutputModule needs a product, we can't delete it early
0537           // so we should remove it from our list
0538           SelectedProductsForBranchType const& kept = comm->keptProducts();
0539           for (auto const& item : kept[InEvent]) {
0540             BranchDescription const& desc = *item.first;
0541             auto found = branchToReadingWorker.equal_range(desc.branchName());
0542             if (found.first != found.second) {
0543               --nUniqueBranchesToDelete;
0544               branchToReadingWorker.erase(found.first, found.second);
0545             }
0546           }
0547         }
0548       }
0549     });
0550 
0551     if (branchToReadingWorker.empty()) {
0552       return;
0553     }
0554 
0555     std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0556     for (auto w : allWorkersLumisAndEvents()) {
0557       if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0558         continue;
0559       }
0560       //determine if this module could read a branch we want to delete early
0561       auto consumes = w->consumesInfo();
0562       if (not consumes.empty()) {
0563         bool foundAtLeastOneMatchingBranch = false;
0564         for (auto const& product : consumes) {
0565           std::string branch = fmt::format("{}_{}_{}_{}",
0566                                            product.type().friendlyClassName(),
0567                                            product.label().data(),
0568                                            product.instance().data(),
0569                                            product.process().data());
0570           {
0571             //Handle case where worker directly consumes product
0572             auto found = branchToReadingWorker.end();
0573             if (product.process().empty()) {
0574               auto startFound = branchToReadingWorker.lower_bound(branch);
0575               if (startFound != branchToReadingWorker.end()) {
0576                 if (startFound->first.substr(0, branch.size()) == branch) {
0577                   //match all processNames here, even if it means multiple matches will happen
0578                   found = startFound;
0579                 }
0580               }
0581             } else {
0582               auto exactFound = branchToReadingWorker.equal_range(branch);
0583               if (exactFound.first != exactFound.second) {
0584                 found = exactFound.first;
0585               }
0586             }
0587             if (found != branchToReadingWorker.end()) {
0588               if (not foundAtLeastOneMatchingBranch) {
0589                 ++upperLimitOnReadingWorker;
0590                 foundAtLeastOneMatchingBranch = true;
0591               }
0592               ++upperLimitOnIndicies;
0593               ++reserveSizeForWorker[w];
0594               if (nullptr == found->second) {
0595                 found->second = w;
0596               } else {
0597                 branchToReadingWorker.insert(make_pair(found->first, w));
0598               }
0599             }
0600           }
0601           {
0602             //Handle case where indirectly consumes product
0603             auto found = referencesToBranches.end();
0604             if (product.process().empty()) {
0605               auto startFound = referencesToBranches.lower_bound(branch);
0606               if (startFound != referencesToBranches.end()) {
0607                 if (startFound->first.substr(0, branch.size()) == branch) {
0608                   //match all processNames here, even if it means multiple matches will happen
0609                   found = startFound;
0610                 }
0611               }
0612             } else {
0613               //can match exactly
0614               auto exactFound = referencesToBranches.equal_range(branch);
0615               if (exactFound.first != exactFound.second) {
0616                 found = exactFound.first;
0617               }
0618             }
0619             if (found != referencesToBranches.end()) {
0620               for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0621                 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0622                 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0623                   continue;
0624                 }
0625                 if (not foundAtLeastOneMatchingBranch) {
0626                   ++upperLimitOnReadingWorker;
0627                   foundAtLeastOneMatchingBranch = true;
0628                 }
0629                 ++upperLimitOnIndicies;
0630                 ++reserveSizeForWorker[w];
0631                 if (nullptr == foundInBranchToReadingWorker->second) {
0632                   foundInBranchToReadingWorker->second = w;
0633                 } else {
0634                   branchToReadingWorker.insert(make_pair(itr->second, w));
0635                 }
0636               }
0637             }
0638           }
0639         }
0640       }
0641     }
0642     {
0643       auto it = branchToReadingWorker.begin();
0644       std::vector<std::string> unusedBranches;
0645       while (it != branchToReadingWorker.end()) {
0646         if (it->second == nullptr) {
0647           unusedBranches.push_back(it->first);
0648           //erasing the object invalidates the iterator so must advance it first
0649           auto temp = it;
0650           ++it;
0651           branchToReadingWorker.erase(temp);
0652         } else {
0653           ++it;
0654         }
0655       }
0656       if (not unusedBranches.empty()) {
0657         LogWarning l("UnusedProductsForCanDeleteEarly");
0658         l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0659              " If possible, remove the producer from the job.";
0660         for (auto const& n : unusedBranches) {
0661           l << "\n " << n;
0662         }
0663       }
0664     }
0665     if (!branchToReadingWorker.empty()) {
0666       earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0667       earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0668       earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0669       std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0670       std::string lastBranchName;
0671       size_t nextOpenIndex = 0;
0672       unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0673       for (auto& branchAndWorker : branchToReadingWorker) {
0674         if (lastBranchName != branchAndWorker.first) {
0675           //have to put back the period we removed earlier in order to get the proper name
0676           BranchID bid(branchAndWorker.first + ".");
0677           earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0678           lastBranchName = branchAndWorker.first;
0679         }
0680         auto found = alreadySeenWorkers.find(branchAndWorker.second);
0681         if (alreadySeenWorkers.end() == found) {
0682           //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
0683           // all the branches that might be read by this worker. However, initially we will only tell the
0684           // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
0685           // EarlyDeleteHelper will automatically advance its internal end pointer.
0686           size_t index = nextOpenIndex;
0687           size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0688           assert(index < earlyDeleteHelperToBranchIndicies_.size());
0689           earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0690           earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0691           branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0692           alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0693           nextOpenIndex += nIndices;
0694         } else {
0695           found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0696         }
0697       }
0698 
0699       //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
0700       // space needed for each module
0701       auto itLast = earlyDeleteHelpers_.begin();
0702       for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0703         if (itLast->end() != it->begin()) {
0704           //figure the offset for next Worker since it hasn't been moved yet so it has the original address
0705           unsigned int delta = it->begin() - itLast->end();
0706           it->shiftIndexPointers(delta);
0707 
0708           earlyDeleteHelperToBranchIndicies_.erase(
0709               earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0710               earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0711         }
0712         itLast = it;
0713       }
0714       earlyDeleteHelperToBranchIndicies_.erase(
0715           earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0716           earlyDeleteHelperToBranchIndicies_.end());
0717 
0718       //now tell the paths about the deleters
0719       for (auto& p : trig_paths_) {
0720         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0721       }
0722       for (auto& p : end_paths_) {
0723         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0724       }
0725       resetEarlyDelete();
0726     }
0727   }
0728 
0729   std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0730       Worker* worker,
0731       std::unordered_set<std::string>& conditionalModules,
0732       std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0733       std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0734       ParameterSet& proc_pset,
0735       ProductRegistry& preg,
0736       PreallocationConfiguration const* prealloc,
0737       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0738     std::vector<Worker*> returnValue;
0739     auto const& consumesInfo = worker->consumesInfo();
0740     auto moduleLabel = worker->description()->moduleLabel();
0741     using namespace productholderindexhelper;
0742     for (auto const& ci : consumesInfo) {
0743       if (not ci.skipCurrentProcess() and
0744           (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0745         auto productModuleLabel = std::string(ci.label());
0746         bool productFromConditionalModule = false;
0747         auto itFound = conditionalModules.find(productModuleLabel);
0748         if (itFound == conditionalModules.end()) {
0749           //Check to see if this was an alias
0750           //note that aliasMap was previously filtered so only the conditional modules remain there
0751           auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0752           if (foundAlias) {
0753             productModuleLabel = *foundAlias;
0754             productFromConditionalModule = true;
0755             itFound = conditionalModules.find(productModuleLabel);
0756             //check that the alias-for conditional module has not been used
0757             if (itFound == conditionalModules.end()) {
0758               continue;
0759             }
0760           }
0761         } else {
0762           //need to check the rest of the data product info
0763           auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0764           for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0765             if (itBranch->second->productInstanceName() == ci.instance()) {
0766               if (ci.kindOfType() == PRODUCT_TYPE) {
0767                 if (ci.type() == itBranch->second->unwrappedTypeID()) {
0768                   productFromConditionalModule = true;
0769                   break;
0770                 }
0771               } else {
0772                 //this is a view
0773                 if (typeIsViewCompatible(
0774                         ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
0775                   productFromConditionalModule = true;
0776                   break;
0777                 }
0778               }
0779             }
0780           }
0781         }
0782         if (productFromConditionalModule) {
0783           auto condWorker = getWorker(
0784               productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0785           assert(condWorker);
0786 
0787           conditionalModules.erase(itFound);
0788 
0789           auto dependents = tryToPlaceConditionalModules(condWorker,
0790                                                          conditionalModules,
0791                                                          conditionalModuleBranches,
0792                                                          aliasMap,
0793                                                          proc_pset,
0794                                                          preg,
0795                                                          prealloc,
0796                                                          processConfiguration);
0797           returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0798           returnValue.push_back(condWorker);
0799         }
0800       }
0801     }
0802     return returnValue;
0803   }
0804 
0805   void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
0806                                    ProductRegistry& preg,
0807                                    PreallocationConfiguration const* prealloc,
0808                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0809                                    std::string const& pathName,
0810                                    bool ignoreFilters,
0811                                    PathWorkers& out,
0812                                    std::vector<std::string> const& endPathNames,
0813                                    ConditionalTaskHelper const& conditionalTaskHelper,
0814                                    std::unordered_set<std::string>& allConditionalModules) {
0815     vstring modnames = proc_pset.getParameter<vstring>(pathName);
0816     PathWorkers tmpworkers;
0817 
0818     //Pull out ConditionalTask modules
0819     auto condRange = findConditionalTaskModulesRange(modnames);
0820 
0821     std::unordered_set<std::string> conditionalmods;
0822     //An EDAlias may be redirecting to a module on a ConditionalTask
0823     std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
0824     std::unordered_map<std::string, unsigned int> conditionalModOrder;
0825     if (condRange.first != condRange.second) {
0826       for (auto it = condRange.first; it != condRange.second; ++it) {
0827         // ordering needs to skip the # token in the path list
0828         conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0829       }
0830       //the last entry should be ignored since it is required to be "@"
0831       conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0832                                                         std::make_move_iterator(condRange.second));
0833 
0834       conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0835       modnames.erase(std::prev(condRange.first), modnames.end());
0836 
0837       // Make a union of all conditional modules from all Paths
0838       allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
0839     }
0840 
0841     unsigned int placeInPath = 0;
0842     for (auto const& name : modnames) {
0843       //Modules except EDFilters are set to run concurrently by default
0844       bool doNotRunConcurrently = false;
0845       WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0846       if (name[0] == '!') {
0847         filterAction = WorkerInPath::Veto;
0848       } else if (name[0] == '-' or name[0] == '+') {
0849         filterAction = WorkerInPath::Ignore;
0850       }
0851       if (name[0] == '|' or name[0] == '+') {
0852         //cms.wait was specified so do not run concurrently
0853         doNotRunConcurrently = true;
0854       }
0855 
0856       std::string moduleLabel = name;
0857       if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0858         moduleLabel.erase(0, 1);
0859       }
0860 
0861       Worker* worker =
0862           getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0863       if (worker == nullptr) {
0864         std::string pathType("endpath");
0865         if (!search_all(endPathNames, pathName)) {
0866           pathType = std::string("path");
0867         }
0868         throw Exception(errors::Configuration)
0869             << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0870             << "\"\n please check spelling or remove that label from the path.";
0871       }
0872 
0873       if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0874         // We have a filter on an end path, and the filter is not explicitly ignored.
0875         // See if the filter is allowed.
0876         std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0877         if (!search_all(allowed_filters, worker->description()->moduleName())) {
0878           // Filter is not allowed. Ignore the result, and issue a warning.
0879           filterAction = WorkerInPath::Ignore;
0880           LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0881                                         << "' with module label '" << moduleLabel << "' appears on EndPath '"
0882                                         << pathName << "'.\n"
0883                                         << "The return value of the filter will be ignored.\n"
0884                                         << "To suppress this warning, either remove the filter from the endpath,\n"
0885                                         << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0886         }
0887       }
0888       bool runConcurrently = not doNotRunConcurrently;
0889       if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0890         runConcurrently = false;
0891       }
0892 
0893       auto condModules = tryToPlaceConditionalModules(worker,
0894                                                       conditionalmods,
0895                                                       conditionalModsBranches,
0896                                                       conditionalTaskHelper.aliasMap(),
0897                                                       proc_pset,
0898                                                       preg,
0899                                                       prealloc,
0900                                                       processConfiguration);
0901       for (auto condMod : condModules) {
0902         tmpworkers.emplace_back(
0903             condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0904       }
0905 
0906       tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0907       ++placeInPath;
0908     }
0909 
0910     out.swap(tmpworkers);
0911   }
0912 
0913   void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0914                                     ProductRegistry& preg,
0915                                     PreallocationConfiguration const* prealloc,
0916                                     std::shared_ptr<ProcessConfiguration const> processConfiguration,
0917                                     int bitpos,
0918                                     std::string const& name,
0919                                     TrigResPtr trptr,
0920                                     std::vector<std::string> const& endPathNames,
0921                                     ConditionalTaskHelper const& conditionalTaskHelper,
0922                                     std::unordered_set<std::string>& allConditionalModules) {
0923     PathWorkers tmpworkers;
0924     fillWorkers(proc_pset,
0925                 preg,
0926                 prealloc,
0927                 processConfiguration,
0928                 name,
0929                 false,
0930                 tmpworkers,
0931                 endPathNames,
0932                 conditionalTaskHelper,
0933                 allConditionalModules);
0934 
0935     // an empty path will cause an extra bit that is not used
0936     if (!tmpworkers.empty()) {
0937       trig_paths_.emplace_back(
0938           bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
0939     } else {
0940       empty_trig_paths_.push_back(bitpos);
0941     }
0942     for (WorkerInPath const& workerInPath : tmpworkers) {
0943       addToAllWorkers(workerInPath.getWorker());
0944     }
0945   }
0946 
0947   void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0948                                    ProductRegistry& preg,
0949                                    PreallocationConfiguration const* prealloc,
0950                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0951                                    int bitpos,
0952                                    std::string const& name,
0953                                    std::vector<std::string> const& endPathNames,
0954                                    ConditionalTaskHelper const& conditionalTaskHelper,
0955                                    std::unordered_set<std::string>& allConditionalModules) {
0956     PathWorkers tmpworkers;
0957     fillWorkers(proc_pset,
0958                 preg,
0959                 prealloc,
0960                 processConfiguration,
0961                 name,
0962                 true,
0963                 tmpworkers,
0964                 endPathNames,
0965                 conditionalTaskHelper,
0966                 allConditionalModules);
0967 
0968     if (!tmpworkers.empty()) {
0969       end_paths_.emplace_back(bitpos,
0970                               name,
0971                               tmpworkers,
0972                               TrigResPtr(),
0973                               actionTable(),
0974                               actReg_,
0975                               &streamContext_,
0976                               PathContext::PathType::kEndPath);
0977     } else {
0978       empty_end_paths_.push_back(bitpos);
0979     }
0980     for (WorkerInPath const& workerInPath : tmpworkers) {
0981       addToAllWorkers(workerInPath.getWorker());
0982     }
0983   }
0984 
0985   void StreamSchedule::beginStream() { workerManagerBeginEnd_.beginStream(streamID_, streamContext_); }
0986 
0987   void StreamSchedule::endStream() { workerManagerBeginEnd_.endStream(streamID_, streamContext_); }
0988 
0989   void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0990     for (auto const& worker : allWorkersBeginEnd()) {
0991       if (worker->description()->moduleLabel() == iLabel) {
0992         iMod->replaceModuleFor(worker);
0993         worker->beginStream(streamID_, streamContext_);
0994         break;
0995       }
0996     }
0997 
0998     for (auto const& worker : allWorkersRuns()) {
0999       if (worker->description()->moduleLabel() == iLabel) {
1000         iMod->replaceModuleFor(worker);
1001         break;
1002       }
1003     }
1004 
1005     for (auto const& worker : allWorkersLumisAndEvents()) {
1006       if (worker->description()->moduleLabel() == iLabel) {
1007         iMod->replaceModuleFor(worker);
1008         break;
1009       }
1010     }
1011   }
1012 
1013   void StreamSchedule::deleteModule(std::string const& iLabel) {
1014     workerManagerBeginEnd_.deleteModuleIfExists(iLabel);
1015     workerManagerRuns_.deleteModuleIfExists(iLabel);
1016     workerManagerLumisAndEvents_.deleteModuleIfExists(iLabel);
1017   }
1018 
1019   std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
1020     std::vector<ModuleDescription const*> result;
1021     result.reserve(allWorkersLumisAndEvents().size());
1022 
1023     for (auto const& worker : allWorkersLumisAndEvents()) {
1024       ModuleDescription const* p = worker->description();
1025       result.push_back(p);
1026     }
1027     return result;
1028   }
1029 
1030   void StreamSchedule::processOneEventAsync(
1031       WaitingTaskHolder iTask,
1032       EventTransitionInfo& info,
1033       ServiceToken const& serviceToken,
1034       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
1035     EventPrincipal& ep = info.principal();
1036 
1037     // Caught exception is propagated via WaitingTaskHolder
1038     CMS_SA_ALLOW try {
1039       this->resetAll();
1040 
1041       using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1042 
1043       Traits::setStreamContext(streamContext_, ep);
1044       //a service may want to communicate with another service
1045       ServiceRegistry::Operate guard(serviceToken);
1046       Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1047 
1048       // Data dependencies need to be set up before marking empty
1049       // (End)Paths complete in case something consumes the status of
1050       // the empty (EndPath)
1051       workerManagerLumisAndEvents_.setupResolvers(ep);
1052       workerManagerLumisAndEvents_.setupOnDemandSystem(info);
1053 
1054       HLTPathStatus hltPathStatus(hlt::Pass, 0);
1055       for (int empty_trig_path : empty_trig_paths_) {
1056         results_->at(empty_trig_path) = hltPathStatus;
1057         pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1058         std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1059                                         ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1060                                             info, streamID_, ParentContext(&streamContext_), &streamContext_);
1061         if (except) {
1062           iTask.doneWaiting(except);
1063           return;
1064         }
1065       }
1066       if (not endPathStatusInserterWorkers_.empty()) {
1067         for (int empty_end_path : empty_end_paths_) {
1068           std::exception_ptr except =
1069               endPathStatusInserterWorkers_[empty_end_path]
1070                   ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1071                       info, streamID_, ParentContext(&streamContext_), &streamContext_);
1072           if (except) {
1073             iTask.doneWaiting(except);
1074             return;
1075           }
1076         }
1077       }
1078 
1079       ++total_events_;
1080 
1081       //use to give priorities on an error to ones from Paths
1082       auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1083       auto pathErrorPtr = pathErrorHolder.get();
1084       ServiceWeakToken weakToken = serviceToken;
1085       auto allPathsDone = make_waiting_task(
1086           [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1087             ServiceRegistry::Operate operate(weakToken.lock());
1088 
1089             std::exception_ptr ptr;
1090             if (pathError->load()) {
1091               ptr = *pathError->load();
1092               delete pathError->load();
1093             }
1094             if ((not ptr) and iPtr) {
1095               ptr = *iPtr;
1096             }
1097             iTask.doneWaiting(finishProcessOneEvent(ptr));
1098           });
1099       //The holder guarantees that if the paths finish before the loop ends
1100       // that we do not start too soon. It also guarantees that the task will
1101       // run under that condition.
1102       WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1103 
1104       auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1105                                              std::exception_ptr const* iPtr) mutable {
1106         ServiceRegistry::Operate operate(weakToken.lock());
1107 
1108         if (iPtr) {
1109           //this is used to prioritize this error over one
1110           // that happens in EndPath or Accumulate
1111           pathErrorPtr->store(new std::exception_ptr(*iPtr));
1112         }
1113         finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1114       });
1115 
1116       //The holder guarantees that if the paths finish before the loop ends
1117       // that we do not start too soon. It also guarantees that the task will
1118       // run under that condition.
1119       WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1120 
1121       //start end paths first so on single threaded the paths will run first
1122       WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1123       for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1124         it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1125       }
1126 
1127       for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1128         it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1129       }
1130 
1131       ParentContext parentContext(&streamContext_);
1132       workerManagerLumisAndEvents_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1133           hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1134     } catch (...) {
1135       iTask.doneWaiting(std::current_exception());
1136     }
1137   }
1138 
1139   void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1140                                      WaitingTaskHolder iWait,
1141                                      EventTransitionInfo& info) {
1142     if (iExcept) {
1143       // Caught exception is propagated via WaitingTaskHolder
1144       CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1145         exception_actions::ActionCodes action = actionTable().find(e.category());
1146         assert(action != exception_actions::IgnoreCompletely);
1147         if (action == exception_actions::TryToContinue) {
1148           edm::printCmsExceptionWarning("TryToContinue", e);
1149           *(iExcept.load()) = std::exception_ptr();
1150         } else {
1151           *(iExcept.load()) = std::current_exception();
1152         }
1153       } catch (...) {
1154         *(iExcept.load()) = std::current_exception();
1155       }
1156     }
1157 
1158     if ((not iExcept) and results_->accept()) {
1159       ++total_passed_;
1160     }
1161 
1162     if (nullptr != results_inserter_.get()) {
1163       // Caught exception is propagated to the caller
1164       CMS_SA_ALLOW try {
1165         //Even if there was an exception, we need to allow results inserter
1166         // to run since some module may be waiting on its results.
1167         ParentContext parentContext(&streamContext_);
1168         using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1169 
1170         auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1171         if (expt) {
1172           std::rethrow_exception(expt);
1173         }
1174       } catch (cms::Exception& ex) {
1175         if (not iExcept) {
1176           if (ex.context().empty()) {
1177             std::ostringstream ost;
1178             ost << "Processing Event " << info.principal().id();
1179             ex.addContext(ost.str());
1180           }
1181           iExcept.store(new std::exception_ptr(std::current_exception()));
1182         }
1183       } catch (...) {
1184         if (not iExcept) {
1185           iExcept.store(new std::exception_ptr(std::current_exception()));
1186         }
1187       }
1188     }
1189     std::exception_ptr ptr;
1190     if (iExcept) {
1191       ptr = *iExcept.load();
1192     }
1193     iWait.doneWaiting(ptr);
1194   }
1195 
1196   std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1197     using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1198 
1199     if (iExcept) {
1200       //add context information to the exception and print message
1201       try {
1202         convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1203       } catch (cms::Exception& ex) {
1204         bool const cleaningUpAfterException = false;
1205         if (ex.context().empty()) {
1206           addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1207         } else {
1208           addContextAndPrintException("", ex, cleaningUpAfterException);
1209         }
1210         iExcept = std::current_exception();
1211       }
1212 
1213       actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1214     }
1215     // Caught exception is propagated to the caller
1216     CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1217       if (not iExcept) {
1218         iExcept = std::current_exception();
1219       }
1220     }
1221     if (not iExcept) {
1222       resetEarlyDelete();
1223     }
1224 
1225     return iExcept;
1226   }
1227 
1228   void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1229     oLabelsToFill.reserve(trig_paths_.size());
1230     std::transform(trig_paths_.begin(),
1231                    trig_paths_.end(),
1232                    std::back_inserter(oLabelsToFill),
1233                    std::bind(&Path::name, std::placeholders::_1));
1234   }
1235 
1236   void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1237     TrigPaths::const_iterator itFound = std::find_if(
1238         trig_paths_.begin(),
1239         trig_paths_.end(),
1240         std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1241     if (itFound != trig_paths_.end()) {
1242       oLabelsToFill.reserve(itFound->size());
1243       for (size_t i = 0; i < itFound->size(); ++i) {
1244         oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1245       }
1246     }
1247   }
1248 
1249   void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1250                                                 std::vector<ModuleDescription const*>& descriptions,
1251                                                 unsigned int hint) const {
1252     descriptions.clear();
1253     bool found = false;
1254     TrigPaths::const_iterator itFound;
1255 
1256     if (hint < trig_paths_.size()) {
1257       itFound = trig_paths_.begin() + hint;
1258       if (itFound->name() == iPathLabel)
1259         found = true;
1260     }
1261     if (!found) {
1262       // if the hint did not work, do it the slow way
1263       itFound = std::find_if(
1264           trig_paths_.begin(),
1265           trig_paths_.end(),
1266           std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1267       if (itFound != trig_paths_.end())
1268         found = true;
1269     }
1270     if (found) {
1271       descriptions.reserve(itFound->size());
1272       for (size_t i = 0; i < itFound->size(); ++i) {
1273         descriptions.push_back(itFound->getWorker(i)->description());
1274       }
1275     }
1276   }
1277 
1278   void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1279                                                    std::vector<ModuleDescription const*>& descriptions,
1280                                                    unsigned int hint) const {
1281     descriptions.clear();
1282     bool found = false;
1283     TrigPaths::const_iterator itFound;
1284 
1285     if (hint < end_paths_.size()) {
1286       itFound = end_paths_.begin() + hint;
1287       if (itFound->name() == iEndPathLabel)
1288         found = true;
1289     }
1290     if (!found) {
1291       // if the hint did not work, do it the slow way
1292       itFound = std::find_if(
1293           end_paths_.begin(),
1294           end_paths_.end(),
1295           std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1296       if (itFound != end_paths_.end())
1297         found = true;
1298     }
1299     if (found) {
1300       descriptions.reserve(itFound->size());
1301       for (size_t i = 0; i < itFound->size(); ++i) {
1302         descriptions.push_back(itFound->getWorker(i)->description());
1303       }
1304     }
1305   }
1306 
1307   static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1308     sum.timesVisited += path.timesVisited(which);
1309     sum.timesPassed += path.timesPassed(which);
1310     sum.timesFailed += path.timesFailed(which);
1311     sum.timesExcept += path.timesExcept(which);
1312     sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1313     sum.bitPosition = path.bitPosition(which);
1314   }
1315 
1316   static void fillPathSummary(Path const& path, PathSummary& sum) {
1317     sum.name = path.name();
1318     sum.bitPosition = path.bitPosition();
1319     sum.timesRun += path.timesRun();
1320     sum.timesPassed += path.timesPassed();
1321     sum.timesFailed += path.timesFailed();
1322     sum.timesExcept += path.timesExcept();
1323 
1324     Path::size_type sz = path.size();
1325     if (sum.moduleInPathSummaries.empty()) {
1326       std::vector<ModuleInPathSummary> temp(sz);
1327       for (size_t i = 0; i != sz; ++i) {
1328         fillModuleInPathSummary(path, i, temp[i]);
1329       }
1330       sum.moduleInPathSummaries.swap(temp);
1331     } else {
1332       assert(sz == sum.moduleInPathSummaries.size());
1333       for (size_t i = 0; i != sz; ++i) {
1334         fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1335       }
1336     }
1337   }
1338 
1339   static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1340     sum.timesVisited += w.timesVisited();
1341     sum.timesRun += w.timesRun();
1342     sum.timesPassed += w.timesPassed();
1343     sum.timesFailed += w.timesFailed();
1344     sum.timesExcept += w.timesExcept();
1345     sum.moduleLabel = w.description()->moduleLabel();
1346   }
1347 
1348   static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1349 
1350   void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1351     rep.eventSummary.totalEvents += totalEvents();
1352     rep.eventSummary.totalEventsPassed += totalEventsPassed();
1353     rep.eventSummary.totalEventsFailed += totalEventsFailed();
1354 
1355     fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1356     fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1357     fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1358   }
1359 
1360   void StreamSchedule::clearCounters() {
1361     using std::placeholders::_1;
1362     total_events_ = total_passed_ = 0;
1363     for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1364     for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1365     for_all(allWorkersLumisAndEvents(), std::bind(&Worker::clearCounters, _1));
1366   }
1367 
1368   void StreamSchedule::resetAll() { results_->reset(); }
1369 
1370   void StreamSchedule::addToAllWorkers(Worker* w) { workerManagerLumisAndEvents_.addToAllWorkers(w); }
1371 
1372   void StreamSchedule::resetEarlyDelete() {
1373     //must be sure we have cleared the count first
1374     for (auto& count : earlyDeleteBranchToCount_) {
1375       count.count = 0;
1376     }
1377     //now reset based on how many helpers use that branch
1378     for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1379       ++(earlyDeleteBranchToCount_[index].count);
1380     }
1381     for (auto& helper : earlyDeleteHelpers_) {
1382       helper.reset();
1383     }
1384   }
1385 
1386   void StreamSchedule::makePathStatusInserters(
1387       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1388       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1389       ExceptionToActionTable const& actions) {
1390     int bitpos = 0;
1391     unsigned int indexEmpty = 0;
1392     unsigned int indexOfPath = 0;
1393     for (auto& pathStatusInserter : pathStatusInserters) {
1394       std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1395       WorkerPtr workerPtr(
1396           new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1397       pathStatusInserterWorkers_.emplace_back(workerPtr);
1398       workerPtr->setActivityRegistry(actReg_);
1399       addToAllWorkers(workerPtr.get());
1400 
1401       // A little complexity here because a C++ Path object is not
1402       // instantiated and put into end_paths if there are no modules
1403       // on the configured path.
1404       if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1405         ++indexEmpty;
1406       } else {
1407         trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1408         ++indexOfPath;
1409       }
1410       ++bitpos;
1411     }
1412 
1413     bitpos = 0;
1414     indexEmpty = 0;
1415     indexOfPath = 0;
1416     for (auto& endPathStatusInserter : endPathStatusInserters) {
1417       std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1418       WorkerPtr workerPtr(
1419           new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1420       endPathStatusInserterWorkers_.emplace_back(workerPtr);
1421       workerPtr->setActivityRegistry(actReg_);
1422       addToAllWorkers(workerPtr.get());
1423 
1424       // A little complexity here because a C++ Path object is not
1425       // instantiated and put into end_paths if there are no modules
1426       // on the configured path.
1427       if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1428         ++indexEmpty;
1429       } else {
1430         end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1431         ++indexOfPath;
1432       }
1433       ++bitpos;
1434     }
1435   }
1436 
1437   void StreamSchedule::handleException(StreamContext const& streamContext,
1438                                        ServiceWeakToken const& weakToken,
1439                                        bool cleaningUpAfterException,
1440                                        std::exception_ptr& excpt) const noexcept {
1441     //add context information to the exception and print message
1442     try {
1443       convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1444     } catch (cms::Exception& ex) {
1445       std::ostringstream ost;
1446       // In most cases the exception will already have context at this point,
1447       // but add some context here in those rare cases where it does not.
1448       if (ex.context().empty()) {
1449         exceptionContext(ost, streamContext);
1450       }
1451       ServiceRegistry::Operate op(weakToken.lock());
1452       addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1453       excpt = std::current_exception();
1454     }
1455     // We are already handling an earlier exception, so ignore it
1456     // if this signal results in another exception being thrown.
1457     CMS_SA_ALLOW try {
1458       ServiceRegistry::Operate op(weakToken.lock());
1459       actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1460     } catch (...) {
1461     }
1462   }
1463 }  // namespace edm