Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:05

0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002 
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/EventID.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0007 #include "DataFormats/Provenance/interface/Timestamp.h"
0008 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0009 #include "FWCore/Framework/src/OutputModuleDescription.h"
0010 #include "FWCore/Framework/interface/TriggerNamesService.h"
0011 #include "FWCore/Framework/src/TriggerReport.h"
0012 #include "FWCore/Framework/src/TriggerTimingReport.h"
0013 #include "FWCore/Framework/src/ModuleHolderFactory.h"
0014 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0015 #include "FWCore/Framework/src/TriggerResultInserter.h"
0016 #include "FWCore/Framework/src/PathStatusInserter.h"
0017 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0018 #include "FWCore/Framework/interface/WorkerInPath.h"
0019 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0020 #include "FWCore/Framework/interface/maker/WorkerT.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0023 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0025 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0026 #include "FWCore/ParameterSet/interface/Registry.h"
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ModuleConsumesInfo.h"
0029 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0030 #include "FWCore/Utilities/interface/Algorithms.h"
0031 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0032 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0033 #include "FWCore/Utilities/interface/RunIndex.h"
0034 
0035 #include "LuminosityBlockProcessingStatus.h"
0036 #include "processEDAliases.h"
0037 
0038 #include <algorithm>
0039 #include <cassert>
0040 #include <cstdlib>
0041 #include <functional>
0042 #include <iomanip>
0043 #include <limits>
0044 #include <list>
0045 #include <map>
0046 #include <fmt/format.h>
0047 
0048 namespace edm {
0049 
0050   namespace {
0051 
0052     // Function template to transform each element in the input range to
0053     // a value placed into the output range. The supplied function
0054     // should take a const_reference to the 'input', and write to a
0055     // reference to the 'output'.
0056     template <typename InputIterator, typename ForwardIterator, typename Func>
0057     void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
0058       for (; begin != end; ++begin, ++out)
0059         func(*begin, *out);
0060     }
0061 
0062     // Function template that takes a sequence 'from', a sequence
0063     // 'to', and a callable object 'func'. It and applies
0064     // transform_into to fill the 'to' sequence with the values
0065     // calcuated by the callable object, taking care to fill the
0066     // outupt only if all calls succeed.
0067     template <typename FROM, typename TO, typename FUNC>
0068     void fill_summary(FROM const& from, TO& to, FUNC func) {
0069       if (to.size() != from.size()) {
0070         TO temp(from.size());
0071         transform_into(from.begin(), from.end(), temp.begin(), func);
0072         to.swap(temp);
0073       } else {
0074         transform_into(from.begin(), from.end(), to.begin(), func);
0075       }
0076     }
0077 
0078     class BeginStreamTraits {
0079     public:
0080       static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0081         activityRegistry->preBeginStreamSignal_(*streamContext);
0082       }
0083       static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0084         activityRegistry->postBeginStreamSignal_(*streamContext);
0085       }
0086     };
0087 
0088     class EndStreamTraits {
0089     public:
0090       static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0091         activityRegistry->preEndStreamSignal_(*streamContext);
0092       }
0093       static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
0094         activityRegistry->postEndStreamSignal_(*streamContext);
0095       }
0096     };
0097 
0098     // -----------------------------
0099 
0100     void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0101                                          ProductRegistry const& preg,
0102                                          std::multimap<std::string, Worker*>& branchToReadingWorker) {
0103       auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0104       // Remove any duplicates
0105       std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0106       vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0107                                    vBranchesToDeleteEarly.end());
0108 
0109       // Are the requested items in the product registry?
0110       auto allBranchNames = preg.allBranchNames();
0111       //the branch names all end with a period, which we do not want to compare with
0112       for (auto& b : allBranchNames) {
0113         b.resize(b.size() - 1);
0114       }
0115       std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0116       std::vector<std::string> temp;
0117       temp.reserve(vBranchesToDeleteEarly.size());
0118 
0119       std::set_intersection(vBranchesToDeleteEarly.begin(),
0120                             vBranchesToDeleteEarly.end(),
0121                             allBranchNames.begin(),
0122                             allBranchNames.end(),
0123                             std::back_inserter(temp));
0124       vBranchesToDeleteEarly.swap(temp);
0125       if (temp.size() != vBranchesToDeleteEarly.size()) {
0126         std::vector<std::string> missingProducts;
0127         std::set_difference(temp.begin(),
0128                             temp.end(),
0129                             vBranchesToDeleteEarly.begin(),
0130                             vBranchesToDeleteEarly.end(),
0131                             std::back_inserter(missingProducts));
0132         LogInfo l("MissingProductsForCanDeleteEarly");
0133         l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0134         for (auto const& n : missingProducts) {
0135           l << "\n " << n;
0136         }
0137       }
0138       //set placeholder for the branch, we will remove the nullptr if a
0139       // module actually wants the branch.
0140       for (auto const& branch : vBranchesToDeleteEarly) {
0141         branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0142       }
0143     }
0144 
0145     Worker* getWorker(std::string const& moduleLabel,
0146                       ParameterSet& proc_pset,
0147                       WorkerManager& workerManager,
0148                       SignallingProductRegistryFiller& preg,
0149                       PreallocationConfiguration const* prealloc,
0150                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0151                       bool addToAllWorkers = true) {
0152       bool isTracked;
0153       ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0154       if (modpset == nullptr) {
0155         return nullptr;
0156       }
0157       assert(isTracked);
0158 
0159       return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel, addToAllWorkers);
0160     }
0161 
0162     // If ConditionalTask modules exist in the container of module
0163     // names, returns the range (std::pair) for the modules. The range
0164     // excludes the special markers '#' (right before the
0165     // ConditionalTask modules) and '@' (last element).
0166     // If the module name container does not contain ConditionalTask
0167     // modules, returns std::pair of end iterators.
0168     template <typename T>
0169     auto findConditionalTaskModulesRange(T& modnames) {
0170       auto beg = std::find(modnames.begin(), modnames.end(), "#");
0171       if (beg == modnames.end()) {
0172         return std::pair(modnames.end(), modnames.end());
0173       }
0174       return std::pair(beg + 1, std::prev(modnames.end()));
0175     }
0176 
0177     std::optional<std::string> findBestMatchingAlias(
0178         std::unordered_multimap<std::string, edm::ProductDescription const*> const& conditionalModuleBranches,
0179         std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
0180         std::string const& productModuleLabel,
0181         ModuleConsumesInfo const& consumesInfo) {
0182       std::optional<std::string> best;
0183       int wildcardsInBest = std::numeric_limits<int>::max();
0184       bool bestIsAmbiguous = false;
0185 
0186       auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
0187                             std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
0188         int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
0189         if (wildcards == 0) {
0190           bestIsAmbiguous = false;
0191           return true;
0192         }
0193         if (not best or wildcards < wildcardsInBest) {
0194           best = label;
0195           wildcardsInBest = wildcards;
0196           bestIsAmbiguous = false;
0197         } else if (best and *best != label and wildcardsInBest == wildcards) {
0198           bestIsAmbiguous = true;
0199         }
0200         return false;
0201       };
0202 
0203       auto findAlias = aliasMap.equal_range(productModuleLabel);
0204       for (auto it = findAlias.first; it != findAlias.second; ++it) {
0205         std::string const& aliasInstanceLabel =
0206             it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
0207         bool const instanceIsWildcard = (aliasInstanceLabel == "*");
0208         if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
0209           bool const typeIsWildcard = it->second.friendlyClassName == "*";
0210           if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
0211             if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0212               return it->second.originalModuleLabel;
0213             }
0214           } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
0215             //consume is a View so need to do more intrusive search
0216             //find matching branches in module
0217             auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0218             for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0219               if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0220                 if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
0221                                                                    TypeID(itBranch->second->wrappedType().typeInfo()),
0222                                                                    itBranch->second->className())) {
0223                   if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
0224                     return it->second.originalModuleLabel;
0225                   }
0226                 }
0227               }
0228             }
0229           }
0230         }
0231       }
0232       if (bestIsAmbiguous) {
0233         throw Exception(errors::UnimplementedFeature)
0234             << "Encountered ambiguity when trying to find a best-matching alias for\n"
0235             << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
0236             << " module label " << productModuleLabel << "\n"
0237             << " product instance name " << consumesInfo.instance() << "\n"
0238             << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
0239                "wildcards ("
0240             << wildcardsInBest << ")";
0241       }
0242       return best;
0243     }
0244   }  // namespace
0245 
0246   // -----------------------------
0247 
0248   typedef std::vector<std::string> vstring;
0249 
0250   // -----------------------------
0251 
0252   class ConditionalTaskHelper {
0253   public:
0254     using AliasInfo = StreamSchedule::AliasInfo;
0255 
0256     ConditionalTaskHelper(ParameterSet& proc_pset,
0257                           SignallingProductRegistryFiller& preg,
0258                           PreallocationConfiguration const* prealloc,
0259                           std::shared_ptr<ProcessConfiguration const> processConfiguration,
0260                           WorkerManager& workerManagerLumisAndEvents,
0261                           std::vector<std::string> const& trigPathNames) {
0262       std::unordered_set<std::string> allConditionalMods;
0263       for (auto const& pathName : trigPathNames) {
0264         auto const modnames = proc_pset.getParameter<vstring>(pathName);
0265 
0266         //Pull out ConditionalTask modules
0267         auto condRange = findConditionalTaskModulesRange(modnames);
0268         if (condRange.first == condRange.second)
0269           continue;
0270 
0271         //the last entry should be ignored since it is required to be "@"
0272         allConditionalMods.insert(condRange.first, condRange.second);
0273       }
0274 
0275       for (auto const& cond : allConditionalMods) {
0276         //force the creation of the conditional modules so alias check can work
0277         // must be sure this is not added to the list of all workers else the system will hang in case the
0278         // module is not used.
0279         (void)getWorker(cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration, false);
0280       }
0281 
0282       fillAliasMap(proc_pset, allConditionalMods);
0283       processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
0284 
0285       //find branches created by the conditional modules
0286       for (auto const& prod : preg.registry().productList()) {
0287         if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
0288           conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
0289         }
0290       }
0291     }
0292 
0293     std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
0294 
0295     std::unordered_multimap<std::string, edm::ProductDescription const*> conditionalModuleBranches(
0296         std::unordered_set<std::string> const& conditionalmods) const {
0297       std::unordered_multimap<std::string, edm::ProductDescription const*> ret;
0298       for (auto const& mod : conditionalmods) {
0299         auto range = conditionalModsBranches_.equal_range(mod);
0300         ret.insert(range.first, range.second);
0301       }
0302       return ret;
0303     }
0304 
0305   private:
0306     void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
0307       auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0308       std::string const star("*");
0309       for (auto const& alias : aliases) {
0310         auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0311         auto aliasedToModuleLabels = info.getParameterNames();
0312         for (auto const& mod : aliasedToModuleLabels) {
0313           if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
0314             auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0315             for (auto const& aliasPSet : aliasVPSet) {
0316               std::string type = star;
0317               std::string instance = star;
0318               std::string originalInstance = star;
0319               if (aliasPSet.exists("type")) {
0320                 type = aliasPSet.getParameter<std::string>("type");
0321               }
0322               if (aliasPSet.exists("toProductInstance")) {
0323                 instance = aliasPSet.getParameter<std::string>("toProductInstance");
0324               }
0325               if (aliasPSet.exists("fromProductInstance")) {
0326                 originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0327               }
0328 
0329               aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0330             }
0331           }
0332         }
0333       }
0334     }
0335 
0336     void processSwitchEDAliases(ParameterSet const& proc_pset,
0337                                 SignallingProductRegistryFiller& preg,
0338                                 ProcessConfiguration const& processConfiguration,
0339                                 std::unordered_set<std::string> const& allConditionalMods) {
0340       auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0341       std::vector<std::string> switchEDAliases;
0342       for (auto const& module : all_modules) {
0343         auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0344         if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0345           auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0346           for (auto const& case_label : all_cases) {
0347             auto range = aliasMap_.equal_range(case_label);
0348             if (range.first != range.second) {
0349               switchEDAliases.push_back(case_label);
0350             }
0351           }
0352         }
0353       }
0354       detail::processEDAliases(
0355           switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
0356     }
0357 
0358     std::unordered_multimap<std::string, AliasInfo> aliasMap_;
0359     std::unordered_multimap<std::string, edm::ProductDescription const*> conditionalModsBranches_;
0360   };
0361 
0362   // -----------------------------
0363 
0364   StreamSchedule::StreamSchedule(
0365       std::shared_ptr<TriggerResultInserter> inserter,
0366       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0367       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0368       std::shared_ptr<ModuleRegistry> modReg,
0369       ParameterSet& proc_pset,
0370       service::TriggerNamesService const& tns,
0371       PreallocationConfiguration const& prealloc,
0372       SignallingProductRegistryFiller& preg,
0373       ExceptionToActionTable const& actions,
0374       std::shared_ptr<ActivityRegistry> areg,
0375       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0376       StreamID streamID,
0377       ProcessContext const* processContext)
0378       : workerManagerRuns_(modReg, areg, actions),
0379         workerManagerLumisAndEvents_(modReg, areg, actions),
0380         actReg_(areg),
0381         results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0382         results_inserter_(),
0383         trig_paths_(),
0384         end_paths_(),
0385         total_events_(),
0386         total_passed_(),
0387         number_of_unscheduled_modules_(0),
0388         streamID_(streamID),
0389         streamContext_(streamID_, processContext) {
0390     bool hasPath = false;
0391     std::vector<std::string> const& pathNames = tns.getTrigPaths();
0392     std::vector<std::string> const& endPathNames = tns.getEndPaths();
0393 
0394     ConditionalTaskHelper conditionalTaskHelper(
0395         proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
0396     std::unordered_set<std::string> conditionalModules;
0397 
0398     int trig_bitpos = 0;
0399     trig_paths_.reserve(pathNames.size());
0400     for (auto const& trig_name : pathNames) {
0401       fillTrigPath(proc_pset,
0402                    preg,
0403                    &prealloc,
0404                    processConfiguration,
0405                    trig_bitpos,
0406                    trig_name,
0407                    results(),
0408                    endPathNames,
0409                    conditionalTaskHelper,
0410                    conditionalModules);
0411       ++trig_bitpos;
0412       hasPath = true;
0413     }
0414 
0415     if (hasPath) {
0416       // the results inserter stands alone
0417       inserter->setTrigResultForStream(streamID.value(), results());
0418       results_inserter_ = workerManagerLumisAndEvents_.getWorkerForModule(*inserter);
0419     }
0420 
0421     // fill normal endpaths
0422     int bitpos = 0;
0423     end_paths_.reserve(endPathNames.size());
0424     for (auto const& end_path_name : endPathNames) {
0425       fillEndPath(proc_pset,
0426                   preg,
0427                   &prealloc,
0428                   processConfiguration,
0429                   bitpos,
0430                   end_path_name,
0431                   endPathNames,
0432                   conditionalTaskHelper,
0433                   conditionalModules);
0434       ++bitpos;
0435     }
0436 
0437     makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0438 
0439     //See if all modules were used
0440     std::set<std::string> usedWorkerLabels;
0441     for (auto const& worker : allWorkersLumisAndEvents()) {
0442       usedWorkerLabels.insert(worker->description()->moduleLabel());
0443     }
0444     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0445     std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0446     std::vector<std::string> unusedLabels;
0447     set_difference(modulesInConfigSet.begin(),
0448                    modulesInConfigSet.end(),
0449                    usedWorkerLabels.begin(),
0450                    usedWorkerLabels.end(),
0451                    back_inserter(unusedLabels));
0452     std::set<std::string> unscheduledLabels;
0453     std::vector<std::string> shouldBeUsedLabels;
0454     if (!unusedLabels.empty()) {
0455       //Need to
0456       // 1) create worker
0457       // 2) if it is a WorkerT<EDProducer>, add it to our list
0458       // 3) hand list to our delayed reader
0459       for (auto const& label : unusedLabels) {
0460         bool isTracked;
0461         ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0462         assert(isTracked);
0463         assert(modulePSet != nullptr);
0464         workerManagerLumisAndEvents_.addToUnscheduledWorkers(
0465             *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0466       }
0467       if (!shouldBeUsedLabels.empty()) {
0468         std::ostringstream unusedStream;
0469         unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0470         for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0471                                                 itLabelEnd = shouldBeUsedLabels.end();
0472              itLabel != itLabelEnd;
0473              ++itLabel) {
0474           unusedStream << ",'" << *itLabel << "'";
0475         }
0476         LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0477       }
0478     }
0479     number_of_unscheduled_modules_ = unscheduledLabels.size();
0480 
0481     // Print conditional modules that were not consumed in any of their associated Paths
0482     if (streamID.value() == 0 and not conditionalModules.empty()) {
0483       // Intersection of unscheduled and ConditionalTask modules gives
0484       // directly the set of conditional modules that were not
0485       // consumed by anything in the Paths associated to the
0486       // corresponding ConditionalTask.
0487       std::vector<std::string_view> labelsToPrint;
0488       std::copy_if(
0489           unscheduledLabels.begin(),
0490           unscheduledLabels.end(),
0491           std::back_inserter(labelsToPrint),
0492           [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
0493 
0494       if (not labelsToPrint.empty()) {
0495         edm::LogWarning log("NonConsumedConditionalModules");
0496         log << "The following modules were part of some ConditionalTask, but were not\n"
0497             << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
0498             << "was associated. Perhaps they should be either removed from the\n"
0499             << "job, or moved to a Task to make it explicit they are unscheduled.\n";
0500         for (auto const& modLabel : labelsToPrint) {
0501           log.format("\n {}", modLabel);
0502         }
0503       }
0504     }
0505 
0506     for (auto const& worker : allWorkersLumisAndEvents()) {
0507       std::string const& moduleLabel = worker->description()->moduleLabel();
0508 
0509       // The new worker pointers will be null for the TriggerResultsInserter, PathStatusInserter, and
0510       // EndPathStatusInserter because there are no ParameterSets for those in the configuration.
0511       // We could add special code to create workers for those, but instead we skip them because they
0512       // do not have beginStream, endStream, or run/lumi begin/end stream transition functions.
0513       (void)getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
0514     }
0515 
0516   }  // StreamSchedule::StreamSchedule
0517 
0518   void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0519                                              std::vector<std::string> const& branchesToDeleteEarly,
0520                                              std::multimap<std::string, std::string> const& referencesToBranches,
0521                                              std::vector<std::string> const& modulesToSkip,
0522                                              edm::ProductRegistry const& preg) {
0523     // setup the list with those products actually registered for this job
0524     std::multimap<std::string, Worker*> branchToReadingWorker;
0525     initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0526 
0527     const std::vector<std::string> kEmpty;
0528     std::map<Worker*, unsigned int> reserveSizeForWorker;
0529     unsigned int upperLimitOnReadingWorker = 0;
0530     unsigned int upperLimitOnIndicies = 0;
0531     unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0532 
0533     //talk with output modules first
0534     modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0535       auto comm = iHolder->createOutputModuleCommunicator();
0536       if (comm) {
0537         if (!branchToReadingWorker.empty()) {
0538           //If an OutputModule needs a product, we can't delete it early
0539           // so we should remove it from our list
0540           SelectedProductsForBranchType const& kept = comm->keptProducts();
0541           for (auto const& item : kept[InEvent]) {
0542             ProductDescription const& desc = *item.first;
0543             auto found = branchToReadingWorker.equal_range(desc.branchName());
0544             if (found.first != found.second) {
0545               --nUniqueBranchesToDelete;
0546               branchToReadingWorker.erase(found.first, found.second);
0547             }
0548           }
0549         }
0550       }
0551     });
0552 
0553     if (branchToReadingWorker.empty()) {
0554       return;
0555     }
0556 
0557     std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
0558     for (auto w : allWorkersLumisAndEvents()) {
0559       if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
0560         continue;
0561       }
0562       //determine if this module could read a branch we want to delete early
0563       auto consumes = w->moduleConsumesInfos();
0564       if (not consumes.empty()) {
0565         bool foundAtLeastOneMatchingBranch = false;
0566         for (auto const& product : consumes) {
0567           std::string branch = fmt::format("{}_{}_{}_{}",
0568                                            product.type().friendlyClassName(),
0569                                            product.label().data(),
0570                                            product.instance().data(),
0571                                            product.process().data());
0572           {
0573             //Handle case where worker directly consumes product
0574             auto found = branchToReadingWorker.end();
0575             if (product.process().empty()) {
0576               auto startFound = branchToReadingWorker.lower_bound(branch);
0577               if (startFound != branchToReadingWorker.end()) {
0578                 if (startFound->first.substr(0, branch.size()) == branch) {
0579                   //match all processNames here, even if it means multiple matches will happen
0580                   found = startFound;
0581                 }
0582               }
0583             } else {
0584               auto exactFound = branchToReadingWorker.equal_range(branch);
0585               if (exactFound.first != exactFound.second) {
0586                 found = exactFound.first;
0587               }
0588             }
0589             if (found != branchToReadingWorker.end()) {
0590               if (not foundAtLeastOneMatchingBranch) {
0591                 ++upperLimitOnReadingWorker;
0592                 foundAtLeastOneMatchingBranch = true;
0593               }
0594               ++upperLimitOnIndicies;
0595               ++reserveSizeForWorker[w];
0596               if (nullptr == found->second) {
0597                 found->second = w;
0598               } else {
0599                 branchToReadingWorker.insert(make_pair(found->first, w));
0600               }
0601             }
0602           }
0603           {
0604             //Handle case where indirectly consumes product
0605             auto found = referencesToBranches.end();
0606             if (product.process().empty()) {
0607               auto startFound = referencesToBranches.lower_bound(branch);
0608               if (startFound != referencesToBranches.end()) {
0609                 if (startFound->first.substr(0, branch.size()) == branch) {
0610                   //match all processNames here, even if it means multiple matches will happen
0611                   found = startFound;
0612                 }
0613               }
0614             } else {
0615               //can match exactly
0616               auto exactFound = referencesToBranches.equal_range(branch);
0617               if (exactFound.first != exactFound.second) {
0618                 found = exactFound.first;
0619               }
0620             }
0621             if (found != referencesToBranches.end()) {
0622               for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
0623                 auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
0624                 if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
0625                   continue;
0626                 }
0627                 if (not foundAtLeastOneMatchingBranch) {
0628                   ++upperLimitOnReadingWorker;
0629                   foundAtLeastOneMatchingBranch = true;
0630                 }
0631                 ++upperLimitOnIndicies;
0632                 ++reserveSizeForWorker[w];
0633                 if (nullptr == foundInBranchToReadingWorker->second) {
0634                   foundInBranchToReadingWorker->second = w;
0635                 } else {
0636                   branchToReadingWorker.insert(make_pair(itr->second, w));
0637                 }
0638               }
0639             }
0640           }
0641         }
0642       }
0643     }
0644     {
0645       auto it = branchToReadingWorker.begin();
0646       std::vector<std::string> unusedBranches;
0647       while (it != branchToReadingWorker.end()) {
0648         if (it->second == nullptr) {
0649           unusedBranches.push_back(it->first);
0650           //erasing the object invalidates the iterator so must advance it first
0651           auto temp = it;
0652           ++it;
0653           branchToReadingWorker.erase(temp);
0654         } else {
0655           ++it;
0656         }
0657       }
0658       if (not unusedBranches.empty() and streamID_.value() == 0) {
0659         LogWarning l("UnusedProductsForCanDeleteEarly");
0660         l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0661              " If possible, remove the producer from the job.";
0662         for (auto const& n : unusedBranches) {
0663           l << "\n " << n;
0664         }
0665       }
0666     }
0667     if (!branchToReadingWorker.empty()) {
0668       earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0669       earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0670       earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0671       std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0672       std::string lastBranchName;
0673       size_t nextOpenIndex = 0;
0674       unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
0675       for (auto& branchAndWorker : branchToReadingWorker) {
0676         if (lastBranchName != branchAndWorker.first) {
0677           //have to put back the period we removed earlier in order to get the proper name
0678           BranchID bid(branchAndWorker.first + ".");
0679           earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0680           lastBranchName = branchAndWorker.first;
0681         }
0682         auto found = alreadySeenWorkers.find(branchAndWorker.second);
0683         if (alreadySeenWorkers.end() == found) {
0684           //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
0685           // all the branches that might be read by this worker. However, initially we will only tell the
0686           // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
0687           // EarlyDeleteHelper will automatically advance its internal end pointer.
0688           size_t index = nextOpenIndex;
0689           size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0690           assert(index < earlyDeleteHelperToBranchIndicies_.size());
0691           earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0692           earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0693           branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0694           alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0695           nextOpenIndex += nIndices;
0696         } else {
0697           found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0698         }
0699       }
0700 
0701       //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
0702       // space needed for each module
0703       auto itLast = earlyDeleteHelpers_.begin();
0704       for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0705         if (itLast->end() != it->begin()) {
0706           //figure the offset for next Worker since it hasn't been moved yet so it has the original address
0707           unsigned int delta = it->begin() - itLast->end();
0708           it->shiftIndexPointers(delta);
0709 
0710           earlyDeleteHelperToBranchIndicies_.erase(
0711               earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0712               earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0713         }
0714         itLast = it;
0715       }
0716       earlyDeleteHelperToBranchIndicies_.erase(
0717           earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0718           earlyDeleteHelperToBranchIndicies_.end());
0719 
0720       //now tell the paths about the deleters
0721       for (auto& p : trig_paths_) {
0722         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0723       }
0724       for (auto& p : end_paths_) {
0725         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0726       }
0727       resetEarlyDelete();
0728     }
0729   }
0730 
0731   std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0732       Worker* worker,
0733       std::unordered_set<std::string>& conditionalModules,
0734       std::unordered_multimap<std::string, edm::ProductDescription const*> const& conditionalModuleBranches,
0735       std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0736       ParameterSet& proc_pset,
0737       SignallingProductRegistryFiller& preg,
0738       PreallocationConfiguration const* prealloc,
0739       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0740     std::vector<Worker*> returnValue;
0741     auto const& consumesInfo = worker->moduleConsumesInfos();
0742     auto moduleLabel = worker->description()->moduleLabel();
0743     using namespace productholderindexhelper;
0744     for (auto const& ci : consumesInfo) {
0745       if (not ci.skipCurrentProcess() and
0746           (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0747         auto productModuleLabel = std::string(ci.label());
0748         bool productFromConditionalModule = false;
0749         auto itFound = conditionalModules.find(productModuleLabel);
0750         if (itFound == conditionalModules.end()) {
0751           //Check to see if this was an alias
0752           //note that aliasMap was previously filtered so only the conditional modules remain there
0753           auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
0754           if (foundAlias) {
0755             productModuleLabel = *foundAlias;
0756             productFromConditionalModule = true;
0757             itFound = conditionalModules.find(productModuleLabel);
0758             //check that the alias-for conditional module has not been used
0759             if (itFound == conditionalModules.end()) {
0760               continue;
0761             }
0762           }
0763         } else {
0764           //need to check the rest of the data product info
0765           auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0766           for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0767             if (itBranch->second->productInstanceName() == ci.instance()) {
0768               if (ci.kindOfType() == PRODUCT_TYPE) {
0769                 if (ci.type() == itBranch->second->unwrappedTypeID()) {
0770                   productFromConditionalModule = true;
0771                   break;
0772                 }
0773               } else {
0774                 //this is a view
0775                 if (typeIsViewCompatible(
0776                         ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
0777                   productFromConditionalModule = true;
0778                   break;
0779                 }
0780               }
0781             }
0782           }
0783         }
0784         if (productFromConditionalModule) {
0785           auto condWorker = getWorker(
0786               productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0787           assert(condWorker);
0788 
0789           conditionalModules.erase(itFound);
0790 
0791           auto dependents = tryToPlaceConditionalModules(condWorker,
0792                                                          conditionalModules,
0793                                                          conditionalModuleBranches,
0794                                                          aliasMap,
0795                                                          proc_pset,
0796                                                          preg,
0797                                                          prealloc,
0798                                                          processConfiguration);
0799           returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0800           returnValue.push_back(condWorker);
0801         }
0802       }
0803     }
0804     return returnValue;
0805   }
0806 
0807   StreamSchedule::PathWorkers StreamSchedule::fillWorkers(
0808       ParameterSet& proc_pset,
0809       SignallingProductRegistryFiller& preg,
0810       PreallocationConfiguration const* prealloc,
0811       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0812       std::string const& pathName,
0813       bool ignoreFilters,
0814       std::vector<std::string> const& endPathNames,
0815       ConditionalTaskHelper const& conditionalTaskHelper,
0816       std::unordered_set<std::string>& allConditionalModules) {
0817     vstring modnames = proc_pset.getParameter<vstring>(pathName);
0818     PathWorkers tmpworkers;
0819 
0820     //Pull out ConditionalTask modules
0821     auto condRange = findConditionalTaskModulesRange(modnames);
0822 
0823     std::unordered_set<std::string> conditionalmods;
0824     //An EDAlias may be redirecting to a module on a ConditionalTask
0825     std::unordered_multimap<std::string, edm::ProductDescription const*> conditionalModsBranches;
0826     std::unordered_map<std::string, unsigned int> conditionalModOrder;
0827     if (condRange.first != condRange.second) {
0828       for (auto it = condRange.first; it != condRange.second; ++it) {
0829         // ordering needs to skip the # token in the path list
0830         conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0831       }
0832       //the last entry should be ignored since it is required to be "@"
0833       conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
0834                                                         std::make_move_iterator(condRange.second));
0835 
0836       conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
0837       modnames.erase(std::prev(condRange.first), modnames.end());
0838 
0839       // Make a union of all conditional modules from all Paths
0840       allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
0841     }
0842 
0843     unsigned int placeInPath = 0;
0844     for (auto const& name : modnames) {
0845       //Modules except EDFilters are set to run concurrently by default
0846       bool doNotRunConcurrently = false;
0847       WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0848       if (name[0] == '!') {
0849         filterAction = WorkerInPath::Veto;
0850       } else if (name[0] == '-' or name[0] == '+') {
0851         filterAction = WorkerInPath::Ignore;
0852       }
0853       if (name[0] == '|' or name[0] == '+') {
0854         //cms.wait was specified so do not run concurrently
0855         doNotRunConcurrently = true;
0856       }
0857 
0858       std::string moduleLabel = name;
0859       if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0860         moduleLabel.erase(0, 1);
0861       }
0862 
0863       Worker* worker =
0864           getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
0865       if (worker == nullptr) {
0866         std::string pathType("endpath");
0867         if (!search_all(endPathNames, pathName)) {
0868           pathType = std::string("path");
0869         }
0870         throw Exception(errors::Configuration)
0871             << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0872             << "\"\n please check spelling or remove that label from the path.";
0873       }
0874 
0875       if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0876         // We have a filter on an end path, and the filter is not explicitly ignored.
0877         // See if the filter is allowed.
0878         std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0879         if (!search_all(allowed_filters, worker->description()->moduleName())) {
0880           // Filter is not allowed. Ignore the result, and issue a warning.
0881           filterAction = WorkerInPath::Ignore;
0882           LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0883                                         << "' with module label '" << moduleLabel << "' appears on EndPath '"
0884                                         << pathName << "'.\n"
0885                                         << "The return value of the filter will be ignored.\n"
0886                                         << "To suppress this warning, either remove the filter from the endpath,\n"
0887                                         << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0888         }
0889       }
0890       bool runConcurrently = not doNotRunConcurrently;
0891       if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0892         runConcurrently = false;
0893       }
0894 
0895       auto condModules = tryToPlaceConditionalModules(worker,
0896                                                       conditionalmods,
0897                                                       conditionalModsBranches,
0898                                                       conditionalTaskHelper.aliasMap(),
0899                                                       proc_pset,
0900                                                       preg,
0901                                                       prealloc,
0902                                                       processConfiguration);
0903       for (auto condMod : condModules) {
0904         tmpworkers.emplace_back(
0905             condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0906       }
0907 
0908       tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0909       ++placeInPath;
0910     }
0911 
0912     return tmpworkers;
0913   }
0914 
0915   void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0916                                     SignallingProductRegistryFiller& preg,
0917                                     PreallocationConfiguration const* prealloc,
0918                                     std::shared_ptr<ProcessConfiguration const> processConfiguration,
0919                                     int bitpos,
0920                                     std::string const& name,
0921                                     TrigResPtr trptr,
0922                                     std::vector<std::string> const& endPathNames,
0923                                     ConditionalTaskHelper const& conditionalTaskHelper,
0924                                     std::unordered_set<std::string>& allConditionalModules) {
0925     PathWorkers tmpworkers = fillWorkers(proc_pset,
0926                                          preg,
0927                                          prealloc,
0928                                          processConfiguration,
0929                                          name,
0930                                          false,
0931                                          endPathNames,
0932                                          conditionalTaskHelper,
0933                                          allConditionalModules);
0934 
0935     // an empty path will cause an extra bit that is not used
0936     if (!tmpworkers.empty()) {
0937       trig_paths_.emplace_back(
0938           bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
0939     } else {
0940       empty_trig_paths_.push_back(bitpos);
0941     }
0942   }
0943 
0944   void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0945                                    SignallingProductRegistryFiller& preg,
0946                                    PreallocationConfiguration const* prealloc,
0947                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0948                                    int bitpos,
0949                                    std::string const& name,
0950                                    std::vector<std::string> const& endPathNames,
0951                                    ConditionalTaskHelper const& conditionalTaskHelper,
0952                                    std::unordered_set<std::string>& allConditionalModules) {
0953     PathWorkers tmpworkers = fillWorkers(proc_pset,
0954                                          preg,
0955                                          prealloc,
0956                                          processConfiguration,
0957                                          name,
0958                                          true,
0959                                          endPathNames,
0960                                          conditionalTaskHelper,
0961                                          allConditionalModules);
0962 
0963     if (!tmpworkers.empty()) {
0964       end_paths_.emplace_back(bitpos,
0965                               name,
0966                               tmpworkers,
0967                               TrigResPtr(),
0968                               actionTable(),
0969                               actReg_,
0970                               &streamContext_,
0971                               PathContext::PathType::kEndPath);
0972     } else {
0973       empty_end_paths_.push_back(bitpos);
0974     }
0975   }
0976 
0977   void StreamSchedule::beginStream(ModuleRegistry& iModuleRegistry) {
0978     streamContext_.setTransition(StreamContext::Transition::kBeginStream);
0979     streamContext_.setEventID(EventID(0, 0, 0));
0980     streamContext_.setRunIndex(RunIndex::invalidRunIndex());
0981     streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0982     streamContext_.setTimestamp(Timestamp());
0983 
0984     std::exception_ptr exceptionInStream;
0985     CMS_SA_ALLOW try {
0986       preScheduleSignal<BeginStreamTraits>(&streamContext_);
0987       runBeginStreamForModules(streamContext_, iModuleRegistry, *actReg_, moduleBeginStreamFailed_);
0988     } catch (...) {
0989       exceptionInStream = std::current_exception();
0990     }
0991 
0992     postScheduleSignal<BeginStreamTraits>(&streamContext_, exceptionInStream);
0993 
0994     if (exceptionInStream) {
0995       bool cleaningUpAfterException = false;
0996       handleException(streamContext_, cleaningUpAfterException, exceptionInStream);
0997     }
0998     streamContext_.setTransition(StreamContext::Transition::kInvalid);
0999 
1000     if (exceptionInStream) {
1001       std::rethrow_exception(exceptionInStream);
1002     }
1003   }
1004 
1005   void StreamSchedule::endStream(ModuleRegistry& iModuleRegistry,
1006                                  ExceptionCollector& collector,
1007                                  std::mutex& collectorMutex) noexcept {
1008     streamContext_.setTransition(StreamContext::Transition::kEndStream);
1009     streamContext_.setEventID(EventID(0, 0, 0));
1010     streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1011     streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1012     streamContext_.setTimestamp(Timestamp());
1013 
1014     std::exception_ptr exceptionInStream;
1015     CMS_SA_ALLOW try {
1016       preScheduleSignal<EndStreamTraits>(&streamContext_);
1017       runEndStreamForModules(
1018           streamContext_, iModuleRegistry, *actReg_, collector, collectorMutex, moduleBeginStreamFailed_);
1019     } catch (...) {
1020       exceptionInStream = std::current_exception();
1021     }
1022 
1023     postScheduleSignal<EndStreamTraits>(&streamContext_, exceptionInStream);
1024 
1025     if (exceptionInStream) {
1026       std::lock_guard<std::mutex> collectorLock(collectorMutex);
1027       collector.call([&exceptionInStream]() { std::rethrow_exception(exceptionInStream); });
1028     }
1029     streamContext_.setTransition(StreamContext::Transition::kInvalid);
1030   }
1031 
1032   void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
1033     for (auto const& worker : allWorkersRuns()) {
1034       if (worker->description()->moduleLabel() == iLabel) {
1035         iMod->replaceModuleFor(worker);
1036         try {
1037           convertException::wrap([&] { iMod->beginStream(streamID_); });
1038         } catch (cms::Exception& ex) {
1039           moduleBeginStreamFailed_.emplace_back(iMod->moduleDescription().id());
1040           ex.addContext("Executing StreamSchedule::replaceModule");
1041           throw;
1042         }
1043         break;
1044       }
1045     }
1046 
1047     for (auto const& worker : allWorkersLumisAndEvents()) {
1048       if (worker->description()->moduleLabel() == iLabel) {
1049         iMod->replaceModuleFor(worker);
1050         break;
1051       }
1052     }
1053   }
1054 
1055   void StreamSchedule::deleteModule(std::string const& iLabel) {
1056     workerManagerRuns_.deleteModuleIfExists(iLabel);
1057     workerManagerLumisAndEvents_.deleteModuleIfExists(iLabel);
1058   }
1059 
1060   std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
1061     std::vector<ModuleDescription const*> result;
1062     result.reserve(allWorkersLumisAndEvents().size());
1063 
1064     for (auto const& worker : allWorkersLumisAndEvents()) {
1065       ModuleDescription const* p = worker->description();
1066       result.push_back(p);
1067     }
1068     return result;
1069   }
1070 
1071   void StreamSchedule::processOneEventAsync(
1072       WaitingTaskHolder iTask,
1073       EventTransitionInfo& info,
1074       ServiceToken const& serviceToken,
1075       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
1076     EventPrincipal& ep = info.principal();
1077 
1078     // Caught exception is propagated via WaitingTaskHolder
1079     CMS_SA_ALLOW try {
1080       this->resetAll();
1081 
1082       using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1083 
1084       Traits::setStreamContext(streamContext_, ep);
1085       //a service may want to communicate with another service
1086       ServiceRegistry::Operate guard(serviceToken);
1087       Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1088 
1089       // Data dependencies need to be set up before marking empty
1090       // (End)Paths complete in case something consumes the status of
1091       // the empty (EndPath)
1092       workerManagerLumisAndEvents_.setupResolvers(ep);
1093       workerManagerLumisAndEvents_.setupOnDemandSystem(info);
1094 
1095       HLTPathStatus hltPathStatus(hlt::Pass, 0);
1096       for (int empty_trig_path : empty_trig_paths_) {
1097         results_->at(empty_trig_path) = hltPathStatus;
1098         pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1099         std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1100                                         ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1101                                             info, streamID_, ParentContext(&streamContext_), &streamContext_);
1102         if (except) {
1103           iTask.doneWaiting(except);
1104           return;
1105         }
1106       }
1107       if (not endPathStatusInserterWorkers_.empty()) {
1108         for (int empty_end_path : empty_end_paths_) {
1109           std::exception_ptr except =
1110               endPathStatusInserterWorkers_[empty_end_path]
1111                   ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1112                       info, streamID_, ParentContext(&streamContext_), &streamContext_);
1113           if (except) {
1114             iTask.doneWaiting(except);
1115             return;
1116           }
1117         }
1118       }
1119 
1120       ++total_events_;
1121 
1122       //use to give priorities on an error to ones from Paths
1123       auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1124       auto pathErrorPtr = pathErrorHolder.get();
1125       ServiceWeakToken weakToken = serviceToken;
1126       auto allPathsDone = make_waiting_task(
1127           [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1128             ServiceRegistry::Operate operate(weakToken.lock());
1129 
1130             std::exception_ptr ptr;
1131             if (pathError->load()) {
1132               ptr = *pathError->load();
1133               delete pathError->load();
1134             }
1135             if ((not ptr) and iPtr) {
1136               ptr = *iPtr;
1137             }
1138             iTask.doneWaiting(finishProcessOneEvent(ptr));
1139           });
1140       //The holder guarantees that if the paths finish before the loop ends
1141       // that we do not start too soon. It also guarantees that the task will
1142       // run under that condition.
1143       WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1144 
1145       auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1146                                              std::exception_ptr const* iPtr) mutable {
1147         ServiceRegistry::Operate operate(weakToken.lock());
1148 
1149         if (iPtr) {
1150           // free previous value of pathErrorPtr, if any;
1151           // prioritize this error over one that happens in EndPath or Accumulate
1152           auto currentPtr = pathErrorPtr->exchange(new std::exception_ptr(*iPtr));
1153           assert(currentPtr == nullptr);
1154         }
1155         finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1156       });
1157 
1158       //The holder guarantees that if the paths finish before the loop ends
1159       // that we do not start too soon. It also guarantees that the task will
1160       // run under that condition.
1161       WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1162 
1163       //start end paths first so on single threaded the paths will run first
1164       WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1165       for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1166         it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1167       }
1168 
1169       for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1170         it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1171       }
1172 
1173       ParentContext parentContext(&streamContext_);
1174       workerManagerLumisAndEvents_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1175           hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1176     } catch (...) {
1177       iTask.doneWaiting(std::current_exception());
1178     }
1179   }
1180 
1181   void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1182                                      WaitingTaskHolder iWait,
1183                                      EventTransitionInfo& info) {
1184     if (iExcept) {
1185       // Caught exception is propagated via WaitingTaskHolder
1186       CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1187         exception_actions::ActionCodes action = actionTable().find(e.category());
1188         assert(action != exception_actions::IgnoreCompletely);
1189         if (action == exception_actions::TryToContinue) {
1190           edm::printCmsExceptionWarning("TryToContinue", e);
1191           *(iExcept.load()) = std::exception_ptr();
1192         } else {
1193           *(iExcept.load()) = std::current_exception();
1194         }
1195       } catch (...) {
1196         *(iExcept.load()) = std::current_exception();
1197       }
1198     }
1199 
1200     if ((not iExcept) and results_->accept()) {
1201       ++total_passed_;
1202     }
1203 
1204     if (nullptr != results_inserter_.get()) {
1205       // Caught exception is propagated to the caller
1206       CMS_SA_ALLOW try {
1207         //Even if there was an exception, we need to allow results inserter
1208         // to run since some module may be waiting on its results.
1209         ParentContext parentContext(&streamContext_);
1210         using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1211 
1212         auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1213         if (expt) {
1214           std::rethrow_exception(expt);
1215         }
1216       } catch (cms::Exception& ex) {
1217         if (not iExcept) {
1218           if (ex.context().empty()) {
1219             std::ostringstream ost;
1220             ost << "Processing Event " << info.principal().id();
1221             ex.addContext(ost.str());
1222           }
1223           iExcept.store(new std::exception_ptr(std::current_exception()));
1224         }
1225       } catch (...) {
1226         if (not iExcept) {
1227           iExcept.store(new std::exception_ptr(std::current_exception()));
1228         }
1229       }
1230     }
1231     std::exception_ptr ptr;
1232     if (iExcept) {
1233       ptr = *iExcept.load();
1234     }
1235     iWait.doneWaiting(ptr);
1236   }
1237 
1238   std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1239     using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1240 
1241     if (iExcept) {
1242       //add context information to the exception and print message
1243       try {
1244         convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1245       } catch (cms::Exception& ex) {
1246         bool const cleaningUpAfterException = false;
1247         if (ex.context().empty()) {
1248           addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1249         } else {
1250           addContextAndPrintException("", ex, cleaningUpAfterException);
1251         }
1252         iExcept = std::current_exception();
1253       }
1254 
1255       actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1256     }
1257     // Caught exception is propagated to the caller
1258     CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1259       if (not iExcept) {
1260         iExcept = std::current_exception();
1261       }
1262     }
1263     if (not iExcept) {
1264       resetEarlyDelete();
1265     }
1266 
1267     return iExcept;
1268   }
1269 
1270   void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1271     oLabelsToFill.reserve(trig_paths_.size());
1272     std::transform(trig_paths_.begin(),
1273                    trig_paths_.end(),
1274                    std::back_inserter(oLabelsToFill),
1275                    std::bind(&Path::name, std::placeholders::_1));
1276   }
1277 
1278   void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1279     TrigPaths::const_iterator itFound = std::find_if(
1280         trig_paths_.begin(),
1281         trig_paths_.end(),
1282         std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1283     if (itFound != trig_paths_.end()) {
1284       oLabelsToFill.reserve(itFound->size());
1285       for (size_t i = 0; i < itFound->size(); ++i) {
1286         oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1287       }
1288     }
1289   }
1290 
1291   void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1292                                                 std::vector<ModuleDescription const*>& descriptions,
1293                                                 unsigned int hint) const {
1294     descriptions.clear();
1295     bool found = false;
1296     TrigPaths::const_iterator itFound;
1297 
1298     if (hint < trig_paths_.size()) {
1299       itFound = trig_paths_.begin() + hint;
1300       if (itFound->name() == iPathLabel)
1301         found = true;
1302     }
1303     if (!found) {
1304       // if the hint did not work, do it the slow way
1305       itFound = std::find_if(
1306           trig_paths_.begin(),
1307           trig_paths_.end(),
1308           std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1309       if (itFound != trig_paths_.end())
1310         found = true;
1311     }
1312     if (found) {
1313       descriptions.reserve(itFound->size());
1314       for (size_t i = 0; i < itFound->size(); ++i) {
1315         descriptions.push_back(itFound->getWorker(i)->description());
1316       }
1317     }
1318   }
1319 
1320   void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1321                                                    std::vector<ModuleDescription const*>& descriptions,
1322                                                    unsigned int hint) const {
1323     descriptions.clear();
1324     bool found = false;
1325     TrigPaths::const_iterator itFound;
1326 
1327     if (hint < end_paths_.size()) {
1328       itFound = end_paths_.begin() + hint;
1329       if (itFound->name() == iEndPathLabel)
1330         found = true;
1331     }
1332     if (!found) {
1333       // if the hint did not work, do it the slow way
1334       itFound = std::find_if(
1335           end_paths_.begin(),
1336           end_paths_.end(),
1337           std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1338       if (itFound != end_paths_.end())
1339         found = true;
1340     }
1341     if (found) {
1342       descriptions.reserve(itFound->size());
1343       for (size_t i = 0; i < itFound->size(); ++i) {
1344         descriptions.push_back(itFound->getWorker(i)->description());
1345       }
1346     }
1347   }
1348 
1349   static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1350     sum.timesVisited += path.timesVisited(which);
1351     sum.timesPassed += path.timesPassed(which);
1352     sum.timesFailed += path.timesFailed(which);
1353     sum.timesExcept += path.timesExcept(which);
1354     sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1355     sum.bitPosition = path.bitPosition(which);
1356   }
1357 
1358   static void fillPathSummary(Path const& path, PathSummary& sum) {
1359     sum.name = path.name();
1360     sum.bitPosition = path.bitPosition();
1361     sum.timesRun += path.timesRun();
1362     sum.timesPassed += path.timesPassed();
1363     sum.timesFailed += path.timesFailed();
1364     sum.timesExcept += path.timesExcept();
1365 
1366     Path::size_type sz = path.size();
1367     if (sum.moduleInPathSummaries.empty()) {
1368       std::vector<ModuleInPathSummary> temp(sz);
1369       for (size_t i = 0; i != sz; ++i) {
1370         fillModuleInPathSummary(path, i, temp[i]);
1371       }
1372       sum.moduleInPathSummaries.swap(temp);
1373     } else {
1374       assert(sz == sum.moduleInPathSummaries.size());
1375       for (size_t i = 0; i != sz; ++i) {
1376         fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1377       }
1378     }
1379   }
1380 
1381   static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1382     sum.timesVisited += w.timesVisited();
1383     sum.timesRun += w.timesRun();
1384     sum.timesPassed += w.timesPassed();
1385     sum.timesFailed += w.timesFailed();
1386     sum.timesExcept += w.timesExcept();
1387     sum.moduleLabel = w.description()->moduleLabel();
1388   }
1389 
1390   static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1391 
1392   void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1393     rep.eventSummary.totalEvents += totalEvents();
1394     rep.eventSummary.totalEventsPassed += totalEventsPassed();
1395     rep.eventSummary.totalEventsFailed += totalEventsFailed();
1396 
1397     fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1398     fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1399     fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1400   }
1401 
1402   void StreamSchedule::clearCounters() {
1403     using std::placeholders::_1;
1404     total_events_ = total_passed_ = 0;
1405     for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1406     for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1407     for_all(allWorkersLumisAndEvents(), std::bind(&Worker::clearCounters, _1));
1408   }
1409 
1410   void StreamSchedule::resetAll() { results_->reset(); }
1411 
1412   void StreamSchedule::addToAllWorkers(Worker* w) { workerManagerLumisAndEvents_.addToAllWorkers(w); }
1413 
1414   void StreamSchedule::resetEarlyDelete() {
1415     //must be sure we have cleared the count first
1416     for (auto& count : earlyDeleteBranchToCount_) {
1417       count.count = 0;
1418     }
1419     //now reset based on how many helpers use that branch
1420     for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1421       ++(earlyDeleteBranchToCount_[index].count);
1422     }
1423     for (auto& helper : earlyDeleteHelpers_) {
1424       helper.reset();
1425     }
1426   }
1427 
1428   void StreamSchedule::makePathStatusInserters(
1429       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1430       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1431       ExceptionToActionTable const& actions) {
1432     int bitpos = 0;
1433     unsigned int indexEmpty = 0;
1434     unsigned int indexOfPath = 0;
1435     for (auto& pathStatusInserter : pathStatusInserters) {
1436       std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1437       auto workerPtr = workerManagerLumisAndEvents_.getWorkerForModule(*inserterPtr);
1438       pathStatusInserterWorkers_.emplace_back(workerPtr);
1439       // A little complexity here because a C++ Path object is not
1440       // instantiated and put into end_paths if there are no modules
1441       // on the configured path.
1442       if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1443         ++indexEmpty;
1444       } else {
1445         trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr);
1446         ++indexOfPath;
1447       }
1448       ++bitpos;
1449     }
1450 
1451     bitpos = 0;
1452     indexEmpty = 0;
1453     indexOfPath = 0;
1454     for (auto& endPathStatusInserter : endPathStatusInserters) {
1455       std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1456       auto workerPtr = workerManagerLumisAndEvents_.getWorkerForModule(*inserterPtr);
1457       endPathStatusInserterWorkers_.emplace_back(workerPtr);
1458       // A little complexity here because a C++ Path object is not
1459       // instantiated and put into end_paths if there are no modules
1460       // on the configured path.
1461       if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1462         ++indexEmpty;
1463       } else {
1464         end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr);
1465         ++indexOfPath;
1466       }
1467       ++bitpos;
1468     }
1469   }
1470 
1471   void StreamSchedule::handleException(StreamContext const& streamContext,
1472                                        bool cleaningUpAfterException,
1473                                        std::exception_ptr& excpt) const noexcept {
1474     //add context information to the exception and print message
1475     try {
1476       convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1477     } catch (cms::Exception& ex) {
1478       std::ostringstream ost;
1479       // In most cases the exception will already have context at this point,
1480       // but add some context here in those rare cases where it does not.
1481       if (ex.context().empty()) {
1482         exceptionContext(ost, streamContext);
1483       }
1484       addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1485       excpt = std::current_exception();
1486     }
1487     // We are already handling an earlier exception, so ignore it
1488     // if this signal results in another exception being thrown.
1489     CMS_SA_ALLOW try {
1490       actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1491     } catch (...) {
1492     }
1493   }
1494 }  // namespace edm