Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-05-21 03:38:51

0001 #include "FWCore/Framework/interface/StreamSchedule.h"
0002 
0003 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0004 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0005 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0006 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0007 #include "FWCore/Framework/src/OutputModuleDescription.h"
0008 #include "FWCore/Framework/interface/TriggerNamesService.h"
0009 #include "FWCore/Framework/src/TriggerReport.h"
0010 #include "FWCore/Framework/src/TriggerTimingReport.h"
0011 #include "FWCore/Framework/src/Factory.h"
0012 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0013 #include "FWCore/Framework/src/TriggerResultInserter.h"
0014 #include "FWCore/Framework/src/PathStatusInserter.h"
0015 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0016 #include "FWCore/Framework/interface/WorkerInPath.h"
0017 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0018 #include "FWCore/Framework/interface/maker/WorkerT.h"
0019 #include "FWCore/Framework/interface/ModuleRegistry.h"
0020 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/ParameterSet/interface/Registry.h"
0024 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0025 #include "FWCore/Utilities/interface/Algorithms.h"
0026 #include "FWCore/Utilities/interface/ConvertException.h"
0027 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 
0030 #include "LuminosityBlockProcessingStatus.h"
0031 #include "processEDAliases.h"
0032 
0033 #include <algorithm>
0034 #include <cassert>
0035 #include <cstdlib>
0036 #include <functional>
0037 #include <iomanip>
0038 #include <list>
0039 #include <map>
0040 #include <exception>
0041 
0042 namespace edm {
0043 
0044   namespace {
0045 
// Walks the input range [begin, end) and the output range starting at 'out'
// in lock-step, invoking 'func' once per position with the current input
// element as first argument and the current output element as second.
// 'func' is expected to read the former and write its result into the
// latter. Nothing is appended: the output range must already provide at
// least as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
0055 
// Fills the sequence 'to' from the sequence 'from': 'func' is called once per
// element of 'from' with that element and the element of the destination at
// the same position, and is expected to write its result into the latter.
//
// When 'to' already has as many elements as 'from', its elements are
// overwritten in place. Otherwise a scratch container of the right size is
// filled first and swapped into 'to' only after every call has returned, so
// in that case 'to' is left untouched if a call throws.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  // Pairwise application of 'func' over 'from' and the given destination.
  auto const applyTo = [&from, &func](TO& destination) {
    auto outIt = destination.begin();
    for (auto inIt = from.begin(); inIt != from.end(); ++inIt, ++outIt) {
      func(*inIt, *outIt);
    }
  };

  if (to.size() == from.size()) {
    applyTo(to);
    return;
  }

  TO scratch(from.size());
  applyTo(scratch);
  to.swap(scratch);
}
0071 
0072     // -----------------------------
0073 
0074     // Here we make the trigger results inserter directly.  This should
0075     // probably be a utility in the WorkerRegistry or elsewhere.
0076 
0077     StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
0078                                            std::shared_ptr<ActivityRegistry> areg,
0079                                            std::shared_ptr<TriggerResultInserter> inserter) {
0080       StreamSchedule::WorkerPtr ptr(
0081           new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
0082       ptr->setActivityRegistry(areg);
0083       return ptr;
0084     }
0085 
0086     void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
0087                                          ProductRegistry const& preg,
0088                                          std::multimap<std::string, Worker*>& branchToReadingWorker) {
0089       auto vBranchesToDeleteEarly = branchesToDeleteEarly;
0090       // Remove any duplicates
0091       std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
0092       vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
0093                                    vBranchesToDeleteEarly.end());
0094 
0095       // Are the requested items in the product registry?
0096       auto allBranchNames = preg.allBranchNames();
0097       //the branch names all end with a period, which we do not want to compare with
0098       for (auto& b : allBranchNames) {
0099         b.resize(b.size() - 1);
0100       }
0101       std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
0102       std::vector<std::string> temp;
0103       temp.reserve(vBranchesToDeleteEarly.size());
0104 
0105       std::set_intersection(vBranchesToDeleteEarly.begin(),
0106                             vBranchesToDeleteEarly.end(),
0107                             allBranchNames.begin(),
0108                             allBranchNames.end(),
0109                             std::back_inserter(temp));
0110       vBranchesToDeleteEarly.swap(temp);
0111       if (temp.size() != vBranchesToDeleteEarly.size()) {
0112         std::vector<std::string> missingProducts;
0113         std::set_difference(temp.begin(),
0114                             temp.end(),
0115                             vBranchesToDeleteEarly.begin(),
0116                             vBranchesToDeleteEarly.end(),
0117                             std::back_inserter(missingProducts));
0118         LogInfo l("MissingProductsForCanDeleteEarly");
0119         l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
0120         for (auto const& n : missingProducts) {
0121           l << "\n " << n;
0122         }
0123       }
0124       //set placeholder for the branch, we will remove the nullptr if a
0125       // module actually wants the branch.
0126       for (auto const& branch : vBranchesToDeleteEarly) {
0127         branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
0128       }
0129     }
0130   }  // namespace
0131 
0132   // -----------------------------
0133 
// Shorthand for a list of strings (module labels, path names, ...).
using vstring = std::vector<std::string>;
0135 
0136   // -----------------------------
0137 
0138   StreamSchedule::StreamSchedule(
0139       std::shared_ptr<TriggerResultInserter> inserter,
0140       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0141       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0142       std::shared_ptr<ModuleRegistry> modReg,
0143       ParameterSet& proc_pset,
0144       service::TriggerNamesService const& tns,
0145       PreallocationConfiguration const& prealloc,
0146       ProductRegistry& preg,
0147       BranchIDListHelper& branchIDListHelper,
0148       ExceptionToActionTable const& actions,
0149       std::shared_ptr<ActivityRegistry> areg,
0150       std::shared_ptr<ProcessConfiguration> processConfiguration,
0151       StreamID streamID,
0152       ProcessContext const* processContext)
0153       : workerManager_(modReg, areg, actions),
0154         actReg_(areg),
0155         results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
0156         results_inserter_(),
0157         trig_paths_(),
0158         end_paths_(),
0159         total_events_(),
0160         total_passed_(),
0161         number_of_unscheduled_modules_(0),
0162         streamID_(streamID),
0163         streamContext_(streamID_, processContext),
0164         skippingEvent_(false) {
0165     bool hasPath = false;
0166     std::vector<std::string> const& pathNames = tns.getTrigPaths();
0167     std::vector<std::string> const& endPathNames = tns.getEndPaths();
0168 
0169     int trig_bitpos = 0;
0170     trig_paths_.reserve(pathNames.size());
0171     for (auto const& trig_name : pathNames) {
0172       fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
0173       ++trig_bitpos;
0174       hasPath = true;
0175     }
0176 
0177     if (hasPath) {
0178       // the results inserter stands alone
0179       inserter->setTrigResultForStream(streamID.value(), results());
0180 
0181       results_inserter_ = makeInserter(actions, actReg_, inserter);
0182       addToAllWorkers(results_inserter_.get());
0183     }
0184 
0185     // fill normal endpaths
0186     int bitpos = 0;
0187     end_paths_.reserve(endPathNames.size());
0188     for (auto const& end_path_name : endPathNames) {
0189       fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
0190       ++bitpos;
0191     }
0192 
0193     makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
0194 
0195     //See if all modules were used
0196     std::set<std::string> usedWorkerLabels;
0197     for (auto const& worker : allWorkers()) {
0198       usedWorkerLabels.insert(worker->description()->moduleLabel());
0199     }
0200     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0201     std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0202     std::vector<std::string> unusedLabels;
0203     set_difference(modulesInConfigSet.begin(),
0204                    modulesInConfigSet.end(),
0205                    usedWorkerLabels.begin(),
0206                    usedWorkerLabels.end(),
0207                    back_inserter(unusedLabels));
0208     std::set<std::string> unscheduledLabels;
0209     std::vector<std::string> shouldBeUsedLabels;
0210     if (!unusedLabels.empty()) {
0211       //Need to
0212       // 1) create worker
0213       // 2) if it is a WorkerT<EDProducer>, add it to our list
0214       // 3) hand list to our delayed reader
0215       for (auto const& label : unusedLabels) {
0216         bool isTracked;
0217         ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
0218         assert(isTracked);
0219         assert(modulePSet != nullptr);
0220         workerManager_.addToUnscheduledWorkers(
0221             *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0222       }
0223       if (!shouldBeUsedLabels.empty()) {
0224         std::ostringstream unusedStream;
0225         unusedStream << "'" << shouldBeUsedLabels.front() << "'";
0226         for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
0227                                                 itLabelEnd = shouldBeUsedLabels.end();
0228              itLabel != itLabelEnd;
0229              ++itLabel) {
0230           unusedStream << ",'" << *itLabel << "'";
0231         }
0232         LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
0233       }
0234     }
0235     number_of_unscheduled_modules_ = unscheduledLabels.size();
0236   }  // StreamSchedule::StreamSchedule
0237 
0238   void StreamSchedule::initializeEarlyDelete(ModuleRegistry& modReg,
0239                                              std::vector<std::string> const& branchesToDeleteEarly,
0240                                              edm::ProductRegistry const& preg) {
         // Sets up early deletion of event products for this stream.
         //
         //  modReg                - every module holder is asked for an output module communicator; any
         //                          event branch such a communicator keeps is removed from the candidates.
         //  branchesToDeleteEarly - requested branch names (the 'canDeleteEarly' list named in the
         //                          messages below).
         //  preg                  - product registry used to validate the requested names.
         //
         // Each worker whose untracked 'mightGet' parameter names a surviving candidate branch gets an
         // EarlyDeleteHelper, and the trigger paths and end paths are handed the worker-to-helper map.
         // The function returns early, leaving the members untouched, once no candidate branch is left
         // after the output modules have been consulted.
0241     // setup the list with those products actually registered for this job
0242     std::multimap<std::string, Worker*> branchToReadingWorker;
0243     initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
0244 
         // kEmpty is the default handed to getUntrackedParameter for modules without 'mightGet'.
         // reserveSizeForWorker counts, per worker, how many candidate branches it might read.
         // upperLimitOnReadingWorker counts workers with a non-empty 'mightGet' list and
         // upperLimitOnIndicies counts (worker, branch) matches; both size the member containers below.
0245     const std::vector<std::string> kEmpty;
0246     std::map<Worker*, unsigned int> reserveSizeForWorker;
0247     unsigned int upperLimitOnReadingWorker = 0;
0248     unsigned int upperLimitOnIndicies = 0;
0249     unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
0250 
0251     //talk with output modules first
0252     modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
0253       auto comm = iHolder->createOutputModuleCommunicator();
           // Only holders that yield a communicator are inspected; the others are skipped.
0254       if (comm) {
0255         if (!branchToReadingWorker.empty()) {
0256           //If an OutputModule needs a product, we can't delete it early
0257           // so we should remove it from our list
0258           SelectedProductsForBranchType const& kept = comm->keptProducts();
0259           for (auto const& item : kept[InEvent]) {
0260             BranchDescription const& desc = *item.first;
0261             auto found = branchToReadingWorker.equal_range(desc.branchName());
0262             if (found.first != found.second) {
0263               --nUniqueBranchesToDelete;
0264               branchToReadingWorker.erase(found.first, found.second);
0265             }
0266           }
0267         }
0268       }
0269     });
0270 
0271     if (branchToReadingWorker.empty()) {
0272       return;
0273     }
0274 
0275     for (auto w : allWorkers()) {
0276       //determine if this module could read a branch we want to delete early
0277       auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
0278       if (nullptr != pset) {
0279         auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
0280         if (not branches.empty()) {
0281           ++upperLimitOnReadingWorker;
0282         }
0283         for (auto const& branch : branches) {
0284           auto found = branchToReadingWorker.equal_range(branch);
0285           if (found.first != found.second) {
0286             ++upperLimitOnIndicies;
0287             ++reserveSizeForWorker[w];
                 // The first reader of a branch takes over the nullptr placeholder; every further
                 // reader gets an additional entry under the same branch name.
0288             if (nullptr == found.first->second) {
0289               found.first->second = w;
0290             } else {
0291               branchToReadingWorker.insert(make_pair(found.first->first, w));
0292             }
0293           }
0294         }
0295       }
0296     }
0297     {
           // Drop (and report) the branches that still carry the nullptr placeholder, i.e. that no
           // worker listed in its 'mightGet'.
0298       auto it = branchToReadingWorker.begin();
0299       std::vector<std::string> unusedBranches;
0300       while (it != branchToReadingWorker.end()) {
0301         if (it->second == nullptr) {
0302           unusedBranches.push_back(it->first);
0303           //erasing the object invalidates the iterator so must advance it first
0304           auto temp = it;
0305           ++it;
0306           branchToReadingWorker.erase(temp);
0307         } else {
0308           ++it;
0309         }
0310       }
0311       if (not unusedBranches.empty()) {
0312         LogWarning l("UnusedProductsForCanDeleteEarly");
0313         l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
0314              " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
0315         for (auto const& n : unusedBranches) {
0316           l << "\n " << n;
0317         }
0318       }
0319     }
0320     if (!branchToReadingWorker.empty()) {
           // NOTE(review): raw pointers into earlyDeleteHelpers_ (&earlyDeleteHelpers_.back()) and into
           // earlyDeleteHelperToBranchIndicies_ (beginAddress) are handed out below, so presumably the
           // reserve/resize here must be large enough that neither vector reallocates while it is
           // being filled - confirm.
0321       earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
0322       earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
0323       earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
0324       std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
0325       std::string lastBranchName;
0326       size_t nextOpenIndex = 0;
0327       unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
           // The multimap iterates in key order, so all entries of one branch are adjacent and a new
           // earlyDeleteBranchToCount_ entry is started each time the branch name changes.
0328       for (auto& branchAndWorker : branchToReadingWorker) {
0329         if (lastBranchName != branchAndWorker.first) {
0330           //have to put back the period we removed earlier in order to get the proper name
0331           BranchID bid(branchAndWorker.first + ".");
0332           earlyDeleteBranchToCount_.emplace_back(bid, 0U);
0333           lastBranchName = branchAndWorker.first;
0334         }
0335         auto found = alreadySeenWorkers.find(branchAndWorker.second);
0336         if (alreadySeenWorkers.end() == found) {
0337           //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
0338           // all the branches that might be read by this worker. However, initially we will only tell the
0339           // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
0340           // EarlyDeleteHelper will automatically advance its internal end pointer.
0341           size_t index = nextOpenIndex;
0342           size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
0343           earlyDeleteHelperToBranchIndicies_[index] = earlyDeleteBranchToCount_.size() - 1;
0344           earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
0345           branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
0346           alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
0347           nextOpenIndex += nIndices;
0348         } else {
               // Worker already has a helper: record the index of the current branch with it.
0349           found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
0350         }
0351       }
0352 
0353       //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
0354       // space needed for each module
           // NOTE(review): shiftIndexPointers is called before the erase, and the erase bounds are then
           // read from the shifted helper. Its definition is not visible here - confirm that the erased
           // range is really the gap between the two helpers.
0355       auto itLast = earlyDeleteHelpers_.begin();
0356       for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
0357         if (itLast->end() != it->begin()) {
0358           //figure the offset for next Worker since it hasn't been moved yet so it has the original address
0359           unsigned int delta = it->begin() - itLast->end();
0360           it->shiftIndexPointers(delta);
0361 
0362           earlyDeleteHelperToBranchIndicies_.erase(
0363               earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0364               earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
0365         }
0366         itLast = it;
0367       }
           // Trim whatever is left after the end of the last helper's index range.
0368       earlyDeleteHelperToBranchIndicies_.erase(
0369           earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
0370           earlyDeleteHelperToBranchIndicies_.end());
0371 
0372       //now tell the paths about the deleters
0373       for (auto& p : trig_paths_) {
0374         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0375       }
0376       for (auto& p : end_paths_) {
0377         p.setEarlyDeleteHelpers(alreadySeenWorkers);
0378       }
0379       resetEarlyDelete();
0380     }
0381   }
0382 
0383   static Worker* getWorker(std::string const& moduleLabel,
0384                            ParameterSet& proc_pset,
0385                            WorkerManager& workerManager,
0386                            ProductRegistry& preg,
0387                            PreallocationConfiguration const* prealloc,
0388                            std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0389     bool isTracked;
0390     ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0391     if (modpset == nullptr) {
0392       return nullptr;
0393     }
0394     assert(isTracked);
0395 
0396     return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
0397   }
0398 
0399   std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
0400       Worker* worker,
0401       std::unordered_set<std::string>& conditionalModules,
0402       std::multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0403       std::multimap<std::string, AliasInfo> const& aliasMap,
0404       ParameterSet& proc_pset,
0405       ProductRegistry& preg,
0406       PreallocationConfiguration const* prealloc,
0407       std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0408     std::vector<Worker*> returnValue;
0409     auto const& consumesInfo = worker->consumesInfo();
0410     auto moduleLabel = worker->description()->moduleLabel();
0411     using namespace productholderindexhelper;
0412     for (auto const& ci : consumesInfo) {
0413       if (not ci.skipCurrentProcess() and
0414           (ci.process().empty() or ci.process() == processConfiguration->processName())) {
0415         auto productModuleLabel = ci.label();
0416         if (productModuleLabel.empty()) {
0417           //this is a consumesMany request
0418           for (auto const& branch : conditionalModuleBranches) {
0419             //check that the conditional module has not been used
0420             if (conditionalModules.find(branch.first) == conditionalModules.end()) {
0421               continue;
0422             }
0423             if (ci.kindOfType() == edm::PRODUCT_TYPE) {
0424               if (branch.second->unwrappedTypeID() != ci.type()) {
0425                 continue;
0426               }
0427             } else {
0428               if (not typeIsViewCompatible(
0429                       ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
0430                 continue;
0431               }
0432             }
0433 
0434             auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0435             assert(condWorker);
0436 
0437             conditionalModules.erase(branch.first);
0438 
0439             auto dependents = tryToPlaceConditionalModules(condWorker,
0440                                                            conditionalModules,
0441                                                            conditionalModuleBranches,
0442                                                            aliasMap,
0443                                                            proc_pset,
0444                                                            preg,
0445                                                            prealloc,
0446                                                            processConfiguration);
0447             returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0448             returnValue.push_back(condWorker);
0449           }
0450         } else {
0451           //just a regular consumes
0452           bool productFromConditionalModule = false;
0453           auto itFound = conditionalModules.find(productModuleLabel);
0454           if (itFound == conditionalModules.end()) {
0455             //Check to see if this was an alias
0456             auto findAlias = aliasMap.equal_range(productModuleLabel);
0457             for (auto it = findAlias.first; it != findAlias.second; ++it) {
0458               //this was previously filtered so only the conditional modules remain
0459               productModuleLabel = it->second.originalModuleLabel;
0460               if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) {
0461                 if (it->second.friendlyClassName == "*" or
0462                     (ci.type().friendlyClassName() == it->second.friendlyClassName)) {
0463                   productFromConditionalModule = true;
0464                   //need to check the rest of the data product info
0465                   break;
0466                 } else if (ci.kindOfType() == ELEMENT_TYPE) {
0467                   //consume is a View so need to do more intrusive search
0468                   //find matching branches in module
0469                   auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
0470                   for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
0471                     if (it->second.originalInstanceLabel == "*" or
0472                         itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
0473                       if (typeIsViewCompatible(ci.type(),
0474                                                TypeID(itBranch->second->wrappedType().typeInfo()),
0475                                                itBranch->second->className())) {
0476                         productFromConditionalModule = true;
0477                         break;
0478                       }
0479                     }
0480                   }
0481                   if (productFromConditionalModule) {
0482                     break;
0483                   }
0484                 }
0485               }
0486             }
0487             if (productFromConditionalModule) {
0488               itFound = conditionalModules.find(productModuleLabel);
0489               //check that the alias-for conditional module has not been used
0490               if (itFound == conditionalModules.end()) {
0491                 continue;
0492               }
0493             }
0494           } else {
0495             //need to check the rest of the data product info
0496             auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
0497             for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
0498               if (itBranch->second->productInstanceName() == ci.instance()) {
0499                 if (ci.kindOfType() == PRODUCT_TYPE) {
0500                   if (ci.type() == itBranch->second->unwrappedTypeID()) {
0501                     productFromConditionalModule = true;
0502                     break;
0503                   }
0504                 } else {
0505                   //this is a view
0506                   if (typeIsViewCompatible(ci.type(),
0507                                            TypeID(itBranch->second->wrappedType().typeInfo()),
0508                                            itBranch->second->className())) {
0509                     productFromConditionalModule = true;
0510                     break;
0511                   }
0512                 }
0513               }
0514             }
0515           }
0516           if (productFromConditionalModule) {
0517             auto condWorker =
0518                 getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0519             assert(condWorker);
0520 
0521             conditionalModules.erase(itFound);
0522 
0523             auto dependents = tryToPlaceConditionalModules(condWorker,
0524                                                            conditionalModules,
0525                                                            conditionalModuleBranches,
0526                                                            aliasMap,
0527                                                            proc_pset,
0528                                                            preg,
0529                                                            prealloc,
0530                                                            processConfiguration);
0531             returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
0532             returnValue.push_back(condWorker);
0533           }
0534         }
0535       }
0536     }
0537     return returnValue;
0538   }
0539 
0540   void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
0541                                    ProductRegistry& preg,
0542                                    PreallocationConfiguration const* prealloc,
0543                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0544                                    std::string const& pathName,
0545                                    bool ignoreFilters,
0546                                    PathWorkers& out,
0547                                    std::vector<std::string> const& endPathNames) {
0548     vstring modnames = proc_pset.getParameter<vstring>(pathName);
0549     PathWorkers tmpworkers;
0550 
0551     //Pull out ConditionalTask modules
0552     auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
0553 
0554     std::unordered_set<std::string> conditionalmods;
0555     //An EDAlias may be redirecting to a module on a ConditionalTask
0556     std::multimap<std::string, AliasInfo> aliasMap;
0557     std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
0558     std::unordered_map<std::string, unsigned int> conditionalModOrder;
0559     if (itCondBegin != modnames.end()) {
0560       for (auto it = itCondBegin + 1; it != modnames.begin() + modnames.size() - 1; ++it) {
0561         // ordering needs to skip the # token in the path list
0562         conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
0563       }
0564       //the last entry should be ignored since it is required to be "@"
0565       conditionalmods = std::unordered_set<std::string>(
0566           std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1));
0567 
0568       for (auto const& cond : conditionalmods) {
0569         //force the creation of the conditional modules so alias check can work
0570         (void)getWorker(cond, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0571       }
0572       //find aliases
0573       {
0574         auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0575         std::string const star("*");
0576         for (auto const& alias : aliases) {
0577           auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
0578           auto aliasedToModuleLabels = info.getParameterNames();
0579           for (auto const& mod : aliasedToModuleLabels) {
0580             if (not mod.empty() and mod[0] != '@' and conditionalmods.find(mod) != conditionalmods.end()) {
0581               auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
0582               for (auto const& aliasPSet : aliasVPSet) {
0583                 std::string type = star;
0584                 std::string instance = star;
0585                 std::string originalInstance = star;
0586                 if (aliasPSet.exists("type")) {
0587                   type = aliasPSet.getParameter<std::string>("type");
0588                 }
0589                 if (aliasPSet.exists("toProductInstance")) {
0590                   instance = aliasPSet.getParameter<std::string>("toProductInstance");
0591                 }
0592                 if (aliasPSet.exists("fromProductInstance")) {
0593                   originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
0594                 }
0595 
0596                 aliasMap.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
0597               }
0598             }
0599           }
0600         }
0601       }
0602       //find SwitchProducers whose cases are aliases
0603       {
0604         auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
0605         std::vector<std::string> switchEDAliases;
0606         for (auto const& module : all_modules) {
0607           auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
0608           if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
0609             auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
0610             for (auto const& case_label : all_cases) {
0611               auto range = aliasMap.equal_range(case_label);
0612               if (range.first != range.second) {
0613                 switchEDAliases.push_back(case_label);
0614               }
0615             }
0616           }
0617         }
0618         detail::processEDAliases(
0619             switchEDAliases, conditionalmods, proc_pset, processConfiguration->processName(), preg);
0620       }
0621       {
0622         //find branches created by the conditional modules
0623         for (auto const& prod : preg.productList()) {
0624           if (conditionalmods.find(prod.first.moduleLabel()) != conditionalmods.end()) {
0625             conditionalModsBranches.emplace(prod.first.moduleLabel(), &prod.second);
0626           }
0627         }
0628       }
0629     }
0630     modnames.erase(itCondBegin, modnames.end());
0631 
0632     unsigned int placeInPath = 0;
0633     for (auto const& name : modnames) {
0634       //Modules except EDFilters are set to run concurrently by default
0635       bool doNotRunConcurrently = false;
0636       WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
0637       if (name[0] == '!') {
0638         filterAction = WorkerInPath::Veto;
0639       } else if (name[0] == '-' or name[0] == '+') {
0640         filterAction = WorkerInPath::Ignore;
0641       }
0642       if (name[0] == '|' or name[0] == '+') {
0643         //cms.wait was specified so do not run concurrently
0644         doNotRunConcurrently = true;
0645       }
0646 
0647       std::string moduleLabel = name;
0648       if (filterAction != WorkerInPath::Normal or name[0] == '|') {
0649         moduleLabel.erase(0, 1);
0650       }
0651 
0652       Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
0653       if (worker == nullptr) {
0654         std::string pathType("endpath");
0655         if (!search_all(endPathNames, pathName)) {
0656           pathType = std::string("path");
0657         }
0658         throw Exception(errors::Configuration)
0659             << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
0660             << "\"\n please check spelling or remove that label from the path.";
0661       }
0662 
0663       if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
0664         // We have a filter on an end path, and the filter is not explicitly ignored.
0665         // See if the filter is allowed.
0666         std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
0667         if (!search_all(allowed_filters, worker->description()->moduleName())) {
0668           // Filter is not allowed. Ignore the result, and issue a warning.
0669           filterAction = WorkerInPath::Ignore;
0670           LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
0671                                         << "' with module label '" << moduleLabel << "' appears on EndPath '"
0672                                         << pathName << "'.\n"
0673                                         << "The return value of the filter will be ignored.\n"
0674                                         << "To suppress this warning, either remove the filter from the endpath,\n"
0675                                         << "or explicitly ignore it in the configuration by using cms.ignore().\n";
0676         }
0677       }
0678       bool runConcurrently = not doNotRunConcurrently;
0679       if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
0680         runConcurrently = false;
0681       }
0682 
0683       auto condModules = tryToPlaceConditionalModules(
0684           worker, conditionalmods, conditionalModsBranches, aliasMap, proc_pset, preg, prealloc, processConfiguration);
0685       for (auto condMod : condModules) {
0686         tmpworkers.emplace_back(
0687             condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
0688       }
0689 
0690       tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
0691       ++placeInPath;
0692     }
0693 
0694     out.swap(tmpworkers);
0695   }
0696 
0697   void StreamSchedule::fillTrigPath(ParameterSet& proc_pset,
0698                                     ProductRegistry& preg,
0699                                     PreallocationConfiguration const* prealloc,
0700                                     std::shared_ptr<ProcessConfiguration const> processConfiguration,
0701                                     int bitpos,
0702                                     std::string const& name,
0703                                     TrigResPtr trptr,
0704                                     std::vector<std::string> const& endPathNames) {
0705     PathWorkers tmpworkers;
0706     fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
0707 
0708     // an empty path will cause an extra bit that is not used
0709     if (!tmpworkers.empty()) {
0710       trig_paths_.emplace_back(bitpos,
0711                                name,
0712                                tmpworkers,
0713                                trptr,
0714                                actionTable(),
0715                                actReg_,
0716                                &streamContext_,
0717                                &skippingEvent_,
0718                                PathContext::PathType::kPath);
0719     } else {
0720       empty_trig_paths_.push_back(bitpos);
0721     }
0722     for (WorkerInPath const& workerInPath : tmpworkers) {
0723       addToAllWorkers(workerInPath.getWorker());
0724     }
0725   }
0726 
0727   void StreamSchedule::fillEndPath(ParameterSet& proc_pset,
0728                                    ProductRegistry& preg,
0729                                    PreallocationConfiguration const* prealloc,
0730                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0731                                    int bitpos,
0732                                    std::string const& name,
0733                                    std::vector<std::string> const& endPathNames) {
0734     PathWorkers tmpworkers;
0735     fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
0736 
0737     if (!tmpworkers.empty()) {
0738       //EndPaths are not supposed to stop if SkipEvent type exception happens
0739       end_paths_.emplace_back(bitpos,
0740                               name,
0741                               tmpworkers,
0742                               TrigResPtr(),
0743                               actionTable(),
0744                               actReg_,
0745                               &streamContext_,
0746                               nullptr,
0747                               PathContext::PathType::kEndPath);
0748     } else {
0749       empty_end_paths_.push_back(bitpos);
0750     }
0751     for (WorkerInPath const& workerInPath : tmpworkers) {
0752       addToAllWorkers(workerInPath.getWorker());
0753     }
0754   }
0755 
0756   void StreamSchedule::beginStream() { workerManager_.beginStream(streamID_, streamContext_); }
0757 
0758   void StreamSchedule::endStream() { workerManager_.endStream(streamID_, streamContext_); }
0759 
0760   void StreamSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0761     Worker* found = nullptr;
0762     for (auto const& worker : allWorkers()) {
0763       if (worker->description()->moduleLabel() == iLabel) {
0764         found = worker;
0765         break;
0766       }
0767     }
0768     if (nullptr == found) {
0769       return;
0770     }
0771 
0772     iMod->replaceModuleFor(found);
0773     found->beginStream(streamID_, streamContext_);
0774   }
0775 
0776   void StreamSchedule::deleteModule(std::string const& iLabel) { workerManager_.deleteModuleIfExists(iLabel); }
0777 
0778   std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
0779     std::vector<ModuleDescription const*> result;
0780     result.reserve(allWorkers().size());
0781 
0782     for (auto const& worker : allWorkers()) {
0783       ModuleDescription const* p = worker->description();
0784       result.push_back(p);
0785     }
0786     return result;
0787   }
0788 
0789   void StreamSchedule::processOneEventAsync(
0790       WaitingTaskHolder iTask,
0791       EventTransitionInfo& info,
0792       ServiceToken const& serviceToken,
0793       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
0794     EventPrincipal& ep = info.principal();
0795 
0796     // Caught exception is propagated via WaitingTaskHolder
0797     CMS_SA_ALLOW try {
0798       this->resetAll();
0799 
0800       using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
0801 
0802       Traits::setStreamContext(streamContext_, ep);
0803       //a service may want to communicate with another service
0804       ServiceRegistry::Operate guard(serviceToken);
0805       Traits::preScheduleSignal(actReg_.get(), &streamContext_);
0806 
0807       HLTPathStatus hltPathStatus(hlt::Pass, 0);
0808       for (int empty_trig_path : empty_trig_paths_) {
0809         results_->at(empty_trig_path) = hltPathStatus;
0810         pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
0811         std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
0812                                         ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0813                                             info, streamID_, ParentContext(&streamContext_), &streamContext_);
0814         if (except) {
0815           iTask.doneWaiting(except);
0816           return;
0817         }
0818       }
0819       for (int empty_end_path : empty_end_paths_) {
0820         std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
0821                                         ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0822                                             info, streamID_, ParentContext(&streamContext_), &streamContext_);
0823         if (except) {
0824           iTask.doneWaiting(except);
0825           return;
0826         }
0827       }
0828 
0829       workerManager_.setupResolvers(ep);
0830       workerManager_.setupOnDemandSystem(info);
0831 
0832       ++total_events_;
0833 
0834       //use to give priorities on an error to ones from Paths
0835       auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
0836       auto pathErrorPtr = pathErrorHolder.get();
0837       ServiceWeakToken weakToken = serviceToken;
0838       auto allPathsDone = make_waiting_task(
0839           [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
0840             ServiceRegistry::Operate operate(weakToken.lock());
0841 
0842             std::exception_ptr ptr;
0843             if (pathError->load()) {
0844               ptr = *pathError->load();
0845               delete pathError->load();
0846             }
0847             if ((not ptr) and iPtr) {
0848               ptr = *iPtr;
0849             }
0850             iTask.doneWaiting(finishProcessOneEvent(ptr));
0851           });
0852       //The holder guarantees that if the paths finish before the loop ends
0853       // that we do not start too soon. It also guarantees that the task will
0854       // run under that condition.
0855       WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
0856 
0857       auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
0858                                              std::exception_ptr const* iPtr) mutable {
0859         ServiceRegistry::Operate operate(weakToken.lock());
0860 
0861         if (iPtr) {
0862           //this is used to prioritize this error over one
0863           // that happens in EndPath or Accumulate
0864           pathErrorPtr->store(new std::exception_ptr(*iPtr));
0865         }
0866         finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
0867       });
0868 
0869       //The holder guarantees that if the paths finish before the loop ends
0870       // that we do not start too soon. It also guarantees that the task will
0871       // run under that condition.
0872       WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
0873 
0874       //start end paths first so on single threaded the paths will run first
0875       WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
0876       for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
0877         it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
0878       }
0879 
0880       for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
0881         it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
0882       }
0883 
0884       ParentContext parentContext(&streamContext_);
0885       workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0886           hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
0887     } catch (...) {
0888       iTask.doneWaiting(std::current_exception());
0889     }
0890   }
0891 
0892   void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
0893                                      WaitingTaskHolder iWait,
0894                                      EventTransitionInfo& info) {
0895     if (iExcept) {
0896       // Caught exception is propagated via WaitingTaskHolder
0897       CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
0898         exception_actions::ActionCodes action = actionTable().find(e.category());
0899         assert(action != exception_actions::IgnoreCompletely);
0900         assert(action != exception_actions::FailPath);
0901         if (action == exception_actions::SkipEvent) {
0902           edm::printCmsExceptionWarning("SkipEvent", e);
0903           *(iExcept.load()) = std::exception_ptr();
0904         } else {
0905           *(iExcept.load()) = std::current_exception();
0906         }
0907       } catch (...) {
0908         *(iExcept.load()) = std::current_exception();
0909       }
0910     }
0911 
0912     if ((not iExcept) and results_->accept()) {
0913       ++total_passed_;
0914     }
0915 
0916     if (nullptr != results_inserter_.get()) {
0917       // Caught exception is propagated to the caller
0918       CMS_SA_ALLOW try {
0919         //Even if there was an exception, we need to allow results inserter
0920         // to run since some module may be waiting on its results.
0921         ParentContext parentContext(&streamContext_);
0922         using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
0923 
0924         auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
0925         if (expt) {
0926           std::rethrow_exception(expt);
0927         }
0928       } catch (cms::Exception& ex) {
0929         if (not iExcept) {
0930           if (ex.context().empty()) {
0931             std::ostringstream ost;
0932             ost << "Processing Event " << info.principal().id();
0933             ex.addContext(ost.str());
0934           }
0935           iExcept.store(new std::exception_ptr(std::current_exception()));
0936         }
0937       } catch (...) {
0938         if (not iExcept) {
0939           iExcept.store(new std::exception_ptr(std::current_exception()));
0940         }
0941       }
0942     }
0943     std::exception_ptr ptr;
0944     if (iExcept) {
0945       ptr = *iExcept.load();
0946     }
0947     iWait.doneWaiting(ptr);
0948   }
0949 
0950   std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
0951     using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
0952 
0953     if (iExcept) {
0954       //add context information to the exception and print message
0955       try {
0956         convertException::wrap([&]() { std::rethrow_exception(iExcept); });
0957       } catch (cms::Exception& ex) {
0958         bool const cleaningUpAfterException = false;
0959         if (ex.context().empty()) {
0960           addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
0961         } else {
0962           addContextAndPrintException("", ex, cleaningUpAfterException);
0963         }
0964         iExcept = std::current_exception();
0965       }
0966 
0967       actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
0968     }
0969     // Caught exception is propagated to the caller
0970     CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
0971       if (not iExcept) {
0972         iExcept = std::current_exception();
0973       }
0974     }
0975     if (not iExcept) {
0976       resetEarlyDelete();
0977     }
0978 
0979     return iExcept;
0980   }
0981 
0982   void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
0983     oLabelsToFill.reserve(trig_paths_.size());
0984     std::transform(trig_paths_.begin(),
0985                    trig_paths_.end(),
0986                    std::back_inserter(oLabelsToFill),
0987                    std::bind(&Path::name, std::placeholders::_1));
0988   }
0989 
0990   void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
0991     TrigPaths::const_iterator itFound = std::find_if(
0992         trig_paths_.begin(),
0993         trig_paths_.end(),
0994         std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
0995     if (itFound != trig_paths_.end()) {
0996       oLabelsToFill.reserve(itFound->size());
0997       for (size_t i = 0; i < itFound->size(); ++i) {
0998         oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
0999       }
1000     }
1001   }
1002 
1003   void StreamSchedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1004                                                 std::vector<ModuleDescription const*>& descriptions,
1005                                                 unsigned int hint) const {
1006     descriptions.clear();
1007     bool found = false;
1008     TrigPaths::const_iterator itFound;
1009 
1010     if (hint < trig_paths_.size()) {
1011       itFound = trig_paths_.begin() + hint;
1012       if (itFound->name() == iPathLabel)
1013         found = true;
1014     }
1015     if (!found) {
1016       // if the hint did not work, do it the slow way
1017       itFound = std::find_if(
1018           trig_paths_.begin(),
1019           trig_paths_.end(),
1020           std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1021       if (itFound != trig_paths_.end())
1022         found = true;
1023     }
1024     if (found) {
1025       descriptions.reserve(itFound->size());
1026       for (size_t i = 0; i < itFound->size(); ++i) {
1027         descriptions.push_back(itFound->getWorker(i)->description());
1028       }
1029     }
1030   }
1031 
1032   void StreamSchedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1033                                                    std::vector<ModuleDescription const*>& descriptions,
1034                                                    unsigned int hint) const {
1035     descriptions.clear();
1036     bool found = false;
1037     TrigPaths::const_iterator itFound;
1038 
1039     if (hint < end_paths_.size()) {
1040       itFound = end_paths_.begin() + hint;
1041       if (itFound->name() == iEndPathLabel)
1042         found = true;
1043     }
1044     if (!found) {
1045       // if the hint did not work, do it the slow way
1046       itFound = std::find_if(
1047           end_paths_.begin(),
1048           end_paths_.end(),
1049           std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1050       if (itFound != end_paths_.end())
1051         found = true;
1052     }
1053     if (found) {
1054       descriptions.reserve(itFound->size());
1055       for (size_t i = 0; i < itFound->size(); ++i) {
1056         descriptions.push_back(itFound->getWorker(i)->description());
1057       }
1058     }
1059   }
1060 
1061   static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1062     sum.timesVisited += path.timesVisited(which);
1063     sum.timesPassed += path.timesPassed(which);
1064     sum.timesFailed += path.timesFailed(which);
1065     sum.timesExcept += path.timesExcept(which);
1066     sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1067     sum.bitPosition = path.bitPosition(which);
1068   }
1069 
1070   static void fillPathSummary(Path const& path, PathSummary& sum) {
1071     sum.name = path.name();
1072     sum.bitPosition = path.bitPosition();
1073     sum.timesRun += path.timesRun();
1074     sum.timesPassed += path.timesPassed();
1075     sum.timesFailed += path.timesFailed();
1076     sum.timesExcept += path.timesExcept();
1077 
1078     Path::size_type sz = path.size();
1079     if (sum.moduleInPathSummaries.empty()) {
1080       std::vector<ModuleInPathSummary> temp(sz);
1081       for (size_t i = 0; i != sz; ++i) {
1082         fillModuleInPathSummary(path, i, temp[i]);
1083       }
1084       sum.moduleInPathSummaries.swap(temp);
1085     } else {
1086       assert(sz == sum.moduleInPathSummaries.size());
1087       for (size_t i = 0; i != sz; ++i) {
1088         fillModuleInPathSummary(path, i, sum.moduleInPathSummaries[i]);
1089       }
1090     }
1091   }
1092 
1093   static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1094     sum.timesVisited += w.timesVisited();
1095     sum.timesRun += w.timesRun();
1096     sum.timesPassed += w.timesPassed();
1097     sum.timesFailed += w.timesFailed();
1098     sum.timesExcept += w.timesExcept();
1099     sum.moduleLabel = w.description()->moduleLabel();
1100   }
1101 
1102   static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1103 
1104   void StreamSchedule::getTriggerReport(TriggerReport& rep) const {
1105     rep.eventSummary.totalEvents += totalEvents();
1106     rep.eventSummary.totalEventsPassed += totalEventsPassed();
1107     rep.eventSummary.totalEventsFailed += totalEventsFailed();
1108 
1109     fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1110     fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1111     fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1112   }
1113 
1114   void StreamSchedule::clearCounters() {
1115     using std::placeholders::_1;
1116     total_events_ = total_passed_ = 0;
1117     for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1118     for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1119     for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1120   }
1121 
1122   void StreamSchedule::resetAll() {
1123     skippingEvent_ = false;
1124     results_->reset();
1125   }
1126 
1127   void StreamSchedule::addToAllWorkers(Worker* w) { workerManager_.addToAllWorkers(w); }
1128 
1129   void StreamSchedule::resetEarlyDelete() {
1130     //must be sure we have cleared the count first
1131     for (auto& count : earlyDeleteBranchToCount_) {
1132       count.count = 0;
1133     }
1134     //now reset based on how many helpers use that branch
1135     for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1136       ++(earlyDeleteBranchToCount_[index].count);
1137     }
1138     for (auto& helper : earlyDeleteHelpers_) {
1139       helper.reset();
1140     }
1141   }
1142 
1143   void StreamSchedule::makePathStatusInserters(
1144       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1145       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1146       ExceptionToActionTable const& actions) {
1147     int bitpos = 0;
1148     unsigned int indexEmpty = 0;
1149     unsigned int indexOfPath = 0;
1150     for (auto& pathStatusInserter : pathStatusInserters) {
1151       std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1152       WorkerPtr workerPtr(
1153           new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1154       pathStatusInserterWorkers_.emplace_back(workerPtr);
1155       workerPtr->setActivityRegistry(actReg_);
1156       addToAllWorkers(workerPtr.get());
1157 
1158       // A little complexity here because a C++ Path object is not
1159       // instantiated and put into end_paths if there are no modules
1160       // on the configured path.
1161       if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1162         ++indexEmpty;
1163       } else {
1164         trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1165         ++indexOfPath;
1166       }
1167       ++bitpos;
1168     }
1169 
1170     bitpos = 0;
1171     indexEmpty = 0;
1172     indexOfPath = 0;
1173     for (auto& endPathStatusInserter : endPathStatusInserters) {
1174       std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1175       WorkerPtr workerPtr(
1176           new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1177       endPathStatusInserterWorkers_.emplace_back(workerPtr);
1178       workerPtr->setActivityRegistry(actReg_);
1179       addToAllWorkers(workerPtr.get());
1180 
1181       // A little complexity here because a C++ Path object is not
1182       // instantiated and put into end_paths if there are no modules
1183       // on the configured path.
1184       if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1185         ++indexEmpty;
1186       } else {
1187         end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1188         ++indexOfPath;
1189       }
1190       ++bitpos;
1191     }
1192   }
1193 }  // namespace edm