Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:04

0001 #include "FWCore/Framework/interface/Schedule.h"
0002 
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0011 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0012 #include "FWCore/Framework/interface/ConsumesCollector.h"
0013 #include "FWCore/Framework/interface/EDConsumerBase.h"
0014 #include "FWCore/Framework/src/OutputModuleDescription.h"
0015 #include "FWCore/Framework/interface/TriggerNamesService.h"
0016 #include "FWCore/Framework/src/TriggerReport.h"
0017 #include "FWCore/Framework/src/TriggerTimingReport.h"
0018 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0019 #include "FWCore/Framework/src/ModuleHolderFactory.h"
0020 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0021 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0022 #include "FWCore/Framework/interface/ModuleRegistry.h"
0023 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0024 #include "FWCore/Framework/src/TriggerResultInserter.h"
0025 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0026 #include "FWCore/Framework/src/PathStatusInserter.h"
0027 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0028 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0029 #include "FWCore/Framework/interface/ComponentDescription.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0031 #include "FWCore/Concurrency/interface/chain_first.h"
0032 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0033 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0034 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0035 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0036 #include "FWCore/ServiceRegistry/interface/ModuleConsumesInfo.h"
0037 #include "FWCore/Utilities/interface/Algorithms.h"
0038 #include "FWCore/Utilities/interface/ConvertException.h"
0039 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0040 #include "FWCore/Utilities/interface/TypeID.h"
0041 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0042 
0043 #include <array>
0044 #include <algorithm>
0045 #include <cassert>
0046 #include <cstdlib>
0047 #include <functional>
0048 #include <iomanip>
0049 #include <list>
0050 #include <map>
0051 #include <set>
0052 #include <exception>
0053 #include <sstream>
0054 
0055 #include "make_shared_noexcept_false.h"
0056 #include "processEDAliases.h"
0057 
0058 namespace edm {
0059 
0060   class ModuleMakerBase;
0061 
0062   namespace {
0063     using std::placeholders::_1;
0064 
0065     bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0066       return std::binary_search(v.begin(), v.end(), s);
0067     }
0068 
0069     // Here we make the trigger results inserter directly.  This should
0070     // probably be a utility in the WorkerRegistry or elsewhere.
0071 
0072     std::shared_ptr<TriggerResultInserter> makeInserter(ParameterSet& proc_pset,
0073                                                         PreallocationConfiguration const& iPrealloc,
0074                                                         SignallingProductRegistryFiller& preg,
0075                                                         ExceptionToActionTable const& actions,
0076                                                         std::shared_ptr<ActivityRegistry> areg,
0077                                                         std::shared_ptr<ProcessConfiguration const> processConfiguration,
0078                                                         ModuleRegistry& moduleRegistry) {
0079       ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0080       trig_pset->registerIt();
0081 
0082       WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0083       ModuleDescription md(trig_pset->id(),
0084                            "TriggerResultInserter",
0085                            "TriggerResults",
0086                            processConfiguration.get(),
0087                            ModuleDescription::getUniqueID());
0088 
0089       return moduleRegistry.makeExplicitModule<TriggerResultInserter>(md,
0090                                                                       iPrealloc,
0091                                                                       &preg,
0092                                                                       areg->preModuleConstructionSignal_,
0093                                                                       areg->postModuleConstructionSignal_,
0094                                                                       *trig_pset,
0095                                                                       iPrealloc.numberOfStreams());
0096     }
0097 
0098     template <typename T>
0099     void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0100                                  std::vector<std::string> const& pathNames,
0101                                  PreallocationConfiguration const& iPrealloc,
0102                                  SignallingProductRegistryFiller& preg,
0103                                  std::shared_ptr<ActivityRegistry> areg,
0104                                  std::shared_ptr<ProcessConfiguration const> processConfiguration,
0105                                  ModuleRegistry& moduleRegistry,
0106                                  std::string const& moduleTypeName) {
0107       ParameterSet pset;
0108       pset.addParameter<std::string>("@module_type", moduleTypeName);
0109       pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0110       pset.registerIt();
0111 
0112       pathStatusInserters.reserve(pathNames.size());
0113 
0114       for (auto const& pathName : pathNames) {
0115         ModuleDescription md(
0116             pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0117         auto module = moduleRegistry.makeExplicitModule<T>(md,
0118                                                            iPrealloc,
0119                                                            &preg,
0120                                                            areg->preModuleConstructionSignal_,
0121                                                            areg->postModuleConstructionSignal_,
0122                                                            iPrealloc.numberOfStreams());
0123         pathStatusInserters.emplace_back(std::move(module));
0124       }
0125     }
0126 
0127     typedef std::vector<std::string> vstring;
0128 
0129     void processSwitchProducers(ParameterSet const& proc_pset,
0130                                 std::string const& processName,
0131                                 SignallingProductRegistryFiller& preg) {
0132       // Update Switch ProductDescriptions for the chosen case
0133       struct BranchesCases {
0134         BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0135         std::vector<BranchKey> chosenBranches;
0136         std::vector<std::string> caseLabels;
0137       };
0138       std::map<std::string, BranchesCases> switchMap;
0139       for (auto& prod : preg.productListUpdator()) {
0140         if (prod.second.isSwitchAlias()) {
0141           auto it = switchMap.find(prod.second.moduleLabel());
0142           if (it == switchMap.end()) {
0143             auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0144             auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0145                                               switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0146             assert(inserted.second);
0147             it = inserted.first;
0148           }
0149 
0150           bool found = false;
0151           for (auto const& productIter : preg.registry().productList()) {
0152             BranchKey const& branchKey = productIter.first;
0153             // The alias-for product must be in the same process as
0154             // the SwitchProducer (earlier processes
0155             // may contain products with same type, module label, and
0156             // instance name)
0157             if (branchKey.processName() != processName) {
0158               continue;
0159             }
0160 
0161             ProductDescription const& desc = productIter.second;
0162             if (desc.branchType() == prod.second.branchType() and
0163                 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0164                 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0165                 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0166               prod.second.setSwitchAliasForBranch(desc);
0167               if (!prod.second.transient()) {
0168                 it->second.chosenBranches.push_back(prod.first);  // with moduleLabel of the Switch
0169               }
0170               found = true;
0171             }
0172           }
0173           if (not found) {
0174             Exception ex(errors::LogicError);
0175             ex << "Trying to find a ProductDescription to be aliased-for by SwitchProducer with\n"
0176                << "  friendly class name = " << prod.second.friendlyClassName() << "\n"
0177                << "  module label = " << prod.second.moduleLabel() << "\n"
0178                << "  product instance name = " << prod.second.productInstanceName() << "\n"
0179                << "  process name = " << processName
0180                << "\n\nbut did not find any. Please contact a framework developer.";
0181             ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0182             throw ex;
0183           }
0184         }
0185       }
0186       if (switchMap.empty())
0187         return;
0188 
0189       for (auto& elem : switchMap) {
0190         std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0191       }
0192 
0193       auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0194         std::map<std::string, std::vector<BranchKey>> caseBranches;
0195         for (auto const& item : preg.registry().productList()) {
0196           if (item.first.processName() != processName)
0197             continue;
0198 
0199           if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0200               found != caseLabels.end()) {
0201             caseBranches[*found].push_back(item.first);
0202           }
0203         }
0204 
0205         for (auto const& caseLabel : caseLabels) {
0206           ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0207           auto& branches = caseBranches[caseLabel];
0208           std::sort(branches.begin(), branches.end());
0209           for (auto const& branch : branches) {
0210             ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0211           }
0212         }
0213       };
0214 
0215       // Check that non-chosen cases declare exactly the same non-transient branches
0216       // Also set the alias-for branches to transient
0217       std::vector<bool> foundBranches;
0218       for (auto const& switchItem : switchMap) {
0219         auto const& switchLabel = switchItem.first;
0220         auto const& chosenBranches = switchItem.second.chosenBranches;
0221         auto const& caseLabels = switchItem.second.caseLabels;
0222         foundBranches.resize(chosenBranches.size());
0223         for (auto const& caseLabel : caseLabels) {
0224           std::fill(foundBranches.begin(), foundBranches.end(), false);
0225           for (auto& nonConstItem : preg.productListUpdator()) {
0226             auto const& item = nonConstItem;
0227             if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0228               // Check that products which are not transient in the dictionary are consistent between
0229               // all the cases of a SwitchProducer.
0230               if (!item.second.transient()) {
0231                 auto range = std::equal_range(chosenBranches.begin(),
0232                                               chosenBranches.end(),
0233                                               BranchKey(item.first.friendlyClassName(),
0234                                                         switchLabel,
0235                                                         item.first.productInstanceName(),
0236                                                         item.first.processName()));
0237                 if (range.first == range.second) {
0238                   Exception ex(errors::Configuration);
0239                   ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0240                      << item.first << " that is not produced by the chosen case "
0241                      << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0242                             .getUntrackedParameter<std::string>("@chosen_case")
0243                      << " and that product is not transient. "
0244                      << "If the intention is to produce only a subset of the non-transient products listed below, each "
0245                         "case with more non-transient products needs to be replaced with an EDAlias to only the "
0246                         "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0247                   addProductsToException(caseLabels, ex);
0248                   throw ex;
0249                 }
0250                 assert(std::distance(range.first, range.second) == 1);
0251                 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0252               }
0253 
0254               // Set the alias-for branch as transient so it gets fully ignored in output.
0255               // I tried first to implicitly drop all branches with
0256               // '@' in ProductSelector, but that gave problems on
0257               // input (those branches would be implicitly dropped on
0258               // input as well, leading to the SwitchProducer branches
0259               // do be dropped as dependent ones, as the alias
0260               // detection logic in RootFile says that the
0261               // SwitchProducer branches are not alias branches)
0262               nonConstItem.second.setTransient(true);
0263 
0264               // Check that there are no BranchAliases for any of the cases
0265               auto const& bd = item.second;
0266               if (not bd.branchAliases().empty()) {
0267                 auto ex = Exception(errors::UnimplementedFeature)
0268                           << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0269                              "aliases for SwitchProducer with label "
0270                           << switchLabel << " for case " << caseLabel << ":";
0271                 for (auto const& branchAlias : bd.branchAliases()) {
0272                   ex << " " << branchAlias;
0273                 }
0274                 throw ex;
0275               }
0276             }
0277           }
0278 
0279           for (size_t i = 0; i < chosenBranches.size(); i++) {
0280             if (not foundBranches[i]) {
0281               auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0282                                      .getUntrackedParameter<std::string>("@chosen_case");
0283               Exception ex(errors::Configuration);
0284               ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0285                  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0286                  << chosenLabel << " and that product is not transient. "
0287                  << "If the intention is to produce only a subset of the non-transient products listed below, each "
0288                     "case with more non-transient products needs to be replaced with an EDAlias to only the "
0289                     "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0290               addProductsToException(caseLabels, ex);
0291               throw ex;
0292             }
0293           }
0294         }
0295       }
0296     }
0297 
0298     void reduceParameterSet(ParameterSet& proc_pset,
0299                             vstring const& end_path_name_list,
0300                             vstring& modulesInConfig,
0301                             std::set<std::string> const& usedModuleLabels,
0302                             std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0303       // Before calculating the ParameterSetID of the top level ParameterSet or
0304       // saving it in the registry drop from the top level ParameterSet all
0305       // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
0306       // production is not enabled also drop all the EDFilters and EDProducers
0307       // that are not scheduled. Drop the ParameterSet used to configure the module
0308       // itself. Also drop the other traces of these labels in the top level
0309       // ParameterSet: Remove that labels from @all_modules and from all the
0310       // end paths. If this makes any end paths empty, then remove the end path
0311       // name from @end_paths, and @paths.
0312 
0313       // First make a list of labels to drop
0314       vstring outputModuleLabels;
0315       std::string edmType;
0316       std::string const moduleEdmType("@module_edm_type");
0317       std::string const outputModule("OutputModule");
0318       std::string const edAnalyzer("EDAnalyzer");
0319       std::string const edFilter("EDFilter");
0320       std::string const edProducer("EDProducer");
0321 
0322       std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0323 
0324       //need a list of all modules on paths in order to determine
0325       // if an EDAnalyzer only appears on an end path
0326       vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0327       std::set<std::string> modulesOnPaths;
0328       {
0329         std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0330         for (auto const& endPath : end_path_name_list) {
0331           noEndPaths.erase(endPath);
0332         }
0333         {
0334           vstring labels;
0335           for (auto const& path : noEndPaths) {
0336             labels = proc_pset.getParameter<vstring>(path);
0337             modulesOnPaths.insert(labels.begin(), labels.end());
0338           }
0339         }
0340       }
0341       //Initially fill labelsToBeDropped with all module mentioned in
0342       // the configuration but which are not being used by the system
0343       std::vector<std::string> labelsToBeDropped;
0344       labelsToBeDropped.reserve(modulesInConfigSet.size());
0345       std::set_difference(modulesInConfigSet.begin(),
0346                           modulesInConfigSet.end(),
0347                           usedModuleLabels.begin(),
0348                           usedModuleLabels.end(),
0349                           std::back_inserter(labelsToBeDropped));
0350 
0351       const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0352       for (auto const& modLabel : usedModuleLabels) {
0353         // Do nothing for modules that do not have a ParameterSet. Modules of type
0354         // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
0355         if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0356           edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0357           if (edmType == outputModule) {
0358             outputModuleLabels.push_back(modLabel);
0359             labelsToBeDropped.push_back(modLabel);
0360           }
0361           if (edmType == edAnalyzer) {
0362             if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0363               labelsToBeDropped.push_back(modLabel);
0364             }
0365           }
0366         }
0367       }
0368       //labelsToBeDropped must be sorted
0369       std::inplace_merge(
0370           labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0371 
0372       // drop the parameter sets used to configure the modules
0373       for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0374 
0375       // drop the labels from @all_modules
0376       vstring::iterator endAfterRemove =
0377           std::remove_if(modulesInConfig.begin(),
0378                          modulesInConfig.end(),
0379                          std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0380       modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0381       proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0382 
0383       // drop the labels from all end paths
0384       vstring endPathsToBeDropped;
0385       vstring labels;
0386       for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0387            iEndPath != endEndPath;
0388            ++iEndPath) {
0389         labels = proc_pset.getParameter<vstring>(*iEndPath);
0390         vstring::iterator iSave = labels.begin();
0391         vstring::iterator iBegin = labels.begin();
0392 
0393         for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0394           if (binary_search_string(labelsToBeDropped, *iLabel)) {
0395             if (binary_search_string(outputModuleLabels, *iLabel)) {
0396               outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0397             }
0398           } else {
0399             if (iSave != iLabel) {
0400               iSave->swap(*iLabel);
0401             }
0402             ++iSave;
0403           }
0404         }
0405         labels.erase(iSave, labels.end());
0406         if (labels.empty()) {
0407           // remove empty end paths and save their names
0408           proc_pset.eraseSimpleParameter(*iEndPath);
0409           endPathsToBeDropped.push_back(*iEndPath);
0410         } else {
0411           proc_pset.addParameter<vstring>(*iEndPath, labels);
0412         }
0413       }
0414       sort_all(endPathsToBeDropped);
0415 
0416       // remove empty end paths from @paths
0417       endAfterRemove = std::remove_if(scheduledPaths.begin(),
0418                                       scheduledPaths.end(),
0419                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0420       scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0421       proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0422 
0423       // remove empty end paths from @end_paths
0424       vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0425       endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0426                                       scheduledEndPaths.end(),
0427                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0428       scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0429       proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0430     }
0431 
0432     class RngEDConsumer : public EDConsumerBase {
0433     public:
0434       explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0435         Service<RandomNumberGenerator> rng;
0436         if (rng.isAvailable()) {
0437           rng->consumes(consumesCollector());
0438           for (auto const& consumesInfo : this->moduleConsumesInfos()) {
0439             typesConsumed.emplace(consumesInfo.type());
0440           }
0441         }
0442       }
0443     };
0444 
0445     template <typename F>
0446     auto doCleanup(F&& iF) {
0447       auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0448         CMS_SA_ALLOW try { f(); } catch (...) {
0449         }
0450         if (iPtr) {
0451           iTask.doneWaiting(*iPtr);
0452         }
0453       };
0454       return wrapped;
0455     }
0456   }  // namespace
0457   // -----------------------------
0458 
0459   typedef std::vector<std::string> vstring;
0460 
0461   // -----------------------------
0462 
0463   Schedule::Schedule(ParameterSet& proc_pset,
0464                      service::TriggerNamesService const& tns,
0465                      SignallingProductRegistryFiller& preg,
0466                      ExceptionToActionTable const& actions,
0467                      std::shared_ptr<ActivityRegistry> areg,
0468                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0469                      PreallocationConfiguration const& prealloc,
0470                      ProcessContext const* processContext,
0471                      ModuleTypeResolverMaker const* resolverMaker)
0472       :  //Only create a resultsInserter if there is a trigger path
0473         moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0474         resultsInserter_{
0475             tns.getTrigPaths().empty()
0476                 ? std::shared_ptr<TriggerResultInserter>{}
0477                 : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration, *moduleRegistry_)},
0478         all_output_communicators_(),
0479         preallocConfig_(prealloc),
0480         pathNames_(&tns.getTrigPaths()),
0481         endPathNames_(&tns.getEndPaths()),
0482         wantSummary_(tns.wantSummary()) {
0483     makePathStatusInserters(pathStatusInserters_,
0484                             *pathNames_,
0485                             prealloc,
0486                             preg,
0487                             areg,
0488                             processConfiguration,
0489                             *moduleRegistry_,
0490                             std::string("PathStatusInserter"));
0491 
0492     if (endPathNames_->size() > 1) {
0493       makePathStatusInserters(endPathStatusInserters_,
0494                               *endPathNames_,
0495                               prealloc,
0496                               preg,
0497                               areg,
0498                               processConfiguration,
0499                               *moduleRegistry_,
0500                               std::string("EndPathStatusInserter"));
0501     }
0502 
0503     assert(0 < prealloc.numberOfStreams());
0504     streamSchedules_.reserve(prealloc.numberOfStreams());
0505     for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0506       streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0507                                                                                pathStatusInserters_,
0508                                                                                endPathStatusInserters_,
0509                                                                                moduleRegistry(),
0510                                                                                proc_pset,
0511                                                                                tns,
0512                                                                                prealloc,
0513                                                                                preg,
0514                                                                                actions,
0515                                                                                areg,
0516                                                                                processConfiguration,
0517                                                                                StreamID{i},
0518                                                                                processContext));
0519     }
0520 
0521     const std::string kTriggerResults("TriggerResults");
0522     std::vector<std::string> modulesToUse;
0523     modulesToUse.reserve(streamSchedules_[0]->allWorkersLumisAndEvents().size());
0524     for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0525       if (worker->description()->moduleLabel() != kTriggerResults) {
0526         modulesToUse.push_back(worker->description()->moduleLabel());
0527       }
0528     }
0529     //The unscheduled modules are at the end of the list, but we want them at the front
0530     unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0531     if (nUnscheduledModules > 0) {
0532       std::vector<std::string> temp;
0533       temp.reserve(modulesToUse.size());
0534       auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0535       std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0536       std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0537       temp.swap(modulesToUse);
0538     }
0539 
0540     // propagate_const<T> has no reset() function
0541     globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0542                                                        pathStatusInserters_,
0543                                                        endPathStatusInserters_,
0544                                                        moduleRegistry(),
0545                                                        modulesToUse,
0546                                                        proc_pset,
0547                                                        preg,
0548                                                        prealloc,
0549                                                        actions,
0550                                                        areg,
0551                                                        processConfiguration,
0552                                                        processContext);
0553   }
0554 
0555   void Schedule::finishSetup(ParameterSet& proc_pset,
0556                              service::TriggerNamesService const& tns,
0557                              SignallingProductRegistryFiller& preg,
0558                              BranchIDListHelper& branchIDListHelper,
0559                              ProcessBlockHelperBase& processBlockHelper,
0560                              ThinnedAssociationsHelper& thinnedAssociationsHelper,
0561                              std::shared_ptr<ActivityRegistry> areg,
0562                              std::shared_ptr<ProcessConfiguration> processConfiguration,
0563                              PreallocationConfiguration const& prealloc,
0564                              ProcessContext const* processContext) {
0565     //TriggerResults is not in the top level ParameterSet so the call to
0566     // reduceParameterSet would fail to find it. Just remove it up front.
0567     const std::string kTriggerResults("TriggerResults");
0568 
0569     std::set<std::string> usedModuleLabels;
0570     for (auto const& worker : allWorkers()) {
0571       if (worker->description()->moduleLabel() != kTriggerResults) {
0572         usedModuleLabels.insert(worker->description()->moduleLabel());
0573       }
0574     }
0575     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0576     std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0577     reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0578     {
0579       std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0580       detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0581     }
0582 
0583     // At this point all ProductDescriptions are created. Mark now the
0584     // ones of unscheduled workers to be on-demand.
0585     {
0586       auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
0587       if (not unsched.empty()) {
0588         std::set<std::string> unscheduledModules;
0589         std::transform(unsched.begin(),
0590                        unsched.end(),
0591                        std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0592                        [](auto worker) { return worker->description()->moduleLabel(); });
0593         preg.setUnscheduledProducts(unscheduledModules);
0594       }
0595     }
0596 
0597     processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0598     proc_pset.registerIt();
0599     processConfiguration->setParameterSetID(proc_pset.id());
0600     processConfiguration->setProcessConfigurationID();
0601 
0602     // This is used for a little sanity-check to make sure no code
0603     // modifications alter the number of workers at a later date.
0604     size_t all_workers_count = allWorkers().size();
0605 
0606     moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0607       auto comm = iHolder->createOutputModuleCommunicator();
0608       if (comm) {
0609         all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0610       }
0611     });
0612     // Now that the output workers are filled in, set any output limits or information.
0613     limitOutput(proc_pset, branchIDListHelper.branchIDLists());
0614 
0615     // Sanity check: make sure nobody has added a worker after we've
0616     // already relied on the WorkerManager being full.
0617     assert(all_workers_count == allWorkers().size());
0618 
0619     branchIDListHelper.updateFromRegistry(preg.registry());
0620 
0621     moduleRegistry_->forAllModuleHolders([&preg, &thinnedAssociationsHelper](auto& iHolder) {
0622       iHolder->registerThinnedAssociations(preg.registry(), thinnedAssociationsHelper);
0623     });
0624 
0625     processBlockHelper.updateForNewProcess(preg.registry(), processConfiguration->processName());
0626 
0627     // The output modules consume products in kept branches.
0628     // So we must set this up before freezing.
0629     for (auto& c : all_output_communicators_) {
0630       c->selectProducts(preg.registry(), thinnedAssociationsHelper, processBlockHelper);
0631     }
0632 
0633     for (auto& product : preg.productListUpdator()) {
0634       setIsMergeable(product.second);
0635     }
0636 
0637     {
0638       // We now get a collection of types that may be consumed.
0639       std::set<TypeID> productTypesConsumed;
0640       std::set<TypeID> elementTypesConsumed;
0641       // Loop over all modules
0642       for (auto const& worker : allWorkers()) {
0643         for (auto const& consumesInfo : worker->moduleConsumesInfos()) {
0644           if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0645             productTypesConsumed.emplace(consumesInfo.type());
0646           } else {
0647             elementTypesConsumed.emplace(consumesInfo.type());
0648           }
0649         }
0650       }
0651       // The RandomNumberGeneratorService is not a module, yet it consumes.
0652       { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0653       preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0654     }
0655 
0656     for (auto& c : all_output_communicators_) {
0657       c->setEventSelectionInfo(outputModulePathPositions, preg.registry().anyProductProduced());
0658     }
0659 
0660     if (wantSummary_) {
0661       std::vector<const ModuleDescription*> modDesc;
0662       const auto& workers = allWorkers();
0663       modDesc.reserve(workers.size());
0664 
0665       std::transform(workers.begin(),
0666                      workers.end(),
0667                      std::back_inserter(modDesc),
0668                      [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0669 
0670       // propagate_const<T> has no reset() function
0671       summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0672       auto timeKeeperPtr = summaryTimeKeeper_.get();
0673 
0674       areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0675 
0676       areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0677       areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0678       areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0679       areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0680       areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0681       areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0682 
0683       areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0684       areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0685 
0686       areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0687       areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0688 
0689       areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0690       areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0691       //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
0692       //timeKeeperPtr->startModuleEvent(iContext,iMod);
0693       //});
0694     }
0695 
0696   }  // Schedule::Schedule
0697 
0698   void Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
0699     std::string const output("output");
0700 
0701     ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0702     int maxEventSpecs = 0;
0703     int maxEventsOut = -1;
0704     ParameterSet const* vMaxEventsOut = nullptr;
0705     std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0706     if (search_all(intNamesE, output)) {
0707       maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0708       ++maxEventSpecs;
0709     }
0710     std::vector<std::string> psetNamesE;
0711     maxEventsPSet.getParameterSetNames(psetNamesE, false);
0712     if (search_all(psetNamesE, output)) {
0713       vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0714       ++maxEventSpecs;
0715     }
0716 
0717     if (maxEventSpecs > 1) {
0718       throw Exception(errors::Configuration)
0719           << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0720     }
0721 
0722     for (auto& c : all_output_communicators_) {
0723       OutputModuleDescription desc(branchIDLists, maxEventsOut);
0724       if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0725         std::string const& moduleLabel = c->description().moduleLabel();
0726         try {
0727           desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0728         } catch (Exception const&) {
0729           throw Exception(errors::Configuration)
0730               << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0731         }
0732       }
0733       c->configure(desc);
0734     }
0735   }
0736 
0737   bool Schedule::terminate() const {
0738     if (all_output_communicators_.empty()) {
0739       return false;
0740     }
0741     for (auto& c : all_output_communicators_) {
0742       if (!c->limitReached()) {
0743         // Found an output module that has not reached output event count.
0744         return false;
0745       }
0746     }
0747     LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0748                                      << "has reached its configured limit.\n";
0749     return true;
0750   }
0751 
0752   void Schedule::endJob(ExceptionCollector& collector) {
0753     globalSchedule_->endJob(collector, *moduleRegistry_);
0754     if (collector.hasThrown()) {
0755       return;
0756     }
0757 
0758     if (wantSummary_) {
0759       try {
0760         convertException::wrap([this]() { sendFwkSummaryToMessageLogger(); });
0761       } catch (cms::Exception const& ex) {
0762         collector.addException(ex);
0763       }
0764     }
0765   }
0766 
0767   void Schedule::sendFwkSummaryToMessageLogger() const {
0768     //Function to loop over items in a container and periodically
0769     // flush to the message logger.
0770     auto logForEach = [](auto const& iContainer, auto iMessage) {
0771       auto logger = LogFwkVerbatim("FwkSummary");
0772       int count = 0;
0773       for (auto const& element : iContainer) {
0774         iMessage(logger, element);
0775         if (++count == 10) {
0776           logger = LogFwkVerbatim("FwkSummary");
0777           count = 0;
0778         } else {
0779           logger << "\n";
0780         }
0781       }
0782     };
0783 
0784     {
0785       TriggerReport tr;
0786       getTriggerReport(tr);
0787 
0788       // The trigger report (pass/fail etc.):
0789 
0790       LogFwkVerbatim("FwkSummary") << "";
0791       LogFwkVerbatim("FwkSummary") << "TrigReport "
0792                                    << "---------- Event  Summary ------------";
0793       if (!tr.trigPathSummaries.empty()) {
0794         LogFwkVerbatim("FwkSummary") << "TrigReport"
0795                                      << " Events total = " << tr.eventSummary.totalEvents
0796                                      << " passed = " << tr.eventSummary.totalEventsPassed
0797                                      << " failed = " << tr.eventSummary.totalEventsFailed << "";
0798       } else {
0799         LogFwkVerbatim("FwkSummary") << "TrigReport"
0800                                      << " Events total = " << tr.eventSummary.totalEvents
0801                                      << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0802       }
0803 
0804       LogFwkVerbatim("FwkSummary") << "";
0805       LogFwkVerbatim("FwkSummary") << "TrigReport "
0806                                    << "---------- Path   Summary ------------";
0807       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0808                                    << " " << std::right << std::setw(10) << "Executed"
0809                                    << " " << std::right << std::setw(10) << "Passed"
0810                                    << " " << std::right << std::setw(10) << "Failed"
0811                                    << " " << std::right << std::setw(10) << "Error"
0812                                    << " "
0813                                    << "Name"
0814                                    << "";
0815       logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
0816         logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
0817                << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0818                << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0819                << p.timesExcept << " " << p.name;
0820       });
0821 
0822       /*
0823       std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
0824       std::vector<int>::const_iterator epe = empty_trig_paths_.end();
0825       std::vector<std::string>::const_iterator  epn = empty_trig_path_names_.begin();
0826       for (; epi != epe; ++epi, ++epn) {
0827 
0828         LogFwkVerbatim("FwkSummary") << "TrigReport "
0829         << std::right << std::setw(5) << 1
0830         << std::right << std::setw(5) << *epi << " "
0831         << std::right << std::setw(10) << totalEvents() << " "
0832         << std::right << std::setw(10) << totalEvents() << " "
0833         << std::right << std::setw(10) << 0 << " "
0834         << std::right << std::setw(10) << 0 << " "
0835         << *epn << "";
0836       }
0837        */
0838 
0839       LogFwkVerbatim("FwkSummary") << "\n"
0840                                    << "TrigReport "
0841                                    << "-------End-Path   Summary ------------\n"
0842                                    << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0843                                    << " " << std::right << std::setw(10) << "Executed"
0844                                    << " " << std::right << std::setw(10) << "Passed"
0845                                    << " " << std::right << std::setw(10) << "Failed"
0846                                    << " " << std::right << std::setw(10) << "Error"
0847                                    << " "
0848                                    << "Name";
0849       logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
0850         logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
0851                << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0852                << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0853                << p.timesExcept << " " << p.name;
0854       });
0855 
0856       for (auto const& p : tr.trigPathSummaries) {
0857         LogFwkVerbatim("FwkSummary") << "\n"
0858                                      << "TrigReport "
0859                                      << "---------- Modules in Path: " << p.name << " ------------\n"
0860                                      << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0861                                      << " " << std::right << std::setw(10) << "Visited"
0862                                      << " " << std::right << std::setw(10) << "Passed"
0863                                      << " " << std::right << std::setw(10) << "Failed"
0864                                      << " " << std::right << std::setw(10) << "Error"
0865                                      << " "
0866                                      << "Name";
0867 
0868         logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
0869           logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
0870                  << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0871                  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0872                  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0873         });
0874       }
0875 
0876       for (auto const& p : tr.endPathSummaries) {
0877         LogFwkVerbatim("FwkSummary") << "\n"
0878                                      << "TrigReport "
0879                                      << "------ Modules in End-Path: " << p.name << " ------------\n"
0880                                      << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0881                                      << " " << std::right << std::setw(10) << "Visited"
0882                                      << " " << std::right << std::setw(10) << "Passed"
0883                                      << " " << std::right << std::setw(10) << "Failed"
0884                                      << " " << std::right << std::setw(10) << "Error"
0885                                      << " "
0886                                      << "Name";
0887 
0888         unsigned int bitpos = 0;
0889         logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
0890           logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
0891                  << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0892                  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0893                  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0894           ++bitpos;
0895         });
0896       }
0897 
0898       LogFwkVerbatim("FwkSummary") << "\n"
0899                                    << "TrigReport "
0900                                    << "---------- Module Summary ------------\n"
0901                                    << "TrigReport " << std::right << std::setw(10) << "Visited"
0902                                    << " " << std::right << std::setw(10) << "Executed"
0903                                    << " " << std::right << std::setw(10) << "Passed"
0904                                    << " " << std::right << std::setw(10) << "Failed"
0905                                    << " " << std::right << std::setw(10) << "Error"
0906                                    << " "
0907                                    << "Name";
0908 
0909       logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
0910         logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
0911                << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
0912                << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
0913                << worker.timesExcept << " " << worker.moduleLabel;
0914       });
0915       LogFwkVerbatim("FwkSummary") << "";
0916     }
0917     // The timing report (CPU and Real Time):
0918     TriggerTimingReport tr;
0919     getTriggerTimingReport(tr);
0920 
0921     const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0922 
0923     LogFwkVerbatim("FwkSummary") << "TimeReport "
0924                                  << "---------- Event  Summary ---[sec]----";
0925     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0926                                  << "       event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0927     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0928                                  << "      event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0929     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0930                                  << "     sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0931     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0932                                  << " efficiency CPU/Real/thread = "
0933                                  << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0934                                         preallocConfig_.numberOfThreads();
0935 
0936     constexpr int kColumn1Size = 10;
0937     constexpr int kColumn2Size = 12;
0938     constexpr int kColumn3Size = 12;
0939     LogFwkVerbatim("FwkSummary") << "";
0940     LogFwkVerbatim("FwkSummary") << "TimeReport "
0941                                  << "---------- Path   Summary ---[Real sec]----";
0942     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0943                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0944                                  << "  Name";
0945     logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
0946       const int timesRun = std::max(1, p.timesRun);
0947       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0948              << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0949              << "  " << p.name;
0950     });
0951     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0952                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0953                                  << "  Name";
0954 
0955     LogFwkVerbatim("FwkSummary") << "\n"
0956                                  << "TimeReport "
0957                                  << "-------End-Path   Summary ---[Real sec]----\n"
0958                                  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0959                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0960                                  << "  Name";
0961     logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
0962       const int timesRun = std::max(1, p.timesRun);
0963 
0964       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0965              << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0966              << "  " << p.name;
0967     });
0968     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0969                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0970                                  << "  Name";
0971 
0972     for (auto const& p : tr.trigPathSummaries) {
0973       LogFwkVerbatim("FwkSummary") << "\n"
0974                                    << "TimeReport "
0975                                    << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
0976                                    << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0977                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0978                                    << "  Name";
0979       logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
0980         logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0981                << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
0982                << mod.realTime / std::max(1, mod.timesVisited) << "  " << mod.moduleLabel;
0983       });
0984     }
0985     if (not tr.trigPathSummaries.empty()) {
0986       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0987                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0988                                    << "  Name";
0989     }
0990     for (auto const& p : tr.endPathSummaries) {
0991       LogFwkVerbatim("FwkSummary") << "\n"
0992                                    << "TimeReport "
0993                                    << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
0994                                    << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0995                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0996                                    << "  Name";
0997       logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
0998         logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0999                << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1000                << mod.realTime / std::max(1, mod.timesVisited) << "  " << mod.moduleLabel;
1001       });
1002     }
1003     if (not tr.endPathSummaries.empty()) {
1004       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1005                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1006                                    << "  Name";
1007     }
1008     LogFwkVerbatim("FwkSummary") << "\n"
1009                                  << "TimeReport "
1010                                  << "---------- Module Summary ---[Real sec]----\n"
1011                                  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1012                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1013                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1014                                  << "  Name";
1015     logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
1016       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1017              << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1018              << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
1019              << worker.realTime / std::max(1, worker.timesVisited) << "  " << worker.moduleLabel;
1020     });
1021     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1022                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1023                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1024                                  << "  Name";
1025 
1026     LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
1027   }
1028 
1029   void Schedule::closeOutputFiles() {
1030     using std::placeholders::_1;
1031     for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1032     for (auto& worker : allWorkers()) {
1033       worker->respondToCloseOutputFile();
1034     }
1035   }
1036 
1037   void Schedule::openOutputFiles(FileBlock& fb) {
1038     using std::placeholders::_1;
1039     for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1040   }
1041 
1042   void Schedule::writeRunAsync(WaitingTaskHolder task,
1043                                RunPrincipal const& rp,
1044                                ProcessContext const* processContext,
1045                                ActivityRegistry* activityRegistry,
1046                                MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1047     auto token = ServiceRegistry::instance().presentToken();
1048     GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1049                                 LuminosityBlockID(rp.run(), 0),
1050                                 rp.index(),
1051                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1052                                 rp.endTime(),
1053                                 processContext);
1054 
1055     using namespace edm::waiting_task;
1056     chain::first([&](auto nextTask) {
1057       //services can depend on other services
1058       ServiceRegistry::Operate op(token);
1059 
1060       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1061       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1062       }
1063       for (auto& c : all_output_communicators_) {
1064         c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1065       }
1066     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1067       //services can depend on other services
1068       ServiceRegistry::Operate op(token);
1069 
1070       activityRegistry->postGlobalWriteRunSignal_(globalContext);
1071     })) |
1072         chain::runLast(task);
1073   }
1074 
1075   void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1076                                         ProcessBlockPrincipal const& pbp,
1077                                         ProcessContext const* processContext,
1078                                         ActivityRegistry* activityRegistry) {
1079     auto token = ServiceRegistry::instance().presentToken();
1080     GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1081                                 LuminosityBlockID(),
1082                                 RunIndex::invalidRunIndex(),
1083                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1084                                 Timestamp::invalidTimestamp(),
1085                                 processContext);
1086 
1087     using namespace edm::waiting_task;
1088     chain::first([&](auto nextTask) {
1089       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1090       ServiceRegistry::Operate op(token);
1091       CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1092       }
1093       for (auto& c : all_output_communicators_) {
1094         c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1095       }
1096     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1097       //services can depend on other services
1098       ServiceRegistry::Operate op(token);
1099 
1100       activityRegistry->postWriteProcessBlockSignal_(globalContext);
1101     })) |
1102         chain::runLast(std::move(task));
1103   }
1104 
1105   void Schedule::writeLumiAsync(WaitingTaskHolder task,
1106                                 LuminosityBlockPrincipal const& lbp,
1107                                 ProcessContext const* processContext,
1108                                 ActivityRegistry* activityRegistry) {
1109     auto token = ServiceRegistry::instance().presentToken();
1110     GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1111                                 lbp.id(),
1112                                 lbp.runPrincipal().index(),
1113                                 lbp.index(),
1114                                 lbp.beginTime(),
1115                                 processContext);
1116 
1117     using namespace edm::waiting_task;
1118     chain::first([&](auto nextTask) {
1119       ServiceRegistry::Operate op(token);
1120       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1121       }
1122       for (auto& c : all_output_communicators_) {
1123         c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1124       }
1125     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1126       //services can depend on other services
1127       ServiceRegistry::Operate op(token);
1128 
1129       activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1130     })) |
1131         chain::runLast(task);
1132   }
1133 
1134   bool Schedule::shouldWeCloseOutput() const {
1135     using std::placeholders::_1;
1136     // Return true iff at least one output module returns true.
1137     return (std::find_if(all_output_communicators_.begin(),
1138                          all_output_communicators_.end(),
1139                          std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1140             all_output_communicators_.end());
1141   }
1142 
1143   void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1144     using std::placeholders::_1;
1145     for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1146   }
1147 
1148   void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1149     using std::placeholders::_1;
1150     for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1151   }
1152 
1153   void Schedule::beginJob(ProductRegistry const& iRegistry,
1154                           eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
1155                           ProcessBlockHelperBase const& processBlockHelperBase,
1156                           std::string const& iProcessName) {
1157     finishModulesInitialization(*moduleRegistry_, iRegistry, iESIndices, processBlockHelperBase, iProcessName);
1158     globalSchedule_->beginJob(*moduleRegistry_);
1159   }
1160 
1161   void Schedule::beginStream(unsigned int streamID) {
1162     assert(streamID < streamSchedules_.size());
1163     streamSchedules_[streamID]->beginStream(*moduleRegistry_);
1164   }
1165 
1166   void Schedule::endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1167     assert(streamID < streamSchedules_.size());
1168     streamSchedules_[streamID]->endStream(*moduleRegistry_, collector, collectorMutex);
1169   }
1170 
1171   void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1172                                       unsigned int iStreamID,
1173                                       EventTransitionInfo& info,
1174                                       ServiceToken const& token) {
1175     assert(iStreamID < streamSchedules_.size());
1176     streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1177   }
1178 
1179   bool Schedule::changeModule(std::string const& iLabel,
1180                               ParameterSet const& iPSet,
1181                               const SignallingProductRegistryFiller& iRegistry,
1182                               eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
1183     Worker* found = nullptr;
1184     for (auto const& worker : allWorkers()) {
1185       if (worker->description()->moduleLabel() == iLabel) {
1186         found = worker;
1187         break;
1188       }
1189     }
1190     if (nullptr == found) {
1191       return false;
1192     }
1193 
1194     auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1195 
1196     globalSchedule_->replaceModule(newMod, iLabel);
1197 
1198     for (auto& s : streamSchedules_) {
1199       s->replaceModule(newMod, iLabel);
1200     }
1201 
1202     {
1203       //Need to updateLookup in order to make getByToken work
1204       auto const processBlockLookup = iRegistry.registry().productLookup(InProcess);
1205       auto const runLookup = iRegistry.registry().productLookup(InRun);
1206       auto const lumiLookup = iRegistry.registry().productLookup(InLumi);
1207       auto const eventLookup = iRegistry.registry().productLookup(InEvent);
1208       newMod->updateLookup(InProcess, *runLookup);
1209       newMod->updateLookup(InRun, *runLookup);
1210       newMod->updateLookup(InLumi, *lumiLookup);
1211       newMod->updateLookup(InEvent, *eventLookup);
1212       newMod->updateLookup(iIndices);
1213 
1214       auto const& processName = newMod->moduleDescription().processName();
1215       auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1216       auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1217       auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1218       auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1219       newMod->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1220       newMod->resolvePutIndicies(InRun, runModuleToIndicies);
1221       newMod->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1222       newMod->resolvePutIndicies(InEvent, eventModuleToIndicies);
1223     }
1224 
1225     return true;
1226   }
1227 
1228   void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1229     globalSchedule_->deleteModule(iLabel);
1230     for (auto& stream : streamSchedules_) {
1231       stream->deleteModule(iLabel);
1232     }
1233     moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1234   }
1235 
1236   void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1237                                        std::multimap<std::string, std::string> const& referencesToBranches,
1238                                        std::vector<std::string> const& modulesToSkip,
1239                                        edm::ProductRegistry const& preg) {
1240     for (auto& stream : streamSchedules_) {
1241       stream->initializeEarlyDelete(
1242           *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1243     }
1244   }
1245 
1246   std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1247     std::vector<ModuleDescription const*> result;
1248     result.reserve(allWorkers().size());
1249 
1250     for (auto const& worker : allWorkers()) {
1251       ModuleDescription const* p = worker->description();
1252       result.push_back(p);
1253     }
1254     return result;
1255   }
1256 
1257   Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1258 
1259   void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1260     moduleRegistry_->forAllModuleHolders([&](auto& iHolder) { iHolder->convertCurrentProcessAlias(processName); });
1261   }
1262 
1263   void Schedule::releaseMemoryPostLookupSignal() {
1264     moduleRegistry_->forAllModuleHolders([&](auto& iHolder) { iHolder->releaseMemoryPostLookupSignal(); });
1265   }
1266 
1267   void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1268     streamSchedules_[0]->availablePaths(oLabelsToFill);
1269   }
1270 
1271   void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1272 
1273   void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1274 
1275   void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1276     streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1277   }
1278 
1279   void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1280                                           std::vector<ModuleDescription const*>& descriptions,
1281                                           unsigned int hint) const {
1282     streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1283   }
1284 
1285   void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1286                                              std::vector<ModuleDescription const*>& descriptions,
1287                                              unsigned int hint) const {
1288     streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1289   }
1290 
1291   void Schedule::getTriggerReport(TriggerReport& rep) const {
1292     rep.eventSummary.totalEvents = 0;
1293     rep.eventSummary.totalEventsPassed = 0;
1294     rep.eventSummary.totalEventsFailed = 0;
1295     for (auto& s : streamSchedules_) {
1296       s->getTriggerReport(rep);
1297     }
1298     sort_all(rep.workerSummaries);
1299   }
1300 
1301   void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1302     rep.eventSummary.totalEvents = 0;
1303     rep.eventSummary.cpuTime = 0.;
1304     rep.eventSummary.realTime = 0.;
1305     summaryTimeKeeper_->fillTriggerTimingReport(rep);
1306   }
1307 
1308   int Schedule::totalEvents() const {
1309     int returnValue = 0;
1310     for (auto& s : streamSchedules_) {
1311       returnValue += s->totalEvents();
1312     }
1313     return returnValue;
1314   }
1315 
1316   int Schedule::totalEventsPassed() const {
1317     int returnValue = 0;
1318     for (auto& s : streamSchedules_) {
1319       returnValue += s->totalEventsPassed();
1320     }
1321     return returnValue;
1322   }
1323 
1324   int Schedule::totalEventsFailed() const {
1325     int returnValue = 0;
1326     for (auto& s : streamSchedules_) {
1327       returnValue += s->totalEventsFailed();
1328     }
1329     return returnValue;
1330   }
1331 
1332   void Schedule::clearCounters() {
1333     for (auto& s : streamSchedules_) {
1334       s->clearCounters();
1335     }
1336   }
1337 }  // namespace edm