Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-17 01:30:21

0001 #include "FWCore/Framework/interface/Schedule.h"
0002 
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0011 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0012 #include "FWCore/Framework/interface/ConsumesCollector.h"
0013 #include "FWCore/Framework/interface/EDConsumerBase.h"
0014 #include "FWCore/Framework/src/OutputModuleDescription.h"
0015 #include "FWCore/Framework/interface/TriggerNamesService.h"
0016 #include "FWCore/Framework/src/TriggerReport.h"
0017 #include "FWCore/Framework/src/TriggerTimingReport.h"
0018 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0019 #include "FWCore/Framework/src/ModuleHolderFactory.h"
0020 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0021 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0022 #include "FWCore/Framework/interface/ModuleRegistry.h"
0023 #include "FWCore/Framework/src/TriggerResultInserter.h"
0024 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0025 #include "FWCore/Framework/src/PathStatusInserter.h"
0026 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0027 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0028 #include "FWCore/Framework/interface/ComponentDescription.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/chain_first.h"
0031 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0032 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0033 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0034 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0035 #include "FWCore/ServiceRegistry/interface/ModuleConsumesInfo.h"
0036 #include "FWCore/Utilities/interface/Algorithms.h"
0037 #include "FWCore/Utilities/interface/ConvertException.h"
0038 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0039 #include "FWCore/Utilities/interface/TypeID.h"
0040 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0041 
0042 #include <array>
0043 #include <algorithm>
0044 #include <cassert>
0045 #include <cstdlib>
0046 #include <functional>
0047 #include <iomanip>
0048 #include <list>
0049 #include <map>
0050 #include <set>
0051 #include <exception>
0052 #include <sstream>
0053 
0054 #include "make_shared_noexcept_false.h"
0055 #include "processEDAliases.h"
0056 
0057 namespace edm {
0058 
0059   class ModuleMakerBase;
0060 
0061   namespace {
0062     using std::placeholders::_1;
0063 
0064     bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0065       return std::binary_search(v.begin(), v.end(), s);
0066     }
0067 
0068     // Here we make the trigger results inserter directly.  This should
0069     // probably be a utility in the WorkerRegistry or elsewhere.
0070 
0071     std::shared_ptr<TriggerResultInserter> makeInserter(
0072         ParameterSet& proc_pset,
0073         PreallocationConfiguration const& iPrealloc,
0074         SignallingProductRegistryFiller& preg,
0075         ExceptionToActionTable const& actions,
0076         std::shared_ptr<ActivityRegistry> areg,
0077         std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0078       ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0079       trig_pset->registerIt();
0080 
0081       WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0082       ModuleDescription md(trig_pset->id(),
0083                            "TriggerResultInserter",
0084                            "TriggerResults",
0085                            processConfiguration.get(),
0086                            ModuleDescription::getUniqueID());
0087 
0088       areg->preModuleConstructionSignal_(md);
0089       bool postCalled = false;
0090       std::shared_ptr<TriggerResultInserter> returnValue;
0091       // Caught exception is rethrown
0092       CMS_SA_ALLOW try {
0093         auto module = make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams());
0094         maker::ModuleHolderT<TriggerResultInserter::ModuleType>::finishModuleInitialization(
0095             *module, md, iPrealloc, &preg);
0096         returnValue = module;
0097         postCalled = true;
0098         // if exception then post will be called in the catch block
0099         areg->postModuleConstructionSignal_(md);
0100       } catch (...) {
0101         if (!postCalled) {
0102           CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0103             // If post throws an exception ignore it because we are already handling another exception
0104           }
0105         }
0106         throw;
0107       }
0108       return returnValue;
0109     }
0110 
0111     template <typename T>
0112     void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0113                                  std::vector<std::string> const& pathNames,
0114                                  PreallocationConfiguration const& iPrealloc,
0115                                  SignallingProductRegistryFiller& preg,
0116                                  std::shared_ptr<ActivityRegistry> areg,
0117                                  std::shared_ptr<ProcessConfiguration const> processConfiguration,
0118                                  std::string const& moduleTypeName) {
0119       ParameterSet pset;
0120       pset.addParameter<std::string>("@module_type", moduleTypeName);
0121       pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0122       pset.registerIt();
0123 
0124       pathStatusInserters.reserve(pathNames.size());
0125 
0126       for (auto const& pathName : pathNames) {
0127         ModuleDescription md(
0128             pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0129 
0130         areg->preModuleConstructionSignal_(md);
0131         bool postCalled = false;
0132         // Caught exception is rethrown
0133         CMS_SA_ALLOW try {
0134           auto module = make_shared_noexcept_false<T>(iPrealloc.numberOfStreams());
0135           maker::ModuleHolderT<typename T::ModuleType>::finishModuleInitialization(*module, md, iPrealloc, &preg);
0136           pathStatusInserters.emplace_back(module);
0137           postCalled = true;
0138           // if exception then post will be called in the catch block
0139           areg->postModuleConstructionSignal_(md);
0140         } catch (...) {
0141           if (!postCalled) {
0142             CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0143               // If post throws an exception ignore it because we are already handling another exception
0144             }
0145           }
0146           throw;
0147         }
0148       }
0149     }
0150 
0151     typedef std::vector<std::string> vstring;
0152 
0153     void processSwitchProducers(ParameterSet const& proc_pset,
0154                                 std::string const& processName,
0155                                 SignallingProductRegistryFiller& preg) {
0156       // Update Switch ProductDescriptions for the chosen case
0157       struct BranchesCases {
0158         BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0159         std::vector<BranchKey> chosenBranches;
0160         std::vector<std::string> caseLabels;
0161       };
0162       std::map<std::string, BranchesCases> switchMap;
0163       for (auto& prod : preg.productListUpdator()) {
0164         if (prod.second.isSwitchAlias()) {
0165           auto it = switchMap.find(prod.second.moduleLabel());
0166           if (it == switchMap.end()) {
0167             auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0168             auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0169                                               switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0170             assert(inserted.second);
0171             it = inserted.first;
0172           }
0173 
0174           bool found = false;
0175           for (auto const& productIter : preg.registry().productList()) {
0176             BranchKey const& branchKey = productIter.first;
0177             // The alias-for product must be in the same process as
0178             // the SwitchProducer (earlier processes
0179             // may contain products with same type, module label, and
0180             // instance name)
0181             if (branchKey.processName() != processName) {
0182               continue;
0183             }
0184 
0185             ProductDescription const& desc = productIter.second;
0186             if (desc.branchType() == prod.second.branchType() and
0187                 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0188                 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0189                 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0190               prod.second.setSwitchAliasForBranch(desc);
0191               if (!prod.second.transient()) {
0192                 it->second.chosenBranches.push_back(prod.first);  // with moduleLabel of the Switch
0193               }
0194               found = true;
0195             }
0196           }
0197           if (not found) {
0198             Exception ex(errors::LogicError);
0199             ex << "Trying to find a ProductDescription to be aliased-for by SwitchProducer with\n"
0200                << "  friendly class name = " << prod.second.friendlyClassName() << "\n"
0201                << "  module label = " << prod.second.moduleLabel() << "\n"
0202                << "  product instance name = " << prod.second.productInstanceName() << "\n"
0203                << "  process name = " << processName
0204                << "\n\nbut did not find any. Please contact a framework developer.";
0205             ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0206             throw ex;
0207           }
0208         }
0209       }
0210       if (switchMap.empty())
0211         return;
0212 
0213       for (auto& elem : switchMap) {
0214         std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0215       }
0216 
0217       auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0218         std::map<std::string, std::vector<BranchKey>> caseBranches;
0219         for (auto const& item : preg.registry().productList()) {
0220           if (item.first.processName() != processName)
0221             continue;
0222 
0223           if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0224               found != caseLabels.end()) {
0225             caseBranches[*found].push_back(item.first);
0226           }
0227         }
0228 
0229         for (auto const& caseLabel : caseLabels) {
0230           ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0231           auto& branches = caseBranches[caseLabel];
0232           std::sort(branches.begin(), branches.end());
0233           for (auto const& branch : branches) {
0234             ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0235           }
0236         }
0237       };
0238 
0239       // Check that non-chosen cases declare exactly the same non-transient branches
0240       // Also set the alias-for branches to transient
0241       std::vector<bool> foundBranches;
0242       for (auto const& switchItem : switchMap) {
0243         auto const& switchLabel = switchItem.first;
0244         auto const& chosenBranches = switchItem.second.chosenBranches;
0245         auto const& caseLabels = switchItem.second.caseLabels;
0246         foundBranches.resize(chosenBranches.size());
0247         for (auto const& caseLabel : caseLabels) {
0248           std::fill(foundBranches.begin(), foundBranches.end(), false);
0249           for (auto& nonConstItem : preg.productListUpdator()) {
0250             auto const& item = nonConstItem;
0251             if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0252               // Check that products which are not transient in the dictionary are consistent between
0253               // all the cases of a SwitchProducer.
0254               if (!item.second.transient()) {
0255                 auto range = std::equal_range(chosenBranches.begin(),
0256                                               chosenBranches.end(),
0257                                               BranchKey(item.first.friendlyClassName(),
0258                                                         switchLabel,
0259                                                         item.first.productInstanceName(),
0260                                                         item.first.processName()));
0261                 if (range.first == range.second) {
0262                   Exception ex(errors::Configuration);
0263                   ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0264                      << item.first << " that is not produced by the chosen case "
0265                      << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0266                             .getUntrackedParameter<std::string>("@chosen_case")
0267                      << " and that product is not transient. "
0268                      << "If the intention is to produce only a subset of the non-transient products listed below, each "
0269                         "case with more non-transient products needs to be replaced with an EDAlias to only the "
0270                         "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0271                   addProductsToException(caseLabels, ex);
0272                   throw ex;
0273                 }
0274                 assert(std::distance(range.first, range.second) == 1);
0275                 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0276               }
0277 
0278               // Set the alias-for branch as transient so it gets fully ignored in output.
0279               // I tried first to implicitly drop all branches with
0280               // '@' in ProductSelector, but that gave problems on
0281               // input (those branches would be implicitly dropped on
0282               // input as well, leading to the SwitchProducer branches
0283               // do be dropped as dependent ones, as the alias
0284               // detection logic in RootFile says that the
0285               // SwitchProducer branches are not alias branches)
0286               nonConstItem.second.setTransient(true);
0287 
0288               // Check that there are no BranchAliases for any of the cases
0289               auto const& bd = item.second;
0290               if (not bd.branchAliases().empty()) {
0291                 auto ex = Exception(errors::UnimplementedFeature)
0292                           << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0293                              "aliases for SwitchProducer with label "
0294                           << switchLabel << " for case " << caseLabel << ":";
0295                 for (auto const& branchAlias : bd.branchAliases()) {
0296                   ex << " " << branchAlias;
0297                 }
0298                 throw ex;
0299               }
0300             }
0301           }
0302 
0303           for (size_t i = 0; i < chosenBranches.size(); i++) {
0304             if (not foundBranches[i]) {
0305               auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0306                                      .getUntrackedParameter<std::string>("@chosen_case");
0307               Exception ex(errors::Configuration);
0308               ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0309                  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0310                  << chosenLabel << " and that product is not transient. "
0311                  << "If the intention is to produce only a subset of the non-transient products listed below, each "
0312                     "case with more non-transient products needs to be replaced with an EDAlias to only the "
0313                     "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0314               addProductsToException(caseLabels, ex);
0315               throw ex;
0316             }
0317           }
0318         }
0319       }
0320     }
0321 
0322     void reduceParameterSet(ParameterSet& proc_pset,
0323                             vstring const& end_path_name_list,
0324                             vstring& modulesInConfig,
0325                             std::set<std::string> const& usedModuleLabels,
0326                             std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0327       // Before calculating the ParameterSetID of the top level ParameterSet or
0328       // saving it in the registry drop from the top level ParameterSet all
0329       // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
0330       // production is not enabled also drop all the EDFilters and EDProducers
0331       // that are not scheduled. Drop the ParameterSet used to configure the module
0332       // itself. Also drop the other traces of these labels in the top level
0333       // ParameterSet: Remove that labels from @all_modules and from all the
0334       // end paths. If this makes any end paths empty, then remove the end path
0335       // name from @end_paths, and @paths.
0336 
0337       // First make a list of labels to drop
0338       vstring outputModuleLabels;
0339       std::string edmType;
0340       std::string const moduleEdmType("@module_edm_type");
0341       std::string const outputModule("OutputModule");
0342       std::string const edAnalyzer("EDAnalyzer");
0343       std::string const edFilter("EDFilter");
0344       std::string const edProducer("EDProducer");
0345 
0346       std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0347 
0348       //need a list of all modules on paths in order to determine
0349       // if an EDAnalyzer only appears on an end path
0350       vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0351       std::set<std::string> modulesOnPaths;
0352       {
0353         std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0354         for (auto const& endPath : end_path_name_list) {
0355           noEndPaths.erase(endPath);
0356         }
0357         {
0358           vstring labels;
0359           for (auto const& path : noEndPaths) {
0360             labels = proc_pset.getParameter<vstring>(path);
0361             modulesOnPaths.insert(labels.begin(), labels.end());
0362           }
0363         }
0364       }
0365       //Initially fill labelsToBeDropped with all module mentioned in
0366       // the configuration but which are not being used by the system
0367       std::vector<std::string> labelsToBeDropped;
0368       labelsToBeDropped.reserve(modulesInConfigSet.size());
0369       std::set_difference(modulesInConfigSet.begin(),
0370                           modulesInConfigSet.end(),
0371                           usedModuleLabels.begin(),
0372                           usedModuleLabels.end(),
0373                           std::back_inserter(labelsToBeDropped));
0374 
0375       const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0376       for (auto const& modLabel : usedModuleLabels) {
0377         // Do nothing for modules that do not have a ParameterSet. Modules of type
0378         // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
0379         if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0380           edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0381           if (edmType == outputModule) {
0382             outputModuleLabels.push_back(modLabel);
0383             labelsToBeDropped.push_back(modLabel);
0384           }
0385           if (edmType == edAnalyzer) {
0386             if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0387               labelsToBeDropped.push_back(modLabel);
0388             }
0389           }
0390         }
0391       }
0392       //labelsToBeDropped must be sorted
0393       std::inplace_merge(
0394           labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0395 
0396       // drop the parameter sets used to configure the modules
0397       for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0398 
0399       // drop the labels from @all_modules
0400       vstring::iterator endAfterRemove =
0401           std::remove_if(modulesInConfig.begin(),
0402                          modulesInConfig.end(),
0403                          std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0404       modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0405       proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0406 
0407       // drop the labels from all end paths
0408       vstring endPathsToBeDropped;
0409       vstring labels;
0410       for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0411            iEndPath != endEndPath;
0412            ++iEndPath) {
0413         labels = proc_pset.getParameter<vstring>(*iEndPath);
0414         vstring::iterator iSave = labels.begin();
0415         vstring::iterator iBegin = labels.begin();
0416 
0417         for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0418           if (binary_search_string(labelsToBeDropped, *iLabel)) {
0419             if (binary_search_string(outputModuleLabels, *iLabel)) {
0420               outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0421             }
0422           } else {
0423             if (iSave != iLabel) {
0424               iSave->swap(*iLabel);
0425             }
0426             ++iSave;
0427           }
0428         }
0429         labels.erase(iSave, labels.end());
0430         if (labels.empty()) {
0431           // remove empty end paths and save their names
0432           proc_pset.eraseSimpleParameter(*iEndPath);
0433           endPathsToBeDropped.push_back(*iEndPath);
0434         } else {
0435           proc_pset.addParameter<vstring>(*iEndPath, labels);
0436         }
0437       }
0438       sort_all(endPathsToBeDropped);
0439 
0440       // remove empty end paths from @paths
0441       endAfterRemove = std::remove_if(scheduledPaths.begin(),
0442                                       scheduledPaths.end(),
0443                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0444       scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0445       proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0446 
0447       // remove empty end paths from @end_paths
0448       vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0449       endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0450                                       scheduledEndPaths.end(),
0451                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0452       scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0453       proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0454     }
0455 
0456     class RngEDConsumer : public EDConsumerBase {
0457     public:
0458       explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0459         Service<RandomNumberGenerator> rng;
0460         if (rng.isAvailable()) {
0461           rng->consumes(consumesCollector());
0462           for (auto const& consumesInfo : this->moduleConsumesInfos()) {
0463             typesConsumed.emplace(consumesInfo.type());
0464           }
0465         }
0466       }
0467     };
0468 
0469     template <typename F>
0470     auto doCleanup(F&& iF) {
0471       auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0472         CMS_SA_ALLOW try { f(); } catch (...) {
0473         }
0474         if (iPtr) {
0475           iTask.doneWaiting(*iPtr);
0476         }
0477       };
0478       return wrapped;
0479     }
0480   }  // namespace
0481   // -----------------------------
0482 
0483   typedef std::vector<std::string> vstring;
0484 
0485   // -----------------------------
0486 
0487   Schedule::Schedule(ParameterSet& proc_pset,
0488                      service::TriggerNamesService const& tns,
0489                      SignallingProductRegistryFiller& preg,
0490                      ExceptionToActionTable const& actions,
0491                      std::shared_ptr<ActivityRegistry> areg,
0492                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0493                      PreallocationConfiguration const& prealloc,
0494                      ProcessContext const* processContext,
0495                      ModuleTypeResolverMaker const* resolverMaker)
0496       :  //Only create a resultsInserter if there is a trigger path
0497         resultsInserter_{tns.getTrigPaths().empty()
0498                              ? std::shared_ptr<TriggerResultInserter>{}
0499                              : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
0500         moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0501         all_output_communicators_(),
0502         preallocConfig_(prealloc),
0503         pathNames_(&tns.getTrigPaths()),
0504         endPathNames_(&tns.getEndPaths()),
0505         wantSummary_(tns.wantSummary()) {
0506     makePathStatusInserters(pathStatusInserters_,
0507                             *pathNames_,
0508                             prealloc,
0509                             preg,
0510                             areg,
0511                             processConfiguration,
0512                             std::string("PathStatusInserter"));
0513 
0514     if (endPathNames_->size() > 1) {
0515       makePathStatusInserters(endPathStatusInserters_,
0516                               *endPathNames_,
0517                               prealloc,
0518                               preg,
0519                               areg,
0520                               processConfiguration,
0521                               std::string("EndPathStatusInserter"));
0522     }
0523 
0524     assert(0 < prealloc.numberOfStreams());
0525     streamSchedules_.reserve(prealloc.numberOfStreams());
0526     for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0527       streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0528                                                                                pathStatusInserters_,
0529                                                                                endPathStatusInserters_,
0530                                                                                moduleRegistry(),
0531                                                                                proc_pset,
0532                                                                                tns,
0533                                                                                prealloc,
0534                                                                                preg,
0535                                                                                actions,
0536                                                                                areg,
0537                                                                                processConfiguration,
0538                                                                                StreamID{i},
0539                                                                                processContext));
0540     }
0541 
0542     //TriggerResults are injected automatically by StreamSchedules and are
0543     // unknown to the ModuleRegistry
0544     const std::string kTriggerResults("TriggerResults");
0545     std::vector<std::string> modulesToUse;
0546     modulesToUse.reserve(streamSchedules_[0]->allWorkersLumisAndEvents().size());
0547     for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0548       if (worker->description()->moduleLabel() != kTriggerResults) {
0549         modulesToUse.push_back(worker->description()->moduleLabel());
0550       }
0551     }
0552     //The unscheduled modules are at the end of the list, but we want them at the front
0553     unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0554     if (nUnscheduledModules > 0) {
0555       std::vector<std::string> temp;
0556       temp.reserve(modulesToUse.size());
0557       auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0558       std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0559       std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0560       temp.swap(modulesToUse);
0561     }
0562 
0563     // propagate_const<T> has no reset() function
0564     globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0565                                                        pathStatusInserters_,
0566                                                        endPathStatusInserters_,
0567                                                        moduleRegistry(),
0568                                                        modulesToUse,
0569                                                        proc_pset,
0570                                                        preg,
0571                                                        prealloc,
0572                                                        actions,
0573                                                        areg,
0574                                                        processConfiguration,
0575                                                        processContext);
0576   }
0577 
0578   void Schedule::finishSetup(ParameterSet& proc_pset,
0579                              service::TriggerNamesService const& tns,
0580                              SignallingProductRegistryFiller& preg,
0581                              BranchIDListHelper& branchIDListHelper,
0582                              ProcessBlockHelperBase& processBlockHelper,
0583                              ThinnedAssociationsHelper& thinnedAssociationsHelper,
0584                              std::shared_ptr<ActivityRegistry> areg,
0585                              std::shared_ptr<ProcessConfiguration> processConfiguration,
0586                              PreallocationConfiguration const& prealloc,
0587                              ProcessContext const* processContext) {
0588     //TriggerResults is not in the top level ParameterSet so the call to
0589     // reduceParameterSet would fail to find it. Just remove it up front.
0590     const std::string kTriggerResults("TriggerResults");
0591 
0592     std::set<std::string> usedModuleLabels;
0593     for (auto const& worker : allWorkers()) {
0594       if (worker->description()->moduleLabel() != kTriggerResults) {
0595         usedModuleLabels.insert(worker->description()->moduleLabel());
0596       }
0597     }
0598     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0599     std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0600     reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0601     {
0602       std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0603       detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0604     }
0605 
0606     // At this point all ProductDescriptions are created. Mark now the
0607     // ones of unscheduled workers to be on-demand.
0608     {
0609       auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
0610       if (not unsched.empty()) {
0611         std::set<std::string> unscheduledModules;
0612         std::transform(unsched.begin(),
0613                        unsched.end(),
0614                        std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0615                        [](auto worker) { return worker->description()->moduleLabel(); });
0616         preg.setUnscheduledProducts(unscheduledModules);
0617       }
0618     }
0619 
0620     processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0621     proc_pset.registerIt();
0622     processConfiguration->setParameterSetID(proc_pset.id());
0623     processConfiguration->setProcessConfigurationID();
0624 
0625     // This is used for a little sanity-check to make sure no code
0626     // modifications alter the number of workers at a later date.
0627     size_t all_workers_count = allWorkers().size();
0628 
0629     moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0630       auto comm = iHolder->createOutputModuleCommunicator();
0631       if (comm) {
0632         all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0633       }
0634     });
0635     // Now that the output workers are filled in, set any output limits or information.
0636     limitOutput(proc_pset, branchIDListHelper.branchIDLists());
0637 
0638     // Sanity check: make sure nobody has added a worker after we've
0639     // already relied on the WorkerManager being full.
0640     assert(all_workers_count == allWorkers().size());
0641 
0642     branchIDListHelper.updateFromRegistry(preg.registry());
0643 
0644     for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
0645       worker->registerThinnedAssociations(preg.registry(), thinnedAssociationsHelper);
0646     }
0647 
0648     processBlockHelper.updateForNewProcess(preg.registry(), processConfiguration->processName());
0649 
0650     // The output modules consume products in kept branches.
0651     // So we must set this up before freezing.
0652     for (auto& c : all_output_communicators_) {
0653       c->selectProducts(preg.registry(), thinnedAssociationsHelper, processBlockHelper);
0654     }
0655 
0656     for (auto& product : preg.productListUpdator()) {
0657       setIsMergeable(product.second);
0658     }
0659 
0660     {
0661       // We now get a collection of types that may be consumed.
0662       std::set<TypeID> productTypesConsumed;
0663       std::set<TypeID> elementTypesConsumed;
0664       // Loop over all modules
0665       for (auto const& worker : allWorkers()) {
0666         for (auto const& consumesInfo : worker->moduleConsumesInfos()) {
0667           if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0668             productTypesConsumed.emplace(consumesInfo.type());
0669           } else {
0670             elementTypesConsumed.emplace(consumesInfo.type());
0671           }
0672         }
0673       }
0674       // The RandomNumberGeneratorService is not a module, yet it consumes.
0675       { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0676       preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0677     }
0678 
0679     for (auto& c : all_output_communicators_) {
0680       c->setEventSelectionInfo(outputModulePathPositions, preg.registry().anyProductProduced());
0681     }
0682 
0683     if (wantSummary_) {
0684       std::vector<const ModuleDescription*> modDesc;
0685       const auto& workers = allWorkers();
0686       modDesc.reserve(workers.size());
0687 
0688       std::transform(workers.begin(),
0689                      workers.end(),
0690                      std::back_inserter(modDesc),
0691                      [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0692 
0693       // propagate_const<T> has no reset() function
0694       summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0695       auto timeKeeperPtr = summaryTimeKeeper_.get();
0696 
0697       areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0698 
0699       areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0700       areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0701       areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0702       areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0703       areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0704       areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0705 
0706       areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0707       areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0708 
0709       areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0710       areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0711 
0712       areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0713       areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0714       //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
0715       //timeKeeperPtr->startModuleEvent(iContext,iMod);
0716       //});
0717     }
0718 
0719   }  // Schedule::Schedule
0720 
0721   void Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
0722     std::string const output("output");
0723 
0724     ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0725     int maxEventSpecs = 0;
0726     int maxEventsOut = -1;
0727     ParameterSet const* vMaxEventsOut = nullptr;
0728     std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0729     if (search_all(intNamesE, output)) {
0730       maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0731       ++maxEventSpecs;
0732     }
0733     std::vector<std::string> psetNamesE;
0734     maxEventsPSet.getParameterSetNames(psetNamesE, false);
0735     if (search_all(psetNamesE, output)) {
0736       vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0737       ++maxEventSpecs;
0738     }
0739 
0740     if (maxEventSpecs > 1) {
0741       throw Exception(errors::Configuration)
0742           << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0743     }
0744 
0745     for (auto& c : all_output_communicators_) {
0746       OutputModuleDescription desc(branchIDLists, maxEventsOut);
0747       if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0748         std::string const& moduleLabel = c->description().moduleLabel();
0749         try {
0750           desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0751         } catch (Exception const&) {
0752           throw Exception(errors::Configuration)
0753               << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0754         }
0755       }
0756       c->configure(desc);
0757     }
0758   }
0759 
0760   bool Schedule::terminate() const {
0761     if (all_output_communicators_.empty()) {
0762       return false;
0763     }
0764     for (auto& c : all_output_communicators_) {
0765       if (!c->limitReached()) {
0766         // Found an output module that has not reached output event count.
0767         return false;
0768       }
0769     }
0770     LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0771                                      << "has reached its configured limit.\n";
0772     return true;
0773   }
0774 
0775   void Schedule::endJob(ExceptionCollector& collector) {
0776     globalSchedule_->endJob(collector);
0777     if (collector.hasThrown()) {
0778       return;
0779     }
0780 
0781     if (wantSummary_) {
0782       try {
0783         convertException::wrap([this]() { sendFwkSummaryToMessageLogger(); });
0784       } catch (cms::Exception const& ex) {
0785         collector.addException(ex);
0786       }
0787     }
0788   }
0789 
0790   void Schedule::sendFwkSummaryToMessageLogger() const {
0791     //Function to loop over items in a container and periodically
0792     // flush to the message logger.
0793     auto logForEach = [](auto const& iContainer, auto iMessage) {
0794       auto logger = LogFwkVerbatim("FwkSummary");
0795       int count = 0;
0796       for (auto const& element : iContainer) {
0797         iMessage(logger, element);
0798         if (++count == 10) {
0799           logger = LogFwkVerbatim("FwkSummary");
0800           count = 0;
0801         } else {
0802           logger << "\n";
0803         }
0804       }
0805     };
0806 
0807     {
0808       TriggerReport tr;
0809       getTriggerReport(tr);
0810 
0811       // The trigger report (pass/fail etc.):
0812 
0813       LogFwkVerbatim("FwkSummary") << "";
0814       LogFwkVerbatim("FwkSummary") << "TrigReport "
0815                                    << "---------- Event  Summary ------------";
0816       if (!tr.trigPathSummaries.empty()) {
0817         LogFwkVerbatim("FwkSummary") << "TrigReport"
0818                                      << " Events total = " << tr.eventSummary.totalEvents
0819                                      << " passed = " << tr.eventSummary.totalEventsPassed
0820                                      << " failed = " << tr.eventSummary.totalEventsFailed << "";
0821       } else {
0822         LogFwkVerbatim("FwkSummary") << "TrigReport"
0823                                      << " Events total = " << tr.eventSummary.totalEvents
0824                                      << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0825       }
0826 
0827       LogFwkVerbatim("FwkSummary") << "";
0828       LogFwkVerbatim("FwkSummary") << "TrigReport "
0829                                    << "---------- Path   Summary ------------";
0830       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0831                                    << " " << std::right << std::setw(10) << "Executed"
0832                                    << " " << std::right << std::setw(10) << "Passed"
0833                                    << " " << std::right << std::setw(10) << "Failed"
0834                                    << " " << std::right << std::setw(10) << "Error"
0835                                    << " "
0836                                    << "Name"
0837                                    << "";
0838       logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
0839         logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
0840                << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0841                << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0842                << p.timesExcept << " " << p.name;
0843       });
0844 
0845       /*
0846       std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
0847       std::vector<int>::const_iterator epe = empty_trig_paths_.end();
0848       std::vector<std::string>::const_iterator  epn = empty_trig_path_names_.begin();
0849       for (; epi != epe; ++epi, ++epn) {
0850 
0851         LogFwkVerbatim("FwkSummary") << "TrigReport "
0852         << std::right << std::setw(5) << 1
0853         << std::right << std::setw(5) << *epi << " "
0854         << std::right << std::setw(10) << totalEvents() << " "
0855         << std::right << std::setw(10) << totalEvents() << " "
0856         << std::right << std::setw(10) << 0 << " "
0857         << std::right << std::setw(10) << 0 << " "
0858         << *epn << "";
0859       }
0860        */
0861 
0862       LogFwkVerbatim("FwkSummary") << "\n"
0863                                    << "TrigReport "
0864                                    << "-------End-Path   Summary ------------\n"
0865                                    << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0866                                    << " " << std::right << std::setw(10) << "Executed"
0867                                    << " " << std::right << std::setw(10) << "Passed"
0868                                    << " " << std::right << std::setw(10) << "Failed"
0869                                    << " " << std::right << std::setw(10) << "Error"
0870                                    << " "
0871                                    << "Name";
0872       logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
0873         logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
0874                << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0875                << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0876                << p.timesExcept << " " << p.name;
0877       });
0878 
0879       for (auto const& p : tr.trigPathSummaries) {
0880         LogFwkVerbatim("FwkSummary") << "\n"
0881                                      << "TrigReport "
0882                                      << "---------- Modules in Path: " << p.name << " ------------\n"
0883                                      << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0884                                      << " " << std::right << std::setw(10) << "Visited"
0885                                      << " " << std::right << std::setw(10) << "Passed"
0886                                      << " " << std::right << std::setw(10) << "Failed"
0887                                      << " " << std::right << std::setw(10) << "Error"
0888                                      << " "
0889                                      << "Name";
0890 
0891         logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
0892           logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
0893                  << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0894                  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0895                  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0896         });
0897       }
0898 
0899       for (auto const& p : tr.endPathSummaries) {
0900         LogFwkVerbatim("FwkSummary") << "\n"
0901                                      << "TrigReport "
0902                                      << "------ Modules in End-Path: " << p.name << " ------------\n"
0903                                      << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0904                                      << " " << std::right << std::setw(10) << "Visited"
0905                                      << " " << std::right << std::setw(10) << "Passed"
0906                                      << " " << std::right << std::setw(10) << "Failed"
0907                                      << " " << std::right << std::setw(10) << "Error"
0908                                      << " "
0909                                      << "Name";
0910 
0911         unsigned int bitpos = 0;
0912         logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
0913           logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
0914                  << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0915                  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0916                  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0917           ++bitpos;
0918         });
0919       }
0920 
0921       LogFwkVerbatim("FwkSummary") << "\n"
0922                                    << "TrigReport "
0923                                    << "---------- Module Summary ------------\n"
0924                                    << "TrigReport " << std::right << std::setw(10) << "Visited"
0925                                    << " " << std::right << std::setw(10) << "Executed"
0926                                    << " " << std::right << std::setw(10) << "Passed"
0927                                    << " " << std::right << std::setw(10) << "Failed"
0928                                    << " " << std::right << std::setw(10) << "Error"
0929                                    << " "
0930                                    << "Name";
0931 
0932       logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
0933         logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
0934                << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
0935                << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
0936                << worker.timesExcept << " " << worker.moduleLabel;
0937       });
0938       LogFwkVerbatim("FwkSummary") << "";
0939     }
0940     // The timing report (CPU and Real Time):
0941     TriggerTimingReport tr;
0942     getTriggerTimingReport(tr);
0943 
0944     const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0945 
0946     LogFwkVerbatim("FwkSummary") << "TimeReport "
0947                                  << "---------- Event  Summary ---[sec]----";
0948     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0949                                  << "       event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0950     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0951                                  << "      event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0952     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0953                                  << "     sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0954     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0955                                  << " efficiency CPU/Real/thread = "
0956                                  << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0957                                         preallocConfig_.numberOfThreads();
0958 
0959     constexpr int kColumn1Size = 10;
0960     constexpr int kColumn2Size = 12;
0961     constexpr int kColumn3Size = 12;
0962     LogFwkVerbatim("FwkSummary") << "";
0963     LogFwkVerbatim("FwkSummary") << "TimeReport "
0964                                  << "---------- Path   Summary ---[Real sec]----";
0965     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0966                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0967                                  << "  Name";
0968     logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
0969       const int timesRun = std::max(1, p.timesRun);
0970       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0971              << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0972              << "  " << p.name;
0973     });
0974     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0975                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0976                                  << "  Name";
0977 
0978     LogFwkVerbatim("FwkSummary") << "\n"
0979                                  << "TimeReport "
0980                                  << "-------End-Path   Summary ---[Real sec]----\n"
0981                                  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0982                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0983                                  << "  Name";
0984     logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
0985       const int timesRun = std::max(1, p.timesRun);
0986 
0987       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0988              << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0989              << "  " << p.name;
0990     });
0991     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0992                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0993                                  << "  Name";
0994 
0995     for (auto const& p : tr.trigPathSummaries) {
0996       LogFwkVerbatim("FwkSummary") << "\n"
0997                                    << "TimeReport "
0998                                    << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
0999                                    << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1000                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1001                                    << "  Name";
1002       logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1003         logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1004                << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1005                << mod.realTime / std::max(1, mod.timesVisited) << "  " << mod.moduleLabel;
1006       });
1007     }
1008     if (not tr.trigPathSummaries.empty()) {
1009       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1010                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1011                                    << "  Name";
1012     }
1013     for (auto const& p : tr.endPathSummaries) {
1014       LogFwkVerbatim("FwkSummary") << "\n"
1015                                    << "TimeReport "
1016                                    << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
1017                                    << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1018                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1019                                    << "  Name";
1020       logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1021         logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1022                << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1023                << mod.realTime / std::max(1, mod.timesVisited) << "  " << mod.moduleLabel;
1024       });
1025     }
1026     if (not tr.endPathSummaries.empty()) {
1027       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1028                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1029                                    << "  Name";
1030     }
1031     LogFwkVerbatim("FwkSummary") << "\n"
1032                                  << "TimeReport "
1033                                  << "---------- Module Summary ---[Real sec]----\n"
1034                                  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1035                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1036                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1037                                  << "  Name";
1038     logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
1039       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1040              << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1041              << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
1042              << worker.realTime / std::max(1, worker.timesVisited) << "  " << worker.moduleLabel;
1043     });
1044     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1045                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1046                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1047                                  << "  Name";
1048 
1049     LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
1050   }
1051 
1052   void Schedule::closeOutputFiles() {
1053     using std::placeholders::_1;
1054     for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1055     for (auto& worker : allWorkers()) {
1056       worker->respondToCloseOutputFile();
1057     }
1058   }
1059 
1060   void Schedule::openOutputFiles(FileBlock& fb) {
1061     using std::placeholders::_1;
1062     for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1063   }
1064 
1065   void Schedule::writeRunAsync(WaitingTaskHolder task,
1066                                RunPrincipal const& rp,
1067                                ProcessContext const* processContext,
1068                                ActivityRegistry* activityRegistry,
1069                                MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1070     auto token = ServiceRegistry::instance().presentToken();
1071     GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1072                                 LuminosityBlockID(rp.run(), 0),
1073                                 rp.index(),
1074                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1075                                 rp.endTime(),
1076                                 processContext);
1077 
1078     using namespace edm::waiting_task;
1079     chain::first([&](auto nextTask) {
1080       //services can depend on other services
1081       ServiceRegistry::Operate op(token);
1082 
1083       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1084       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1085       }
1086       for (auto& c : all_output_communicators_) {
1087         c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1088       }
1089     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1090       //services can depend on other services
1091       ServiceRegistry::Operate op(token);
1092 
1093       activityRegistry->postGlobalWriteRunSignal_(globalContext);
1094     })) |
1095         chain::runLast(task);
1096   }
1097 
1098   void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1099                                         ProcessBlockPrincipal const& pbp,
1100                                         ProcessContext const* processContext,
1101                                         ActivityRegistry* activityRegistry) {
1102     auto token = ServiceRegistry::instance().presentToken();
1103     GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1104                                 LuminosityBlockID(),
1105                                 RunIndex::invalidRunIndex(),
1106                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1107                                 Timestamp::invalidTimestamp(),
1108                                 processContext);
1109 
1110     using namespace edm::waiting_task;
1111     chain::first([&](auto nextTask) {
1112       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1113       ServiceRegistry::Operate op(token);
1114       CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1115       }
1116       for (auto& c : all_output_communicators_) {
1117         c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1118       }
1119     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1120       //services can depend on other services
1121       ServiceRegistry::Operate op(token);
1122 
1123       activityRegistry->postWriteProcessBlockSignal_(globalContext);
1124     })) |
1125         chain::runLast(std::move(task));
1126   }
1127 
1128   void Schedule::writeLumiAsync(WaitingTaskHolder task,
1129                                 LuminosityBlockPrincipal const& lbp,
1130                                 ProcessContext const* processContext,
1131                                 ActivityRegistry* activityRegistry) {
1132     auto token = ServiceRegistry::instance().presentToken();
1133     GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1134                                 lbp.id(),
1135                                 lbp.runPrincipal().index(),
1136                                 lbp.index(),
1137                                 lbp.beginTime(),
1138                                 processContext);
1139 
1140     using namespace edm::waiting_task;
1141     chain::first([&](auto nextTask) {
1142       ServiceRegistry::Operate op(token);
1143       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1144       }
1145       for (auto& c : all_output_communicators_) {
1146         c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1147       }
1148     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1149       //services can depend on other services
1150       ServiceRegistry::Operate op(token);
1151 
1152       activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1153     })) |
1154         chain::runLast(task);
1155   }
1156 
1157   bool Schedule::shouldWeCloseOutput() const {
1158     using std::placeholders::_1;
1159     // Return true iff at least one output module returns true.
1160     return (std::find_if(all_output_communicators_.begin(),
1161                          all_output_communicators_.end(),
1162                          std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1163             all_output_communicators_.end());
1164   }
1165 
1166   void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1167     using std::placeholders::_1;
1168     for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1169   }
1170 
1171   void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1172     using std::placeholders::_1;
1173     for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1174   }
1175 
1176   void Schedule::beginJob(ProductRegistry const& iRegistry,
1177                           eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
1178                           ProcessBlockHelperBase const& processBlockHelperBase,
1179                           ProcessContext const& processContext) {
1180     globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase, processContext);
1181   }
1182 
1183   void Schedule::beginStream(unsigned int streamID) {
1184     assert(streamID < streamSchedules_.size());
1185     streamSchedules_[streamID]->beginStream();
1186   }
1187 
1188   void Schedule::endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1189     assert(streamID < streamSchedules_.size());
1190     streamSchedules_[streamID]->endStream(collector, collectorMutex);
1191   }
1192 
1193   void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1194                                       unsigned int iStreamID,
1195                                       EventTransitionInfo& info,
1196                                       ServiceToken const& token) {
1197     assert(iStreamID < streamSchedules_.size());
1198     streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1199   }
1200 
1201   bool Schedule::changeModule(std::string const& iLabel,
1202                               ParameterSet const& iPSet,
1203                               const SignallingProductRegistryFiller& iRegistry,
1204                               eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
1205     Worker* found = nullptr;
1206     for (auto const& worker : allWorkers()) {
1207       if (worker->description()->moduleLabel() == iLabel) {
1208         found = worker;
1209         break;
1210       }
1211     }
1212     if (nullptr == found) {
1213       return false;
1214     }
1215 
1216     auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1217 
1218     globalSchedule_->replaceModule(newMod, iLabel);
1219 
1220     for (auto& s : streamSchedules_) {
1221       s->replaceModule(newMod, iLabel);
1222     }
1223 
1224     {
1225       //Need to updateLookup in order to make getByToken work
1226       auto const processBlockLookup = iRegistry.registry().productLookup(InProcess);
1227       auto const runLookup = iRegistry.registry().productLookup(InRun);
1228       auto const lumiLookup = iRegistry.registry().productLookup(InLumi);
1229       auto const eventLookup = iRegistry.registry().productLookup(InEvent);
1230       found->updateLookup(InProcess, *runLookup);
1231       found->updateLookup(InRun, *runLookup);
1232       found->updateLookup(InLumi, *lumiLookup);
1233       found->updateLookup(InEvent, *eventLookup);
1234       found->updateLookup(iIndices);
1235 
1236       auto const& processName = newMod->moduleDescription().processName();
1237       auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1238       auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1239       auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1240       auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1241       found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1242       found->resolvePutIndicies(InRun, runModuleToIndicies);
1243       found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1244       found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1245     }
1246 
1247     return true;
1248   }
1249 
1250   void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1251     globalSchedule_->deleteModule(iLabel);
1252     for (auto& stream : streamSchedules_) {
1253       stream->deleteModule(iLabel);
1254     }
1255     moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1256   }
1257 
1258   void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1259                                        std::multimap<std::string, std::string> const& referencesToBranches,
1260                                        std::vector<std::string> const& modulesToSkip,
1261                                        edm::ProductRegistry const& preg) {
1262     for (auto& stream : streamSchedules_) {
1263       stream->initializeEarlyDelete(
1264           *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1265     }
1266   }
1267 
1268   std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1269     std::vector<ModuleDescription const*> result;
1270     result.reserve(allWorkers().size());
1271 
1272     for (auto const& worker : allWorkers()) {
1273       ModuleDescription const* p = worker->description();
1274       result.push_back(p);
1275     }
1276     return result;
1277   }
1278 
1279   Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1280 
1281   void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1282     for (auto const& worker : allWorkers()) {
1283       worker->convertCurrentProcessAlias(processName);
1284     }
1285   }
1286 
1287   void Schedule::releaseMemoryPostLookupSignal() { globalSchedule_->releaseMemoryPostLookupSignal(); }
1288 
1289   void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1290     streamSchedules_[0]->availablePaths(oLabelsToFill);
1291   }
1292 
1293   void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1294 
1295   void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1296 
1297   void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1298     streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1299   }
1300 
1301   void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1302                                           std::vector<ModuleDescription const*>& descriptions,
1303                                           unsigned int hint) const {
1304     streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1305   }
1306 
1307   void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1308                                              std::vector<ModuleDescription const*>& descriptions,
1309                                              unsigned int hint) const {
1310     streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1311   }
1312 
1313   void Schedule::getTriggerReport(TriggerReport& rep) const {
1314     rep.eventSummary.totalEvents = 0;
1315     rep.eventSummary.totalEventsPassed = 0;
1316     rep.eventSummary.totalEventsFailed = 0;
1317     for (auto& s : streamSchedules_) {
1318       s->getTriggerReport(rep);
1319     }
1320     sort_all(rep.workerSummaries);
1321   }
1322 
1323   void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1324     rep.eventSummary.totalEvents = 0;
1325     rep.eventSummary.cpuTime = 0.;
1326     rep.eventSummary.realTime = 0.;
1327     summaryTimeKeeper_->fillTriggerTimingReport(rep);
1328   }
1329 
1330   int Schedule::totalEvents() const {
1331     int returnValue = 0;
1332     for (auto& s : streamSchedules_) {
1333       returnValue += s->totalEvents();
1334     }
1335     return returnValue;
1336   }
1337 
1338   int Schedule::totalEventsPassed() const {
1339     int returnValue = 0;
1340     for (auto& s : streamSchedules_) {
1341       returnValue += s->totalEventsPassed();
1342     }
1343     return returnValue;
1344   }
1345 
1346   int Schedule::totalEventsFailed() const {
1347     int returnValue = 0;
1348     for (auto& s : streamSchedules_) {
1349       returnValue += s->totalEventsFailed();
1350     }
1351     return returnValue;
1352   }
1353 
1354   void Schedule::clearCounters() {
1355     for (auto& s : streamSchedules_) {
1356       s->clearCounters();
1357     }
1358   }
1359 }  // namespace edm