Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-06-03 00:59:03

0001 #include "FWCore/Framework/interface/Schedule.h"
0002 
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0011 #include "FWCore/Framework/interface/EDConsumerBase.h"
0012 #include "FWCore/Framework/src/OutputModuleDescription.h"
0013 #include "FWCore/Framework/interface/SubProcess.h"
0014 #include "FWCore/Framework/interface/TriggerNamesService.h"
0015 #include "FWCore/Framework/src/TriggerReport.h"
0016 #include "FWCore/Framework/src/TriggerTimingReport.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/src/Factory.h"
0019 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0020 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/Framework/src/TriggerResultInserter.h"
0023 #include "FWCore/Framework/src/PathStatusInserter.h"
0024 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0025 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0026 #include "FWCore/Concurrency/interface/chain_first.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0031 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0032 #include "FWCore/Utilities/interface/Algorithms.h"
0033 #include "FWCore/Utilities/interface/ConvertException.h"
0034 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0035 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0036 #include "FWCore/Utilities/interface/TypeID.h"
0037 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0038 
0039 #include <array>
0040 #include <algorithm>
0041 #include <cassert>
0042 #include <cstdlib>
0043 #include <functional>
0044 #include <iomanip>
0045 #include <list>
0046 #include <map>
0047 #include <set>
0048 #include <exception>
0049 #include <sstream>
0050 
0051 #include "make_shared_noexcept_false.h"
0052 #include "processEDAliases.h"
0053 
0054 namespace edm {
0055 
0056   class Maker;
0057 
0058   namespace {
0059     using std::placeholders::_1;
0060 
0061     bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0062       return std::binary_search(v.begin(), v.end(), s);
0063     }
0064 
0065     // Here we make the trigger results inserter directly.  This should
0066     // probably be a utility in the WorkerRegistry or elsewhere.
0067 
0068     std::shared_ptr<TriggerResultInserter> makeInserter(ParameterSet& proc_pset,
0069                                                         PreallocationConfiguration const& iPrealloc,
0070                                                         ProductRegistry& preg,
0071                                                         ExceptionToActionTable const& actions,
0072                                                         std::shared_ptr<ActivityRegistry> areg,
0073                                                         std::shared_ptr<ProcessConfiguration> processConfiguration) {
0074       ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0075       trig_pset->registerIt();
0076 
0077       WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0078       ModuleDescription md(trig_pset->id(),
0079                            "TriggerResultInserter",
0080                            "TriggerResults",
0081                            processConfiguration.get(),
0082                            ModuleDescription::getUniqueID());
0083 
0084       areg->preModuleConstructionSignal_(md);
0085       bool postCalled = false;
0086       std::shared_ptr<TriggerResultInserter> returnValue;
0087       // Caught exception is rethrown
0088       CMS_SA_ALLOW try {
0089         maker::ModuleHolderT<TriggerResultInserter> holder(
0090             make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
0091             static_cast<Maker const*>(nullptr));
0092         holder.setModuleDescription(md);
0093         holder.registerProductsAndCallbacks(&preg);
0094         returnValue = holder.module();
0095         postCalled = true;
0096         // if exception then post will be called in the catch block
0097         areg->postModuleConstructionSignal_(md);
0098       } catch (...) {
0099         if (!postCalled) {
0100           CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0101             // If post throws an exception ignore it because we are already handling another exception
0102           }
0103         }
0104         throw;
0105       }
0106       return returnValue;
0107     }
0108 
0109     template <typename T>
0110     void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0111                                  std::vector<std::string> const& pathNames,
0112                                  PreallocationConfiguration const& iPrealloc,
0113                                  ProductRegistry& preg,
0114                                  std::shared_ptr<ActivityRegistry> areg,
0115                                  std::shared_ptr<ProcessConfiguration> processConfiguration,
0116                                  std::string const& moduleTypeName) {
0117       ParameterSet pset;
0118       pset.addParameter<std::string>("@module_type", moduleTypeName);
0119       pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0120       pset.registerIt();
0121 
0122       pathStatusInserters.reserve(pathNames.size());
0123 
0124       for (auto const& pathName : pathNames) {
0125         ModuleDescription md(
0126             pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0127 
0128         areg->preModuleConstructionSignal_(md);
0129         bool postCalled = false;
0130         // Caught exception is rethrown
0131         CMS_SA_ALLOW try {
0132           maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
0133                                          static_cast<Maker const*>(nullptr));
0134           holder.setModuleDescription(md);
0135           holder.registerProductsAndCallbacks(&preg);
0136           pathStatusInserters.emplace_back(holder.module());
0137           postCalled = true;
0138           // if exception then post will be called in the catch block
0139           areg->postModuleConstructionSignal_(md);
0140         } catch (...) {
0141           if (!postCalled) {
0142             CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0143               // If post throws an exception ignore it because we are already handling another exception
0144             }
0145           }
0146           throw;
0147         }
0148       }
0149     }
0150 
0151     typedef std::vector<std::string> vstring;
0152 
0153     void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
0154       // Update Switch BranchDescriptions for the chosen case
0155       struct BranchesCases {
0156         BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0157         std::vector<BranchKey> chosenBranches;
0158         std::vector<std::string> caseLabels;
0159       };
0160       std::map<std::string, BranchesCases> switchMap;
0161       for (auto& prod : preg.productListUpdator()) {
0162         if (prod.second.isSwitchAlias()) {
0163           auto it = switchMap.find(prod.second.moduleLabel());
0164           if (it == switchMap.end()) {
0165             auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0166             auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0167                                               switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0168             assert(inserted.second);
0169             it = inserted.first;
0170           }
0171 
0172           bool found = false;
0173           for (auto const& productIter : preg.productList()) {
0174             BranchKey const& branchKey = productIter.first;
0175             // The alias-for product must be in the same process as
0176             // the SwitchProducer (earlier processes or SubProcesses
0177             // may contain products with same type, module label, and
0178             // instance name)
0179             if (branchKey.processName() != processName) {
0180               continue;
0181             }
0182 
0183             BranchDescription const& desc = productIter.second;
0184             if (desc.branchType() == prod.second.branchType() and
0185                 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0186                 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0187                 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0188               prod.second.setSwitchAliasForBranch(desc);
0189               it->second.chosenBranches.push_back(prod.first);  // with moduleLabel of the Switch
0190               found = true;
0191             }
0192           }
0193           if (not found) {
0194             Exception ex(errors::LogicError);
0195             ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
0196                << "  friendly class name = " << prod.second.friendlyClassName() << "\n"
0197                << "  module label = " << prod.second.moduleLabel() << "\n"
0198                << "  product instance name = " << prod.second.productInstanceName() << "\n"
0199                << "  process name = " << processName
0200                << "\n\nbut did not find any. Please contact a framework developer.";
0201             ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0202             throw ex;
0203           }
0204         }
0205       }
0206       if (switchMap.empty())
0207         return;
0208 
0209       for (auto& elem : switchMap) {
0210         std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0211       }
0212 
0213       auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0214         std::map<std::string, std::vector<BranchKey>> caseBranches;
0215         for (auto const& item : preg.productList()) {
0216           if (item.first.processName() != processName)
0217             continue;
0218 
0219           if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0220               found != caseLabels.end()) {
0221             caseBranches[*found].push_back(item.first);
0222           }
0223         }
0224 
0225         for (auto const& caseLabel : caseLabels) {
0226           ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0227           auto& branches = caseBranches[caseLabel];
0228           std::sort(branches.begin(), branches.end());
0229           for (auto const& branch : branches) {
0230             ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0231           }
0232         }
0233       };
0234 
0235       // Check that non-chosen cases declare exactly the same branches
0236       // Also set the alias-for branches to transient
0237       std::vector<bool> foundBranches;
0238       for (auto const& switchItem : switchMap) {
0239         auto const& switchLabel = switchItem.first;
0240         auto const& chosenBranches = switchItem.second.chosenBranches;
0241         auto const& caseLabels = switchItem.second.caseLabels;
0242         foundBranches.resize(chosenBranches.size());
0243         for (auto const& caseLabel : caseLabels) {
0244           std::fill(foundBranches.begin(), foundBranches.end(), false);
0245           for (auto& nonConstItem : preg.productListUpdator()) {
0246             auto const& item = nonConstItem;
0247             if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0248               // Set the alias-for branch as transient so it gets fully ignored in output.
0249               // I tried first to implicitly drop all branches with
0250               // '@' in ProductSelector, but that gave problems on
0251               // input (those branches would be implicitly dropped on
0252               // input as well, leading to the SwitchProducer branches
0253               // do be dropped as dependent ones, as the alias
0254               // detection logic in RootFile says that the
0255               // SwitchProducer branches are not alias branches)
0256               nonConstItem.second.setTransient(true);
0257 
0258               auto range = std::equal_range(chosenBranches.begin(),
0259                                             chosenBranches.end(),
0260                                             BranchKey(item.first.friendlyClassName(),
0261                                                       switchLabel,
0262                                                       item.first.productInstanceName(),
0263                                                       item.first.processName()));
0264               if (range.first == range.second) {
0265                 Exception ex(errors::Configuration);
0266                 ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0267                    << item.first << " that is not produced by the chosen case "
0268                    << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0269                           .getUntrackedParameter<std::string>("@chosen_case")
0270                    << ". If the intention is to produce only a subset of the products listed below, each case with "
0271                       "more products needs to be replaced with an EDAlias to only the necessary products, and the "
0272                       "EDProducer itself needs to be moved to a Task.\n\n";
0273                 addProductsToException(caseLabels, ex);
0274                 throw ex;
0275               }
0276               assert(std::distance(range.first, range.second) == 1);
0277               foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0278 
0279               // Check that there are no BranchAliases for any of the cases
0280               auto const& bd = item.second;
0281               if (not bd.branchAliases().empty()) {
0282                 auto ex = Exception(errors::UnimplementedFeature)
0283                           << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0284                              "aliases for SwitchProducer with label "
0285                           << switchLabel << " for case " << caseLabel << ":";
0286                 for (auto const& branchAlias : bd.branchAliases()) {
0287                   ex << " " << branchAlias;
0288                 }
0289                 throw ex;
0290               }
0291             }
0292           }
0293 
0294           for (size_t i = 0; i < chosenBranches.size(); i++) {
0295             if (not foundBranches[i]) {
0296               auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0297                                      .getUntrackedParameter<std::string>("@chosen_case");
0298               Exception ex(errors::Configuration);
0299               ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0300                  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0301                  << chosenLabel
0302                  << ". If the intention is to produce only a subset of the products listed below, each case with more "
0303                     "products needs to be replaced with an EDAlias to only the necessary products, and the "
0304                     "EDProducer itself needs to be moved to a Task.\n\n";
0305               addProductsToException(caseLabels, ex);
0306               throw ex;
0307             }
0308           }
0309         }
0310       }
0311     }
0312 
0313     void reduceParameterSet(ParameterSet& proc_pset,
0314                             vstring const& end_path_name_list,
0315                             vstring& modulesInConfig,
0316                             std::set<std::string> const& usedModuleLabels,
0317                             std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0318       // Before calculating the ParameterSetID of the top level ParameterSet or
0319       // saving it in the registry drop from the top level ParameterSet all
0320       // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
0321       // production is not enabled also drop all the EDFilters and EDProducers
0322       // that are not scheduled. Drop the ParameterSet used to configure the module
0323       // itself. Also drop the other traces of these labels in the top level
0324       // ParameterSet: Remove that labels from @all_modules and from all the
0325       // end paths. If this makes any end paths empty, then remove the end path
0326       // name from @end_paths, and @paths.
0327 
0328       // First make a list of labels to drop
0329       vstring outputModuleLabels;
0330       std::string edmType;
0331       std::string const moduleEdmType("@module_edm_type");
0332       std::string const outputModule("OutputModule");
0333       std::string const edAnalyzer("EDAnalyzer");
0334       std::string const edFilter("EDFilter");
0335       std::string const edProducer("EDProducer");
0336 
0337       std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0338 
0339       //need a list of all modules on paths in order to determine
0340       // if an EDAnalyzer only appears on an end path
0341       vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0342       std::set<std::string> modulesOnPaths;
0343       {
0344         std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0345         for (auto const& endPath : end_path_name_list) {
0346           noEndPaths.erase(endPath);
0347         }
0348         {
0349           vstring labels;
0350           for (auto const& path : noEndPaths) {
0351             labels = proc_pset.getParameter<vstring>(path);
0352             modulesOnPaths.insert(labels.begin(), labels.end());
0353           }
0354         }
0355       }
0356       //Initially fill labelsToBeDropped with all module mentioned in
0357       // the configuration but which are not being used by the system
0358       std::vector<std::string> labelsToBeDropped;
0359       labelsToBeDropped.reserve(modulesInConfigSet.size());
0360       std::set_difference(modulesInConfigSet.begin(),
0361                           modulesInConfigSet.end(),
0362                           usedModuleLabels.begin(),
0363                           usedModuleLabels.end(),
0364                           std::back_inserter(labelsToBeDropped));
0365 
0366       const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0367       for (auto const& modLabel : usedModuleLabels) {
0368         // Do nothing for modules that do not have a ParameterSet. Modules of type
0369         // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
0370         if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0371           edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0372           if (edmType == outputModule) {
0373             outputModuleLabels.push_back(modLabel);
0374             labelsToBeDropped.push_back(modLabel);
0375           }
0376           if (edmType == edAnalyzer) {
0377             if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0378               labelsToBeDropped.push_back(modLabel);
0379             }
0380           }
0381         }
0382       }
0383       //labelsToBeDropped must be sorted
0384       std::inplace_merge(
0385           labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0386 
0387       // drop the parameter sets used to configure the modules
0388       for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0389 
0390       // drop the labels from @all_modules
0391       vstring::iterator endAfterRemove =
0392           std::remove_if(modulesInConfig.begin(),
0393                          modulesInConfig.end(),
0394                          std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0395       modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0396       proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0397 
0398       // drop the labels from all end paths
0399       vstring endPathsToBeDropped;
0400       vstring labels;
0401       for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0402            iEndPath != endEndPath;
0403            ++iEndPath) {
0404         labels = proc_pset.getParameter<vstring>(*iEndPath);
0405         vstring::iterator iSave = labels.begin();
0406         vstring::iterator iBegin = labels.begin();
0407 
0408         for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0409           if (binary_search_string(labelsToBeDropped, *iLabel)) {
0410             if (binary_search_string(outputModuleLabels, *iLabel)) {
0411               outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0412             }
0413           } else {
0414             if (iSave != iLabel) {
0415               iSave->swap(*iLabel);
0416             }
0417             ++iSave;
0418           }
0419         }
0420         labels.erase(iSave, labels.end());
0421         if (labels.empty()) {
0422           // remove empty end paths and save their names
0423           proc_pset.eraseSimpleParameter(*iEndPath);
0424           endPathsToBeDropped.push_back(*iEndPath);
0425         } else {
0426           proc_pset.addParameter<vstring>(*iEndPath, labels);
0427         }
0428       }
0429       sort_all(endPathsToBeDropped);
0430 
0431       // remove empty end paths from @paths
0432       endAfterRemove = std::remove_if(scheduledPaths.begin(),
0433                                       scheduledPaths.end(),
0434                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0435       scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0436       proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0437 
0438       // remove empty end paths from @end_paths
0439       vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0440       endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0441                                       scheduledEndPaths.end(),
0442                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0443       scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0444       proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0445     }
0446 
0447     class RngEDConsumer : public EDConsumerBase {
0448     public:
0449       explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0450         Service<RandomNumberGenerator> rng;
0451         if (rng.isAvailable()) {
0452           rng->consumes(consumesCollector());
0453           for (auto const& consumesInfo : this->consumesInfo()) {
0454             typesConsumed.emplace(consumesInfo.type());
0455           }
0456         }
0457       }
0458     };
0459 
0460     template <typename F>
0461     auto doCleanup(F&& iF) {
0462       auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0463         CMS_SA_ALLOW try { f(); } catch (...) {
0464         }
0465         if (iPtr) {
0466           iTask.doneWaiting(*iPtr);
0467         }
0468       };
0469       return wrapped;
0470     }
0471   }  // namespace
0472   // -----------------------------
0473 
0474   typedef std::vector<std::string> vstring;
0475 
0476   // -----------------------------
0477 
0478   Schedule::Schedule(ParameterSet& proc_pset,
0479                      service::TriggerNamesService const& tns,
0480                      ProductRegistry& preg,
0481                      BranchIDListHelper& branchIDListHelper,
0482                      ProcessBlockHelperBase& processBlockHelper,
0483                      ThinnedAssociationsHelper& thinnedAssociationsHelper,
0484                      SubProcessParentageHelper const* subProcessParentageHelper,
0485                      ExceptionToActionTable const& actions,
0486                      std::shared_ptr<ActivityRegistry> areg,
0487                      std::shared_ptr<ProcessConfiguration> processConfiguration,
0488                      bool hasSubprocesses,
0489                      PreallocationConfiguration const& prealloc,
0490                      ProcessContext const* processContext)
0491       :  //Only create a resultsInserter if there is a trigger path
0492         resultsInserter_{tns.getTrigPaths().empty()
0493                              ? std::shared_ptr<TriggerResultInserter>{}
0494                              : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
0495         moduleRegistry_(new ModuleRegistry()),
0496         all_output_communicators_(),
0497         preallocConfig_(prealloc),
0498         pathNames_(&tns.getTrigPaths()),
0499         endPathNames_(&tns.getEndPaths()),
0500         wantSummary_(tns.wantSummary()) {
0501     makePathStatusInserters(pathStatusInserters_,
0502                             *pathNames_,
0503                             prealloc,
0504                             preg,
0505                             areg,
0506                             processConfiguration,
0507                             std::string("PathStatusInserter"));
0508 
0509     makePathStatusInserters(endPathStatusInserters_,
0510                             *endPathNames_,
0511                             prealloc,
0512                             preg,
0513                             areg,
0514                             processConfiguration,
0515                             std::string("EndPathStatusInserter"));
0516 
0517     assert(0 < prealloc.numberOfStreams());
0518     streamSchedules_.reserve(prealloc.numberOfStreams());
0519     for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0520       streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0521                                                                                pathStatusInserters_,
0522                                                                                endPathStatusInserters_,
0523                                                                                moduleRegistry(),
0524                                                                                proc_pset,
0525                                                                                tns,
0526                                                                                prealloc,
0527                                                                                preg,
0528                                                                                branchIDListHelper,
0529                                                                                actions,
0530                                                                                areg,
0531                                                                                processConfiguration,
0532                                                                                StreamID{i},
0533                                                                                processContext));
0534     }
0535 
0536     //TriggerResults are injected automatically by StreamSchedules and are
0537     // unknown to the ModuleRegistry
0538     const std::string kTriggerResults("TriggerResults");
0539     std::vector<std::string> modulesToUse;
0540     modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
0541     for (auto const& worker : streamSchedules_[0]->allWorkers()) {
0542       if (worker->description()->moduleLabel() != kTriggerResults) {
0543         modulesToUse.push_back(worker->description()->moduleLabel());
0544       }
0545     }
0546     //The unscheduled modules are at the end of the list, but we want them at the front
0547     unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0548     if (nUnscheduledModules > 0) {
0549       std::vector<std::string> temp;
0550       temp.reserve(modulesToUse.size());
0551       auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0552       std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0553       std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0554       temp.swap(modulesToUse);
0555     }
0556 
0557     // propagate_const<T> has no reset() function
0558     globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0559                                                        pathStatusInserters_,
0560                                                        endPathStatusInserters_,
0561                                                        moduleRegistry(),
0562                                                        modulesToUse,
0563                                                        proc_pset,
0564                                                        preg,
0565                                                        prealloc,
0566                                                        actions,
0567                                                        areg,
0568                                                        processConfiguration,
0569                                                        processContext);
0570 
0571     //TriggerResults is not in the top level ParameterSet so the call to
0572     // reduceParameterSet would fail to find it. Just remove it up front.
0573     std::set<std::string> usedModuleLabels;
0574     for (auto const& worker : allWorkers()) {
0575       if (worker->description()->moduleLabel() != kTriggerResults) {
0576         usedModuleLabels.insert(worker->description()->moduleLabel());
0577       }
0578     }
0579     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0580     std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0581     reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0582     {
0583       std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0584       detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0585     }
0586 
0587     // At this point all BranchDescriptions are created. Mark now the
0588     // ones of unscheduled workers to be on-demand.
0589     if (nUnscheduledModules > 0) {
0590       std::set<std::string> unscheduledModules(modulesToUse.begin(), modulesToUse.begin() + nUnscheduledModules);
0591       preg.setUnscheduledProducts(unscheduledModules);
0592     }
0593 
0594     processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0595     proc_pset.registerIt();
0596     processConfiguration->setParameterSetID(proc_pset.id());
0597     processConfiguration->setProcessConfigurationID();
0598 
0599     // This is used for a little sanity-check to make sure no code
0600     // modifications alter the number of workers at a later date.
0601     size_t all_workers_count = allWorkers().size();
0602 
0603     moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0604       auto comm = iHolder->createOutputModuleCommunicator();
0605       if (comm) {
0606         all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0607       }
0608     });
0609     // Now that the output workers are filled in, set any output limits or information.
0610     limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
0611 
0612     // Sanity check: make sure nobody has added a worker after we've
0613     // already relied on the WorkerManager being full.
0614     assert(all_workers_count == allWorkers().size());
0615 
0616     branchIDListHelper.updateFromRegistry(preg);
0617 
0618     for (auto const& worker : streamSchedules_[0]->allWorkers()) {
0619       worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
0620     }
0621 
0622     processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
0623 
0624     // The output modules consume products in kept branches.
0625     // So we must set this up before freezing.
0626     for (auto& c : all_output_communicators_) {
0627       c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
0628     }
0629 
0630     for (auto& product : preg.productListUpdator()) {
0631       setIsMergeable(product.second);
0632     }
0633 
0634     {
0635       // We now get a collection of types that may be consumed.
0636       std::set<TypeID> productTypesConsumed;
0637       std::set<TypeID> elementTypesConsumed;
0638       // Loop over all modules
0639       for (auto const& worker : allWorkers()) {
0640         for (auto const& consumesInfo : worker->consumesInfo()) {
0641           if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0642             productTypesConsumed.emplace(consumesInfo.type());
0643           } else {
0644             elementTypesConsumed.emplace(consumesInfo.type());
0645           }
0646         }
0647       }
0648       // The SubProcess class is not a module, yet it may consume.
0649       if (hasSubprocesses) {
0650         productTypesConsumed.emplace(typeid(TriggerResults));
0651       }
0652       // The RandomNumberGeneratorService is not a module, yet it consumes.
0653       { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0654       preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0655     }
0656 
0657     for (auto& c : all_output_communicators_) {
0658       c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
0659     }
0660 
0661     if (wantSummary_) {
0662       std::vector<const ModuleDescription*> modDesc;
0663       const auto& workers = allWorkers();
0664       modDesc.reserve(workers.size());
0665 
0666       std::transform(workers.begin(),
0667                      workers.end(),
0668                      std::back_inserter(modDesc),
0669                      [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0670 
0671       // propagate_const<T> has no reset() function
0672       summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0673       auto timeKeeperPtr = summaryTimeKeeper_.get();
0674 
0675       areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0676 
0677       areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0678       areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0679       areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0680       areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0681       areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0682       areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0683 
0684       areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0685       areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0686 
0687       areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0688       areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0689 
0690       areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0691       areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0692       //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
0693       //timeKeeperPtr->startModuleEvent(iContext,iMod);
0694       //});
0695     }
0696 
0697   }  // Schedule::Schedule
0698 
0699   void Schedule::limitOutput(ParameterSet const& proc_pset,
0700                              BranchIDLists const& branchIDLists,
0701                              SubProcessParentageHelper const* subProcessParentageHelper) {
0702     std::string const output("output");
0703 
0704     ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0705     int maxEventSpecs = 0;
0706     int maxEventsOut = -1;
0707     ParameterSet const* vMaxEventsOut = nullptr;
0708     std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0709     if (search_all(intNamesE, output)) {
0710       maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0711       ++maxEventSpecs;
0712     }
0713     std::vector<std::string> psetNamesE;
0714     maxEventsPSet.getParameterSetNames(psetNamesE, false);
0715     if (search_all(psetNamesE, output)) {
0716       vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0717       ++maxEventSpecs;
0718     }
0719 
0720     if (maxEventSpecs > 1) {
0721       throw Exception(errors::Configuration)
0722           << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0723     }
0724 
0725     for (auto& c : all_output_communicators_) {
0726       OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
0727       if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0728         std::string const& moduleLabel = c->description().moduleLabel();
0729         try {
0730           desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0731         } catch (Exception const&) {
0732           throw Exception(errors::Configuration)
0733               << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0734         }
0735       }
0736       c->configure(desc);
0737     }
0738   }
0739 
0740   bool Schedule::terminate() const {
0741     if (all_output_communicators_.empty()) {
0742       return false;
0743     }
0744     for (auto& c : all_output_communicators_) {
0745       if (!c->limitReached()) {
0746         // Found an output module that has not reached output event count.
0747         return false;
0748       }
0749     }
0750     LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0751                                      << "has reached its configured limit.\n";
0752     return true;
0753   }
0754 
0755   void Schedule::endJob(ExceptionCollector& collector) {
0756     globalSchedule_->endJob(collector);
0757     if (collector.hasThrown()) {
0758       return;
0759     }
0760 
0761     if (wantSummary_ == false)
0762       return;
0763     {
0764       TriggerReport tr;
0765       getTriggerReport(tr);
0766 
0767       // The trigger report (pass/fail etc.):
0768 
0769       LogFwkVerbatim("FwkSummary") << "";
0770       if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
0771         LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
0772                                      << streamSchedules_[0]->context().processContext()->processName();
0773       }
0774       LogFwkVerbatim("FwkSummary") << "TrigReport "
0775                                    << "---------- Event  Summary ------------";
0776       if (!tr.trigPathSummaries.empty()) {
0777         LogFwkVerbatim("FwkSummary") << "TrigReport"
0778                                      << " Events total = " << tr.eventSummary.totalEvents
0779                                      << " passed = " << tr.eventSummary.totalEventsPassed
0780                                      << " failed = " << tr.eventSummary.totalEventsFailed << "";
0781       } else {
0782         LogFwkVerbatim("FwkSummary") << "TrigReport"
0783                                      << " Events total = " << tr.eventSummary.totalEvents
0784                                      << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0785       }
0786 
0787       LogFwkVerbatim("FwkSummary") << "";
0788       LogFwkVerbatim("FwkSummary") << "TrigReport "
0789                                    << "---------- Path   Summary ------------";
0790       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0791                                    << " " << std::right << std::setw(10) << "Executed"
0792                                    << " " << std::right << std::setw(10) << "Passed"
0793                                    << " " << std::right << std::setw(10) << "Failed"
0794                                    << " " << std::right << std::setw(10) << "Error"
0795                                    << " "
0796                                    << "Name"
0797                                    << "";
0798       for (auto const& p : tr.trigPathSummaries) {
0799         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
0800                                      << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
0801                                      << std::right << std::setw(10) << p.timesPassed << " " << std::right
0802                                      << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0803                                      << p.timesExcept << " " << p.name << "";
0804       }
0805 
0806       /*
0807       std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
0808       std::vector<int>::const_iterator epe = empty_trig_paths_.end();
0809       std::vector<std::string>::const_iterator  epn = empty_trig_path_names_.begin();
0810       for (; epi != epe; ++epi, ++epn) {
0811 
0812         LogFwkVerbatim("FwkSummary") << "TrigReport "
0813         << std::right << std::setw(5) << 1
0814         << std::right << std::setw(5) << *epi << " "
0815         << std::right << std::setw(10) << totalEvents() << " "
0816         << std::right << std::setw(10) << totalEvents() << " "
0817         << std::right << std::setw(10) << 0 << " "
0818         << std::right << std::setw(10) << 0 << " "
0819         << *epn << "";
0820       }
0821        */
0822 
0823       LogFwkVerbatim("FwkSummary") << "";
0824       LogFwkVerbatim("FwkSummary") << "TrigReport "
0825                                    << "-------End-Path   Summary ------------";
0826       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0827                                    << " " << std::right << std::setw(10) << "Executed"
0828                                    << " " << std::right << std::setw(10) << "Passed"
0829                                    << " " << std::right << std::setw(10) << "Failed"
0830                                    << " " << std::right << std::setw(10) << "Error"
0831                                    << " "
0832                                    << "Name"
0833                                    << "";
0834       for (auto const& p : tr.endPathSummaries) {
0835         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
0836                                      << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
0837                                      << std::right << std::setw(10) << p.timesPassed << " " << std::right
0838                                      << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0839                                      << p.timesExcept << " " << p.name << "";
0840       }
0841 
0842       for (auto const& p : tr.trigPathSummaries) {
0843         LogFwkVerbatim("FwkSummary") << "";
0844         LogFwkVerbatim("FwkSummary") << "TrigReport "
0845                                      << "---------- Modules in Path: " << p.name << " ------------";
0846         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0847                                      << " " << std::right << std::setw(10) << "Visited"
0848                                      << " " << std::right << std::setw(10) << "Passed"
0849                                      << " " << std::right << std::setw(10) << "Failed"
0850                                      << " " << std::right << std::setw(10) << "Error"
0851                                      << " "
0852                                      << "Name"
0853                                      << "";
0854 
0855         for (auto const& mod : p.moduleInPathSummaries) {
0856           LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
0857                                        << mod.bitPosition << " " << std::right << std::setw(10) << mod.timesVisited
0858                                        << " " << std::right << std::setw(10) << mod.timesPassed << " " << std::right
0859                                        << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
0860                                        << mod.timesExcept << " " << mod.moduleLabel << "";
0861         }
0862       }
0863 
0864       for (auto const& p : tr.endPathSummaries) {
0865         LogFwkVerbatim("FwkSummary") << "";
0866         LogFwkVerbatim("FwkSummary") << "TrigReport "
0867                                      << "------ Modules in End-Path: " << p.name << " ------------";
0868         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0869                                      << " " << std::right << std::setw(10) << "Visited"
0870                                      << " " << std::right << std::setw(10) << "Passed"
0871                                      << " " << std::right << std::setw(10) << "Failed"
0872                                      << " " << std::right << std::setw(10) << "Error"
0873                                      << " "
0874                                      << "Name"
0875                                      << "";
0876 
0877         unsigned int bitpos = 0;
0878         for (auto const& mod : p.moduleInPathSummaries) {
0879           LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
0880                                        << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
0881                                        << std::right << std::setw(10) << mod.timesPassed << " " << std::right
0882                                        << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
0883                                        << mod.timesExcept << " " << mod.moduleLabel << "";
0884           ++bitpos;
0885         }
0886       }
0887 
0888       LogFwkVerbatim("FwkSummary") << "";
0889       LogFwkVerbatim("FwkSummary") << "TrigReport "
0890                                    << "---------- Module Summary ------------";
0891       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Visited"
0892                                    << " " << std::right << std::setw(10) << "Executed"
0893                                    << " " << std::right << std::setw(10) << "Passed"
0894                                    << " " << std::right << std::setw(10) << "Failed"
0895                                    << " " << std::right << std::setw(10) << "Error"
0896                                    << " "
0897                                    << "Name"
0898                                    << "";
0899       for (auto const& worker : tr.workerSummaries) {
0900         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " "
0901                                      << std::right << std::setw(10) << worker.timesRun << " " << std::right
0902                                      << std::setw(10) << worker.timesPassed << " " << std::right << std::setw(10)
0903                                      << worker.timesFailed << " " << std::right << std::setw(10) << worker.timesExcept
0904                                      << " " << worker.moduleLabel << "";
0905       }
0906       LogFwkVerbatim("FwkSummary") << "";
0907     }
0908     // The timing report (CPU and Real Time):
0909     TriggerTimingReport tr;
0910     getTriggerTimingReport(tr);
0911 
0912     const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0913 
0914     LogFwkVerbatim("FwkSummary") << "TimeReport "
0915                                  << "---------- Event  Summary ---[sec]----";
0916     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0917                                  << "       event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0918     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0919                                  << "      event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0920     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0921                                  << "     sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0922     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0923                                  << " efficiency CPU/Real/thread = "
0924                                  << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0925                                         preallocConfig_.numberOfThreads();
0926 
0927     constexpr int kColumn1Size = 10;
0928     constexpr int kColumn2Size = 12;
0929     constexpr int kColumn3Size = 12;
0930     LogFwkVerbatim("FwkSummary") << "";
0931     LogFwkVerbatim("FwkSummary") << "TimeReport "
0932                                  << "---------- Path   Summary ---[Real sec]----";
0933     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0934                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0935                                  << "  Name";
0936     for (auto const& p : tr.trigPathSummaries) {
0937       const int timesRun = std::max(1, p.timesRun);
0938       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0939                                    << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
0940                                    << std::setw(kColumn2Size) << p.realTime / timesRun << "  " << p.name << "";
0941     }
0942     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0943                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0944                                  << "  Name"
0945                                  << "";
0946 
0947     LogFwkVerbatim("FwkSummary") << "";
0948     LogFwkVerbatim("FwkSummary") << "TimeReport "
0949                                  << "-------End-Path   Summary ---[Real sec]----";
0950     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0951                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0952                                  << "  Name"
0953                                  << "";
0954     for (auto const& p : tr.endPathSummaries) {
0955       const int timesRun = std::max(1, p.timesRun);
0956 
0957       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0958                                    << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
0959                                    << std::setw(kColumn2Size) << p.realTime / timesRun << "  " << p.name << "";
0960     }
0961     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0962                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0963                                  << "  Name"
0964                                  << "";
0965 
0966     for (auto const& p : tr.trigPathSummaries) {
0967       LogFwkVerbatim("FwkSummary") << "";
0968       LogFwkVerbatim("FwkSummary") << "TimeReport "
0969                                    << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
0970       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0971                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0972                                    << "  Name"
0973                                    << "";
0974       for (auto const& mod : p.moduleInPathSummaries) {
0975         LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0976                                      << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
0977                                      << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << "  "
0978                                      << mod.moduleLabel << "";
0979       }
0980     }
0981     if (not tr.trigPathSummaries.empty()) {
0982       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0983                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0984                                    << "  Name"
0985                                    << "";
0986     }
0987     for (auto const& p : tr.endPathSummaries) {
0988       LogFwkVerbatim("FwkSummary") << "";
0989       LogFwkVerbatim("FwkSummary") << "TimeReport "
0990                                    << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
0991       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0992                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0993                                    << "  Name"
0994                                    << "";
0995       for (auto const& mod : p.moduleInPathSummaries) {
0996         LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0997                                      << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
0998                                      << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << "  "
0999                                      << mod.moduleLabel << "";
1000       }
1001     }
1002     if (not tr.endPathSummaries.empty()) {
1003       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1004                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1005                                    << "  Name"
1006                                    << "";
1007     }
1008     LogFwkVerbatim("FwkSummary") << "";
1009     LogFwkVerbatim("FwkSummary") << "TimeReport "
1010                                  << "---------- Module Summary ---[Real sec]----";
1011     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1012                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1013                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1014                                  << "  Name"
1015                                  << "";
1016     for (auto const& worker : tr.workerSummaries) {
1017       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1018                                    << std::setw(kColumn1Size) << worker.realTime / totalEvents << " " << std::right
1019                                    << std::setw(kColumn2Size) << worker.realTime / std::max(1, worker.timesRun) << " "
1020                                    << std::right << std::setw(kColumn3Size)
1021                                    << worker.realTime / std::max(1, worker.timesVisited) << "  " << worker.moduleLabel
1022                                    << "";
1023     }
1024     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1025                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1026                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1027                                  << "  Name"
1028                                  << "";
1029 
1030     LogFwkVerbatim("FwkSummary") << "";
1031     LogFwkVerbatim("FwkSummary") << "T---Report end!"
1032                                  << "";
1033     LogFwkVerbatim("FwkSummary") << "";
1034   }
1035 
1036   void Schedule::closeOutputFiles() {
1037     using std::placeholders::_1;
1038     for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1039     for (auto& worker : allWorkers()) {
1040       worker->respondToCloseOutputFile();
1041     }
1042   }
1043 
1044   void Schedule::openOutputFiles(FileBlock& fb) {
1045     using std::placeholders::_1;
1046     for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1047   }
1048 
1049   void Schedule::writeRunAsync(WaitingTaskHolder task,
1050                                RunPrincipal const& rp,
1051                                ProcessContext const* processContext,
1052                                ActivityRegistry* activityRegistry,
1053                                MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1054     auto token = ServiceRegistry::instance().presentToken();
1055     GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1056                                 LuminosityBlockID(rp.run(), 0),
1057                                 rp.index(),
1058                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1059                                 rp.endTime(),
1060                                 processContext);
1061 
1062     using namespace edm::waiting_task;
1063     chain::first([&](auto nextTask) {
1064       //services can depend on other services
1065       ServiceRegistry::Operate op(token);
1066 
1067       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1068       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1069       }
1070       for (auto& c : all_output_communicators_) {
1071         c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1072       }
1073     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1074       //services can depend on other services
1075       ServiceRegistry::Operate op(token);
1076 
1077       activityRegistry->postGlobalWriteRunSignal_(globalContext);
1078     })) |
1079         chain::runLast(task);
1080   }
1081 
1082   void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1083                                         ProcessBlockPrincipal const& pbp,
1084                                         ProcessContext const* processContext,
1085                                         ActivityRegistry* activityRegistry) {
1086     auto token = ServiceRegistry::instance().presentToken();
1087     GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1088                                 LuminosityBlockID(),
1089                                 RunIndex::invalidRunIndex(),
1090                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1091                                 Timestamp::invalidTimestamp(),
1092                                 processContext);
1093 
1094     using namespace edm::waiting_task;
1095     chain::first([&](auto nextTask) {
1096       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1097       ServiceRegistry::Operate op(token);
1098       CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1099       }
1100       for (auto& c : all_output_communicators_) {
1101         c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1102       }
1103     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1104       //services can depend on other services
1105       ServiceRegistry::Operate op(token);
1106 
1107       activityRegistry->postWriteProcessBlockSignal_(globalContext);
1108     })) |
1109         chain::runLast(std::move(task));
1110   }
1111 
1112   void Schedule::writeLumiAsync(WaitingTaskHolder task,
1113                                 LuminosityBlockPrincipal const& lbp,
1114                                 ProcessContext const* processContext,
1115                                 ActivityRegistry* activityRegistry) {
1116     auto token = ServiceRegistry::instance().presentToken();
1117     GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1118                                 lbp.id(),
1119                                 lbp.runPrincipal().index(),
1120                                 lbp.index(),
1121                                 lbp.beginTime(),
1122                                 processContext);
1123 
1124     using namespace edm::waiting_task;
1125     chain::first([&](auto nextTask) {
1126       ServiceRegistry::Operate op(token);
1127       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1128       }
1129       for (auto& c : all_output_communicators_) {
1130         c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1131       }
1132     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1133       //services can depend on other services
1134       ServiceRegistry::Operate op(token);
1135 
1136       activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1137     })) |
1138         chain::runLast(task);
1139   }
1140 
1141   bool Schedule::shouldWeCloseOutput() const {
1142     using std::placeholders::_1;
1143     // Return true iff at least one output module returns true.
1144     return (std::find_if(all_output_communicators_.begin(),
1145                          all_output_communicators_.end(),
1146                          std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1147             all_output_communicators_.end());
1148   }
1149 
1150   void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1151     using std::placeholders::_1;
1152     for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1153   }
1154 
1155   void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1156     using std::placeholders::_1;
1157     for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1158   }
1159 
1160   void Schedule::beginJob(ProductRegistry const& iRegistry,
1161                           eventsetup::ESRecordsToProxyIndices const& iESIndices,
1162                           ProcessBlockHelperBase const& processBlockHelperBase) {
1163     globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase);
1164   }
1165 
1166   void Schedule::beginStream(unsigned int iStreamID) {
1167     assert(iStreamID < streamSchedules_.size());
1168     streamSchedules_[iStreamID]->beginStream();
1169   }
1170 
1171   void Schedule::endStream(unsigned int iStreamID) {
1172     assert(iStreamID < streamSchedules_.size());
1173     streamSchedules_[iStreamID]->endStream();
1174   }
1175 
1176   void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1177                                       unsigned int iStreamID,
1178                                       EventTransitionInfo& info,
1179                                       ServiceToken const& token) {
1180     assert(iStreamID < streamSchedules_.size());
1181     streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1182   }
1183 
1184   bool Schedule::changeModule(std::string const& iLabel,
1185                               ParameterSet const& iPSet,
1186                               const ProductRegistry& iRegistry,
1187                               eventsetup::ESRecordsToProxyIndices const& iIndices) {
1188     Worker* found = nullptr;
1189     for (auto const& worker : allWorkers()) {
1190       if (worker->description()->moduleLabel() == iLabel) {
1191         found = worker;
1192         break;
1193       }
1194     }
1195     if (nullptr == found) {
1196       return false;
1197     }
1198 
1199     auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1200 
1201     globalSchedule_->replaceModule(newMod, iLabel);
1202 
1203     for (auto& s : streamSchedules_) {
1204       s->replaceModule(newMod, iLabel);
1205     }
1206 
1207     {
1208       //Need to updateLookup in order to make getByToken work
1209       auto const processBlockLookup = iRegistry.productLookup(InProcess);
1210       auto const runLookup = iRegistry.productLookup(InRun);
1211       auto const lumiLookup = iRegistry.productLookup(InLumi);
1212       auto const eventLookup = iRegistry.productLookup(InEvent);
1213       found->updateLookup(InProcess, *runLookup);
1214       found->updateLookup(InRun, *runLookup);
1215       found->updateLookup(InLumi, *lumiLookup);
1216       found->updateLookup(InEvent, *eventLookup);
1217       found->updateLookup(iIndices);
1218 
1219       auto const& processName = newMod->moduleDescription().processName();
1220       auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1221       auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1222       auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1223       auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1224       found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1225       found->resolvePutIndicies(InRun, runModuleToIndicies);
1226       found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1227       found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1228     }
1229 
1230     return true;
1231   }
1232 
1233   void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1234     globalSchedule_->deleteModule(iLabel);
1235     for (auto& stream : streamSchedules_) {
1236       stream->deleteModule(iLabel);
1237     }
1238     moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1239   }
1240 
1241   void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1242                                        edm::ProductRegistry const& preg) {
1243     for (auto& stream : streamSchedules_) {
1244       stream->initializeEarlyDelete(*moduleRegistry(), branchesToDeleteEarly, preg);
1245     }
1246   }
1247 
1248   std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1249     std::vector<ModuleDescription const*> result;
1250     result.reserve(allWorkers().size());
1251 
1252     for (auto const& worker : allWorkers()) {
1253       ModuleDescription const* p = worker->description();
1254       result.push_back(p);
1255     }
1256     return result;
1257   }
1258 
1259   Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1260 
1261   void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1262     for (auto const& worker : allWorkers()) {
1263       worker->convertCurrentProcessAlias(processName);
1264     }
1265   }
1266 
1267   void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1268     streamSchedules_[0]->availablePaths(oLabelsToFill);
1269   }
1270 
1271   void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1272 
1273   void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1274 
1275   void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1276     streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1277   }
1278 
1279   void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1280                                           std::vector<ModuleDescription const*>& descriptions,
1281                                           unsigned int hint) const {
1282     streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1283   }
1284 
1285   void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1286                                              std::vector<ModuleDescription const*>& descriptions,
1287                                              unsigned int hint) const {
1288     streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1289   }
1290 
1291   void Schedule::fillModuleAndConsumesInfo(
1292       std::vector<ModuleDescription const*>& allModuleDescriptions,
1293       std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1294       std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1295       std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1296       ProductRegistry const& preg) const {
1297     allModuleDescriptions.clear();
1298     moduleIDToIndex.clear();
1299     for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1300       modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1301     }
1302     modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1303 
1304     allModuleDescriptions.reserve(allWorkers().size());
1305     moduleIDToIndex.reserve(allWorkers().size());
1306     for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1307       modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1308     }
1309     modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1310 
1311     std::map<std::string, ModuleDescription const*> labelToDesc;
1312     unsigned int i = 0;
1313     for (auto const& worker : allWorkers()) {
1314       ModuleDescription const* p = worker->description();
1315       allModuleDescriptions.push_back(p);
1316       moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1317       labelToDesc[p->moduleLabel()] = p;
1318       ++i;
1319     }
1320     sort_all(moduleIDToIndex);
1321 
1322     i = 0;
1323     for (auto const& worker : allWorkers()) {
1324       std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1325       for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1326         modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1327       }
1328 
1329       std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1330           modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1331       try {
1332         worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1333       } catch (cms::Exception& ex) {
1334         ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1335                       worker->description()->moduleLabel());
1336         throw;
1337       }
1338       ++i;
1339     }
1340   }
1341 
1342   void Schedule::getTriggerReport(TriggerReport& rep) const {
1343     rep.eventSummary.totalEvents = 0;
1344     rep.eventSummary.totalEventsPassed = 0;
1345     rep.eventSummary.totalEventsFailed = 0;
1346     for (auto& s : streamSchedules_) {
1347       s->getTriggerReport(rep);
1348     }
1349     sort_all(rep.workerSummaries);
1350   }
1351 
1352   void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1353     rep.eventSummary.totalEvents = 0;
1354     rep.eventSummary.cpuTime = 0.;
1355     rep.eventSummary.realTime = 0.;
1356     summaryTimeKeeper_->fillTriggerTimingReport(rep);
1357   }
1358 
1359   int Schedule::totalEvents() const {
1360     int returnValue = 0;
1361     for (auto& s : streamSchedules_) {
1362       returnValue += s->totalEvents();
1363     }
1364     return returnValue;
1365   }
1366 
1367   int Schedule::totalEventsPassed() const {
1368     int returnValue = 0;
1369     for (auto& s : streamSchedules_) {
1370       returnValue += s->totalEventsPassed();
1371     }
1372     return returnValue;
1373   }
1374 
1375   int Schedule::totalEventsFailed() const {
1376     int returnValue = 0;
1377     for (auto& s : streamSchedules_) {
1378       returnValue += s->totalEventsFailed();
1379     }
1380     return returnValue;
1381   }
1382 
1383   void Schedule::clearCounters() {
1384     for (auto& s : streamSchedules_) {
1385       s->clearCounters();
1386     }
1387   }
1388 }  // namespace edm