Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-01-11 16:27:16

0001 #include "FWCore/Framework/interface/Schedule.h"
0002 
0003 #include "DataFormats/Common/interface/setIsMergeable.h"
0004 #include "DataFormats/Common/interface/TriggerResults.h"
0005 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0007 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0008 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0011 #include "FWCore/Framework/interface/EDConsumerBase.h"
0012 #include "FWCore/Framework/src/OutputModuleDescription.h"
0013 #include "FWCore/Framework/interface/SubProcess.h"
0014 #include "FWCore/Framework/interface/TriggerNamesService.h"
0015 #include "FWCore/Framework/src/TriggerReport.h"
0016 #include "FWCore/Framework/src/TriggerTimingReport.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/src/Factory.h"
0019 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0020 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0021 #include "FWCore/Framework/interface/ModuleRegistry.h"
0022 #include "FWCore/Framework/src/TriggerResultInserter.h"
0023 #include "FWCore/Framework/src/PathStatusInserter.h"
0024 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0025 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0026 #include "FWCore/Concurrency/interface/chain_first.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0031 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0032 #include "FWCore/Utilities/interface/Algorithms.h"
0033 #include "FWCore/Utilities/interface/ConvertException.h"
0034 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0035 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0036 #include "FWCore/Utilities/interface/TypeID.h"
0037 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0038 
0039 #include <array>
0040 #include <algorithm>
0041 #include <cassert>
0042 #include <cstdlib>
0043 #include <functional>
0044 #include <iomanip>
0045 #include <list>
0046 #include <map>
0047 #include <set>
0048 #include <exception>
0049 #include <sstream>
0050 
0051 #include "make_shared_noexcept_false.h"
0052 #include "processEDAliases.h"
0053 
0054 namespace edm {
0055 
0056   class Maker;
0057 
0058   namespace {
0059     using std::placeholders::_1;
0060 
0061     bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0062       return std::binary_search(v.begin(), v.end(), s);
0063     }
0064 
0065     // Here we make the trigger results inserter directly.  This should
0066     // probably be a utility in the WorkerRegistry or elsewhere.
0067 
0068     std::shared_ptr<TriggerResultInserter> makeInserter(
0069         ParameterSet& proc_pset,
0070         PreallocationConfiguration const& iPrealloc,
0071         ProductRegistry& preg,
0072         ExceptionToActionTable const& actions,
0073         std::shared_ptr<ActivityRegistry> areg,
0074         std::shared_ptr<ProcessConfiguration const> processConfiguration) {
0075       ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
0076       trig_pset->registerIt();
0077 
0078       WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
0079       ModuleDescription md(trig_pset->id(),
0080                            "TriggerResultInserter",
0081                            "TriggerResults",
0082                            processConfiguration.get(),
0083                            ModuleDescription::getUniqueID());
0084 
0085       areg->preModuleConstructionSignal_(md);
0086       bool postCalled = false;
0087       std::shared_ptr<TriggerResultInserter> returnValue;
0088       // Caught exception is rethrown
0089       CMS_SA_ALLOW try {
0090         maker::ModuleHolderT<TriggerResultInserter> holder(
0091             make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
0092             static_cast<Maker const*>(nullptr));
0093         holder.setModuleDescription(md);
0094         holder.registerProductsAndCallbacks(&preg);
0095         returnValue = holder.module();
0096         postCalled = true;
0097         // if exception then post will be called in the catch block
0098         areg->postModuleConstructionSignal_(md);
0099       } catch (...) {
0100         if (!postCalled) {
0101           CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0102             // If post throws an exception ignore it because we are already handling another exception
0103           }
0104         }
0105         throw;
0106       }
0107       return returnValue;
0108     }
0109 
0110     template <typename T>
0111     void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
0112                                  std::vector<std::string> const& pathNames,
0113                                  PreallocationConfiguration const& iPrealloc,
0114                                  ProductRegistry& preg,
0115                                  std::shared_ptr<ActivityRegistry> areg,
0116                                  std::shared_ptr<ProcessConfiguration const> processConfiguration,
0117                                  std::string const& moduleTypeName) {
0118       ParameterSet pset;
0119       pset.addParameter<std::string>("@module_type", moduleTypeName);
0120       pset.addParameter<std::string>("@module_edm_type", "EDProducer");
0121       pset.registerIt();
0122 
0123       pathStatusInserters.reserve(pathNames.size());
0124 
0125       for (auto const& pathName : pathNames) {
0126         ModuleDescription md(
0127             pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
0128 
0129         areg->preModuleConstructionSignal_(md);
0130         bool postCalled = false;
0131         // Caught exception is rethrown
0132         CMS_SA_ALLOW try {
0133           maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
0134                                          static_cast<Maker const*>(nullptr));
0135           holder.setModuleDescription(md);
0136           holder.registerProductsAndCallbacks(&preg);
0137           pathStatusInserters.emplace_back(holder.module());
0138           postCalled = true;
0139           // if exception then post will be called in the catch block
0140           areg->postModuleConstructionSignal_(md);
0141         } catch (...) {
0142           if (!postCalled) {
0143             CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
0144               // If post throws an exception ignore it because we are already handling another exception
0145             }
0146           }
0147           throw;
0148         }
0149       }
0150     }
0151 
0152     typedef std::vector<std::string> vstring;
0153 
0154     void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
0155       // Update Switch BranchDescriptions for the chosen case
0156       struct BranchesCases {
0157         BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0158         std::vector<BranchKey> chosenBranches;
0159         std::vector<std::string> caseLabels;
0160       };
0161       std::map<std::string, BranchesCases> switchMap;
0162       for (auto& prod : preg.productListUpdator()) {
0163         if (prod.second.isSwitchAlias()) {
0164           auto it = switchMap.find(prod.second.moduleLabel());
0165           if (it == switchMap.end()) {
0166             auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0167             auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0168                                               switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0169             assert(inserted.second);
0170             it = inserted.first;
0171           }
0172 
0173           bool found = false;
0174           for (auto const& productIter : preg.productList()) {
0175             BranchKey const& branchKey = productIter.first;
0176             // The alias-for product must be in the same process as
0177             // the SwitchProducer (earlier processes or SubProcesses
0178             // may contain products with same type, module label, and
0179             // instance name)
0180             if (branchKey.processName() != processName) {
0181               continue;
0182             }
0183 
0184             BranchDescription const& desc = productIter.second;
0185             if (desc.branchType() == prod.second.branchType() and
0186                 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0187                 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0188                 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0189               prod.second.setSwitchAliasForBranch(desc);
0190               if (!prod.second.transient()) {
0191                 it->second.chosenBranches.push_back(prod.first);  // with moduleLabel of the Switch
0192               }
0193               found = true;
0194             }
0195           }
0196           if (not found) {
0197             Exception ex(errors::LogicError);
0198             ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
0199                << "  friendly class name = " << prod.second.friendlyClassName() << "\n"
0200                << "  module label = " << prod.second.moduleLabel() << "\n"
0201                << "  product instance name = " << prod.second.productInstanceName() << "\n"
0202                << "  process name = " << processName
0203                << "\n\nbut did not find any. Please contact a framework developer.";
0204             ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0205             throw ex;
0206           }
0207         }
0208       }
0209       if (switchMap.empty())
0210         return;
0211 
0212       for (auto& elem : switchMap) {
0213         std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0214       }
0215 
0216       auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0217         std::map<std::string, std::vector<BranchKey>> caseBranches;
0218         for (auto const& item : preg.productList()) {
0219           if (item.first.processName() != processName)
0220             continue;
0221 
0222           if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0223               found != caseLabels.end()) {
0224             caseBranches[*found].push_back(item.first);
0225           }
0226         }
0227 
0228         for (auto const& caseLabel : caseLabels) {
0229           ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0230           auto& branches = caseBranches[caseLabel];
0231           std::sort(branches.begin(), branches.end());
0232           for (auto const& branch : branches) {
0233             ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0234           }
0235         }
0236       };
0237 
0238       // Check that non-chosen cases declare exactly the same non-transient branches
0239       // Also set the alias-for branches to transient
0240       std::vector<bool> foundBranches;
0241       for (auto const& switchItem : switchMap) {
0242         auto const& switchLabel = switchItem.first;
0243         auto const& chosenBranches = switchItem.second.chosenBranches;
0244         auto const& caseLabels = switchItem.second.caseLabels;
0245         foundBranches.resize(chosenBranches.size());
0246         for (auto const& caseLabel : caseLabels) {
0247           std::fill(foundBranches.begin(), foundBranches.end(), false);
0248           for (auto& nonConstItem : preg.productListUpdator()) {
0249             auto const& item = nonConstItem;
0250             if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0251               // Check that products which are not transient in the dictionary are consistent between
0252               // all the cases of a SwitchProducer.
0253               if (!item.second.transient()) {
0254                 auto range = std::equal_range(chosenBranches.begin(),
0255                                               chosenBranches.end(),
0256                                               BranchKey(item.first.friendlyClassName(),
0257                                                         switchLabel,
0258                                                         item.first.productInstanceName(),
0259                                                         item.first.processName()));
0260                 if (range.first == range.second) {
0261                   Exception ex(errors::Configuration);
0262                   ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0263                      << item.first << " that is not produced by the chosen case "
0264                      << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0265                             .getUntrackedParameter<std::string>("@chosen_case")
0266                      << " and that product is not transient. "
0267                      << "If the intention is to produce only a subset of the non-transient products listed below, each "
0268                         "case with more non-transient products needs to be replaced with an EDAlias to only the "
0269                         "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0270                   addProductsToException(caseLabels, ex);
0271                   throw ex;
0272                 }
0273                 assert(std::distance(range.first, range.second) == 1);
0274                 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0275               }
0276 
0277               // Set the alias-for branch as transient so it gets fully ignored in output.
0278               // I tried first to implicitly drop all branches with
0279               // '@' in ProductSelector, but that gave problems on
0280               // input (those branches would be implicitly dropped on
0281               // input as well, leading to the SwitchProducer branches
0282               // do be dropped as dependent ones, as the alias
0283               // detection logic in RootFile says that the
0284               // SwitchProducer branches are not alias branches)
0285               nonConstItem.second.setTransient(true);
0286 
0287               // Check that there are no BranchAliases for any of the cases
0288               auto const& bd = item.second;
0289               if (not bd.branchAliases().empty()) {
0290                 auto ex = Exception(errors::UnimplementedFeature)
0291                           << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0292                              "aliases for SwitchProducer with label "
0293                           << switchLabel << " for case " << caseLabel << ":";
0294                 for (auto const& branchAlias : bd.branchAliases()) {
0295                   ex << " " << branchAlias;
0296                 }
0297                 throw ex;
0298               }
0299             }
0300           }
0301 
0302           for (size_t i = 0; i < chosenBranches.size(); i++) {
0303             if (not foundBranches[i]) {
0304               auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0305                                      .getUntrackedParameter<std::string>("@chosen_case");
0306               Exception ex(errors::Configuration);
0307               ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0308                  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0309                  << chosenLabel << " and that product is not transient. "
0310                  << "If the intention is to produce only a subset of the non-transient products listed below, each "
0311                     "case with more non-transient products needs to be replaced with an EDAlias to only the "
0312                     "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0313               addProductsToException(caseLabels, ex);
0314               throw ex;
0315             }
0316           }
0317         }
0318       }
0319     }
0320 
0321     void reduceParameterSet(ParameterSet& proc_pset,
0322                             vstring const& end_path_name_list,
0323                             vstring& modulesInConfig,
0324                             std::set<std::string> const& usedModuleLabels,
0325                             std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0326       // Before calculating the ParameterSetID of the top level ParameterSet or
0327       // saving it in the registry drop from the top level ParameterSet all
0328       // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
0329       // production is not enabled also drop all the EDFilters and EDProducers
0330       // that are not scheduled. Drop the ParameterSet used to configure the module
0331       // itself. Also drop the other traces of these labels in the top level
0332       // ParameterSet: Remove that labels from @all_modules and from all the
0333       // end paths. If this makes any end paths empty, then remove the end path
0334       // name from @end_paths, and @paths.
0335 
0336       // First make a list of labels to drop
0337       vstring outputModuleLabels;
0338       std::string edmType;
0339       std::string const moduleEdmType("@module_edm_type");
0340       std::string const outputModule("OutputModule");
0341       std::string const edAnalyzer("EDAnalyzer");
0342       std::string const edFilter("EDFilter");
0343       std::string const edProducer("EDProducer");
0344 
0345       std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0346 
0347       //need a list of all modules on paths in order to determine
0348       // if an EDAnalyzer only appears on an end path
0349       vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0350       std::set<std::string> modulesOnPaths;
0351       {
0352         std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0353         for (auto const& endPath : end_path_name_list) {
0354           noEndPaths.erase(endPath);
0355         }
0356         {
0357           vstring labels;
0358           for (auto const& path : noEndPaths) {
0359             labels = proc_pset.getParameter<vstring>(path);
0360             modulesOnPaths.insert(labels.begin(), labels.end());
0361           }
0362         }
0363       }
0364       //Initially fill labelsToBeDropped with all module mentioned in
0365       // the configuration but which are not being used by the system
0366       std::vector<std::string> labelsToBeDropped;
0367       labelsToBeDropped.reserve(modulesInConfigSet.size());
0368       std::set_difference(modulesInConfigSet.begin(),
0369                           modulesInConfigSet.end(),
0370                           usedModuleLabels.begin(),
0371                           usedModuleLabels.end(),
0372                           std::back_inserter(labelsToBeDropped));
0373 
0374       const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0375       for (auto const& modLabel : usedModuleLabels) {
0376         // Do nothing for modules that do not have a ParameterSet. Modules of type
0377         // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
0378         if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0379           edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0380           if (edmType == outputModule) {
0381             outputModuleLabels.push_back(modLabel);
0382             labelsToBeDropped.push_back(modLabel);
0383           }
0384           if (edmType == edAnalyzer) {
0385             if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0386               labelsToBeDropped.push_back(modLabel);
0387             }
0388           }
0389         }
0390       }
0391       //labelsToBeDropped must be sorted
0392       std::inplace_merge(
0393           labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0394 
0395       // drop the parameter sets used to configure the modules
0396       for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0397 
0398       // drop the labels from @all_modules
0399       vstring::iterator endAfterRemove =
0400           std::remove_if(modulesInConfig.begin(),
0401                          modulesInConfig.end(),
0402                          std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0403       modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0404       proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0405 
0406       // drop the labels from all end paths
0407       vstring endPathsToBeDropped;
0408       vstring labels;
0409       for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0410            iEndPath != endEndPath;
0411            ++iEndPath) {
0412         labels = proc_pset.getParameter<vstring>(*iEndPath);
0413         vstring::iterator iSave = labels.begin();
0414         vstring::iterator iBegin = labels.begin();
0415 
0416         for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0417           if (binary_search_string(labelsToBeDropped, *iLabel)) {
0418             if (binary_search_string(outputModuleLabels, *iLabel)) {
0419               outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0420             }
0421           } else {
0422             if (iSave != iLabel) {
0423               iSave->swap(*iLabel);
0424             }
0425             ++iSave;
0426           }
0427         }
0428         labels.erase(iSave, labels.end());
0429         if (labels.empty()) {
0430           // remove empty end paths and save their names
0431           proc_pset.eraseSimpleParameter(*iEndPath);
0432           endPathsToBeDropped.push_back(*iEndPath);
0433         } else {
0434           proc_pset.addParameter<vstring>(*iEndPath, labels);
0435         }
0436       }
0437       sort_all(endPathsToBeDropped);
0438 
0439       // remove empty end paths from @paths
0440       endAfterRemove = std::remove_if(scheduledPaths.begin(),
0441                                       scheduledPaths.end(),
0442                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0443       scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0444       proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0445 
0446       // remove empty end paths from @end_paths
0447       vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0448       endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0449                                       scheduledEndPaths.end(),
0450                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0451       scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0452       proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0453     }
0454 
0455     class RngEDConsumer : public EDConsumerBase {
0456     public:
0457       explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
0458         Service<RandomNumberGenerator> rng;
0459         if (rng.isAvailable()) {
0460           rng->consumes(consumesCollector());
0461           for (auto const& consumesInfo : this->consumesInfo()) {
0462             typesConsumed.emplace(consumesInfo.type());
0463           }
0464         }
0465       }
0466     };
0467 
0468     template <typename F>
0469     auto doCleanup(F&& iF) {
0470       auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0471         CMS_SA_ALLOW try { f(); } catch (...) {
0472         }
0473         if (iPtr) {
0474           iTask.doneWaiting(*iPtr);
0475         }
0476       };
0477       return wrapped;
0478     }
0479   }  // namespace
0480   // -----------------------------
0481 
0482   typedef std::vector<std::string> vstring;
0483 
0484   // -----------------------------
0485 
0486   Schedule::Schedule(ParameterSet& proc_pset,
0487                      service::TriggerNamesService const& tns,
0488                      ProductRegistry& preg,
0489                      ExceptionToActionTable const& actions,
0490                      std::shared_ptr<ActivityRegistry> areg,
0491                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0492                      PreallocationConfiguration const& prealloc,
0493                      ProcessContext const* processContext,
0494                      ModuleTypeResolverMaker const* resolverMaker)
0495       :  //Only create a resultsInserter if there is a trigger path
0496         resultsInserter_{tns.getTrigPaths().empty()
0497                              ? std::shared_ptr<TriggerResultInserter>{}
0498                              : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
0499         moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0500         all_output_communicators_(),
0501         preallocConfig_(prealloc),
0502         pathNames_(&tns.getTrigPaths()),
0503         endPathNames_(&tns.getEndPaths()),
0504         wantSummary_(tns.wantSummary()) {
0505     makePathStatusInserters(pathStatusInserters_,
0506                             *pathNames_,
0507                             prealloc,
0508                             preg,
0509                             areg,
0510                             processConfiguration,
0511                             std::string("PathStatusInserter"));
0512 
0513     makePathStatusInserters(endPathStatusInserters_,
0514                             *endPathNames_,
0515                             prealloc,
0516                             preg,
0517                             areg,
0518                             processConfiguration,
0519                             std::string("EndPathStatusInserter"));
0520 
0521     assert(0 < prealloc.numberOfStreams());
0522     streamSchedules_.reserve(prealloc.numberOfStreams());
0523     for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0524       streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
0525                                                                                pathStatusInserters_,
0526                                                                                endPathStatusInserters_,
0527                                                                                moduleRegistry(),
0528                                                                                proc_pset,
0529                                                                                tns,
0530                                                                                prealloc,
0531                                                                                preg,
0532                                                                                actions,
0533                                                                                areg,
0534                                                                                processConfiguration,
0535                                                                                StreamID{i},
0536                                                                                processContext));
0537     }
0538 
0539     //TriggerResults are injected automatically by StreamSchedules and are
0540     // unknown to the ModuleRegistry
0541     const std::string kTriggerResults("TriggerResults");
0542     std::vector<std::string> modulesToUse;
0543     modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
0544     for (auto const& worker : streamSchedules_[0]->allWorkers()) {
0545       if (worker->description()->moduleLabel() != kTriggerResults) {
0546         modulesToUse.push_back(worker->description()->moduleLabel());
0547       }
0548     }
0549     //The unscheduled modules are at the end of the list, but we want them at the front
0550     unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
0551     if (nUnscheduledModules > 0) {
0552       std::vector<std::string> temp;
0553       temp.reserve(modulesToUse.size());
0554       auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
0555       std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
0556       std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
0557       temp.swap(modulesToUse);
0558     }
0559 
0560     // propagate_const<T> has no reset() function
0561     globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0562                                                        pathStatusInserters_,
0563                                                        endPathStatusInserters_,
0564                                                        moduleRegistry(),
0565                                                        modulesToUse,
0566                                                        proc_pset,
0567                                                        preg,
0568                                                        prealloc,
0569                                                        actions,
0570                                                        areg,
0571                                                        processConfiguration,
0572                                                        processContext);
0573   }
0574 
0575   void Schedule::finishSetup(ParameterSet& proc_pset,
0576                              service::TriggerNamesService const& tns,
0577                              ProductRegistry& preg,
0578                              BranchIDListHelper& branchIDListHelper,
0579                              ProcessBlockHelperBase& processBlockHelper,
0580                              ThinnedAssociationsHelper& thinnedAssociationsHelper,
0581                              SubProcessParentageHelper const* subProcessParentageHelper,
0582                              std::shared_ptr<ActivityRegistry> areg,
0583                              std::shared_ptr<ProcessConfiguration> processConfiguration,
0584                              bool hasSubprocesses,
0585                              PreallocationConfiguration const& prealloc,
0586                              ProcessContext const* processContext) {
0587     //TriggerResults is not in the top level ParameterSet so the call to
0588     // reduceParameterSet would fail to find it. Just remove it up front.
0589     const std::string kTriggerResults("TriggerResults");
0590 
0591     std::set<std::string> usedModuleLabels;
0592     for (auto const& worker : allWorkers()) {
0593       if (worker->description()->moduleLabel() != kTriggerResults) {
0594         usedModuleLabels.insert(worker->description()->moduleLabel());
0595       }
0596     }
0597     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0598     std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0599     reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0600     {
0601       std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0602       detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0603     }
0604 
0605     // At this point all BranchDescriptions are created. Mark now the
0606     // ones of unscheduled workers to be on-demand.
0607     {
0608       auto const& unsched = streamSchedules_[0]->unscheduledWorkers();
0609       if (not unsched.empty()) {
0610         std::set<std::string> unscheduledModules;
0611         std::transform(unsched.begin(),
0612                        unsched.end(),
0613                        std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0614                        [](auto worker) { return worker->description()->moduleLabel(); });
0615         preg.setUnscheduledProducts(unscheduledModules);
0616       }
0617     }
0618 
0619     processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0620     proc_pset.registerIt();
0621     processConfiguration->setParameterSetID(proc_pset.id());
0622     processConfiguration->setProcessConfigurationID();
0623 
0624     // This is used for a little sanity-check to make sure no code
0625     // modifications alter the number of workers at a later date.
0626     size_t all_workers_count = allWorkers().size();
0627 
0628     moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0629       auto comm = iHolder->createOutputModuleCommunicator();
0630       if (comm) {
0631         all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0632       }
0633     });
0634     // Now that the output workers are filled in, set any output limits or information.
0635     limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
0636 
0637     // Sanity check: make sure nobody has added a worker after we've
0638     // already relied on the WorkerManager being full.
0639     assert(all_workers_count == allWorkers().size());
0640 
0641     branchIDListHelper.updateFromRegistry(preg);
0642 
0643     for (auto const& worker : streamSchedules_[0]->allWorkers()) {
0644       worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
0645     }
0646 
0647     processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
0648 
0649     // The output modules consume products in kept branches.
0650     // So we must set this up before freezing.
0651     for (auto& c : all_output_communicators_) {
0652       c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
0653     }
0654 
0655     for (auto& product : preg.productListUpdator()) {
0656       setIsMergeable(product.second);
0657     }
0658 
0659     {
0660       // We now get a collection of types that may be consumed.
0661       std::set<TypeID> productTypesConsumed;
0662       std::set<TypeID> elementTypesConsumed;
0663       // Loop over all modules
0664       for (auto const& worker : allWorkers()) {
0665         for (auto const& consumesInfo : worker->consumesInfo()) {
0666           if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0667             productTypesConsumed.emplace(consumesInfo.type());
0668           } else {
0669             elementTypesConsumed.emplace(consumesInfo.type());
0670           }
0671         }
0672       }
0673       // The SubProcess class is not a module, yet it may consume.
0674       if (hasSubprocesses) {
0675         productTypesConsumed.emplace(typeid(TriggerResults));
0676       }
0677       // The RandomNumberGeneratorService is not a module, yet it consumes.
0678       { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
0679       preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0680     }
0681 
0682     for (auto& c : all_output_communicators_) {
0683       c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
0684     }
0685 
0686     if (wantSummary_) {
0687       std::vector<const ModuleDescription*> modDesc;
0688       const auto& workers = allWorkers();
0689       modDesc.reserve(workers.size());
0690 
0691       std::transform(workers.begin(),
0692                      workers.end(),
0693                      std::back_inserter(modDesc),
0694                      [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0695 
0696       // propagate_const<T> has no reset() function
0697       summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0698       auto timeKeeperPtr = summaryTimeKeeper_.get();
0699 
0700       areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0701 
0702       areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0703       areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0704       areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0705       areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0706       areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0707       areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0708 
0709       areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0710       areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0711 
0712       areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0713       areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0714 
0715       areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0716       areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0717       //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
0718       //timeKeeperPtr->startModuleEvent(iContext,iMod);
0719       //});
0720     }
0721 
0722   }  // Schedule::Schedule
0723 
0724   void Schedule::limitOutput(ParameterSet const& proc_pset,
0725                              BranchIDLists const& branchIDLists,
0726                              SubProcessParentageHelper const* subProcessParentageHelper) {
0727     std::string const output("output");
0728 
0729     ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
0730     int maxEventSpecs = 0;
0731     int maxEventsOut = -1;
0732     ParameterSet const* vMaxEventsOut = nullptr;
0733     std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
0734     if (search_all(intNamesE, output)) {
0735       maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
0736       ++maxEventSpecs;
0737     }
0738     std::vector<std::string> psetNamesE;
0739     maxEventsPSet.getParameterSetNames(psetNamesE, false);
0740     if (search_all(psetNamesE, output)) {
0741       vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
0742       ++maxEventSpecs;
0743     }
0744 
0745     if (maxEventSpecs > 1) {
0746       throw Exception(errors::Configuration)
0747           << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0748     }
0749 
0750     for (auto& c : all_output_communicators_) {
0751       OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
0752       if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
0753         std::string const& moduleLabel = c->description().moduleLabel();
0754         try {
0755           desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
0756         } catch (Exception const&) {
0757           throw Exception(errors::Configuration)
0758               << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
0759         }
0760       }
0761       c->configure(desc);
0762     }
0763   }
0764 
0765   bool Schedule::terminate() const {
0766     if (all_output_communicators_.empty()) {
0767       return false;
0768     }
0769     for (auto& c : all_output_communicators_) {
0770       if (!c->limitReached()) {
0771         // Found an output module that has not reached output event count.
0772         return false;
0773       }
0774     }
0775     LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0776                                      << "has reached its configured limit.\n";
0777     return true;
0778   }
0779 
0780   void Schedule::endJob(ExceptionCollector& collector) {
0781     globalSchedule_->endJob(collector);
0782     if (collector.hasThrown()) {
0783       return;
0784     }
0785 
0786     if (wantSummary_ == false)
0787       return;
0788     {
0789       TriggerReport tr;
0790       getTriggerReport(tr);
0791 
0792       // The trigger report (pass/fail etc.):
0793 
0794       LogFwkVerbatim("FwkSummary") << "";
0795       if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
0796         LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
0797                                      << streamSchedules_[0]->context().processContext()->processName();
0798       }
0799       LogFwkVerbatim("FwkSummary") << "TrigReport "
0800                                    << "---------- Event  Summary ------------";
0801       if (!tr.trigPathSummaries.empty()) {
0802         LogFwkVerbatim("FwkSummary") << "TrigReport"
0803                                      << " Events total = " << tr.eventSummary.totalEvents
0804                                      << " passed = " << tr.eventSummary.totalEventsPassed
0805                                      << " failed = " << tr.eventSummary.totalEventsFailed << "";
0806       } else {
0807         LogFwkVerbatim("FwkSummary") << "TrigReport"
0808                                      << " Events total = " << tr.eventSummary.totalEvents
0809                                      << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0810       }
0811 
0812       LogFwkVerbatim("FwkSummary") << "";
0813       LogFwkVerbatim("FwkSummary") << "TrigReport "
0814                                    << "---------- Path   Summary ------------";
0815       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0816                                    << " " << std::right << std::setw(10) << "Executed"
0817                                    << " " << std::right << std::setw(10) << "Passed"
0818                                    << " " << std::right << std::setw(10) << "Failed"
0819                                    << " " << std::right << std::setw(10) << "Error"
0820                                    << " "
0821                                    << "Name"
0822                                    << "";
0823       for (auto const& p : tr.trigPathSummaries) {
0824         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
0825                                      << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
0826                                      << std::right << std::setw(10) << p.timesPassed << " " << std::right
0827                                      << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0828                                      << p.timesExcept << " " << p.name << "";
0829       }
0830 
0831       /*
0832       std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
0833       std::vector<int>::const_iterator epe = empty_trig_paths_.end();
0834       std::vector<std::string>::const_iterator  epn = empty_trig_path_names_.begin();
0835       for (; epi != epe; ++epi, ++epn) {
0836 
0837         LogFwkVerbatim("FwkSummary") << "TrigReport "
0838         << std::right << std::setw(5) << 1
0839         << std::right << std::setw(5) << *epi << " "
0840         << std::right << std::setw(10) << totalEvents() << " "
0841         << std::right << std::setw(10) << totalEvents() << " "
0842         << std::right << std::setw(10) << 0 << " "
0843         << std::right << std::setw(10) << 0 << " "
0844         << *epn << "";
0845       }
0846        */
0847 
0848       LogFwkVerbatim("FwkSummary") << "";
0849       LogFwkVerbatim("FwkSummary") << "TrigReport "
0850                                    << "-------End-Path   Summary ------------";
0851       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0852                                    << " " << std::right << std::setw(10) << "Executed"
0853                                    << " " << std::right << std::setw(10) << "Passed"
0854                                    << " " << std::right << std::setw(10) << "Failed"
0855                                    << " " << std::right << std::setw(10) << "Error"
0856                                    << " "
0857                                    << "Name"
0858                                    << "";
0859       for (auto const& p : tr.endPathSummaries) {
0860         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
0861                                      << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
0862                                      << std::right << std::setw(10) << p.timesPassed << " " << std::right
0863                                      << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0864                                      << p.timesExcept << " " << p.name << "";
0865       }
0866 
0867       for (auto const& p : tr.trigPathSummaries) {
0868         LogFwkVerbatim("FwkSummary") << "";
0869         LogFwkVerbatim("FwkSummary") << "TrigReport "
0870                                      << "---------- Modules in Path: " << p.name << " ------------";
0871         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0872                                      << " " << std::right << std::setw(10) << "Visited"
0873                                      << " " << std::right << std::setw(10) << "Passed"
0874                                      << " " << std::right << std::setw(10) << "Failed"
0875                                      << " " << std::right << std::setw(10) << "Error"
0876                                      << " "
0877                                      << "Name"
0878                                      << "";
0879 
0880         for (auto const& mod : p.moduleInPathSummaries) {
0881           LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
0882                                        << mod.bitPosition << " " << std::right << std::setw(10) << mod.timesVisited
0883                                        << " " << std::right << std::setw(10) << mod.timesPassed << " " << std::right
0884                                        << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
0885                                        << mod.timesExcept << " " << mod.moduleLabel << "";
0886         }
0887       }
0888 
0889       for (auto const& p : tr.endPathSummaries) {
0890         LogFwkVerbatim("FwkSummary") << "";
0891         LogFwkVerbatim("FwkSummary") << "TrigReport "
0892                                      << "------ Modules in End-Path: " << p.name << " ------------";
0893         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0894                                      << " " << std::right << std::setw(10) << "Visited"
0895                                      << " " << std::right << std::setw(10) << "Passed"
0896                                      << " " << std::right << std::setw(10) << "Failed"
0897                                      << " " << std::right << std::setw(10) << "Error"
0898                                      << " "
0899                                      << "Name"
0900                                      << "";
0901 
0902         unsigned int bitpos = 0;
0903         for (auto const& mod : p.moduleInPathSummaries) {
0904           LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
0905                                        << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
0906                                        << std::right << std::setw(10) << mod.timesPassed << " " << std::right
0907                                        << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
0908                                        << mod.timesExcept << " " << mod.moduleLabel << "";
0909           ++bitpos;
0910         }
0911       }
0912 
0913       LogFwkVerbatim("FwkSummary") << "";
0914       LogFwkVerbatim("FwkSummary") << "TrigReport "
0915                                    << "---------- Module Summary ------------";
0916       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Visited"
0917                                    << " " << std::right << std::setw(10) << "Executed"
0918                                    << " " << std::right << std::setw(10) << "Passed"
0919                                    << " " << std::right << std::setw(10) << "Failed"
0920                                    << " " << std::right << std::setw(10) << "Error"
0921                                    << " "
0922                                    << "Name"
0923                                    << "";
0924       for (auto const& worker : tr.workerSummaries) {
0925         LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " "
0926                                      << std::right << std::setw(10) << worker.timesRun << " " << std::right
0927                                      << std::setw(10) << worker.timesPassed << " " << std::right << std::setw(10)
0928                                      << worker.timesFailed << " " << std::right << std::setw(10) << worker.timesExcept
0929                                      << " " << worker.moduleLabel << "";
0930       }
0931       LogFwkVerbatim("FwkSummary") << "";
0932     }
0933     // The timing report (CPU and Real Time):
0934     TriggerTimingReport tr;
0935     getTriggerTimingReport(tr);
0936 
0937     const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0938 
0939     LogFwkVerbatim("FwkSummary") << "TimeReport "
0940                                  << "---------- Event  Summary ---[sec]----";
0941     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0942                                  << "       event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0943     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0944                                  << "      event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0945     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0946                                  << "     sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
0947     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0948                                  << " efficiency CPU/Real/thread = "
0949                                  << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0950                                         preallocConfig_.numberOfThreads();
0951 
0952     constexpr int kColumn1Size = 10;
0953     constexpr int kColumn2Size = 12;
0954     constexpr int kColumn3Size = 12;
0955     LogFwkVerbatim("FwkSummary") << "";
0956     LogFwkVerbatim("FwkSummary") << "TimeReport "
0957                                  << "---------- Path   Summary ---[Real sec]----";
0958     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0959                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0960                                  << "  Name";
0961     for (auto const& p : tr.trigPathSummaries) {
0962       const int timesRun = std::max(1, p.timesRun);
0963       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0964                                    << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
0965                                    << std::setw(kColumn2Size) << p.realTime / timesRun << "  " << p.name << "";
0966     }
0967     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0968                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0969                                  << "  Name"
0970                                  << "";
0971 
0972     LogFwkVerbatim("FwkSummary") << "";
0973     LogFwkVerbatim("FwkSummary") << "TimeReport "
0974                                  << "-------End-Path   Summary ---[Real sec]----";
0975     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0976                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0977                                  << "  Name"
0978                                  << "";
0979     for (auto const& p : tr.endPathSummaries) {
0980       const int timesRun = std::max(1, p.timesRun);
0981 
0982       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
0983                                    << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
0984                                    << std::setw(kColumn2Size) << p.realTime / timesRun << "  " << p.name << "";
0985     }
0986     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0987                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0988                                  << "  Name"
0989                                  << "";
0990 
0991     for (auto const& p : tr.trigPathSummaries) {
0992       LogFwkVerbatim("FwkSummary") << "";
0993       LogFwkVerbatim("FwkSummary") << "TimeReport "
0994                                    << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
0995       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0996                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0997                                    << "  Name"
0998                                    << "";
0999       for (auto const& mod : p.moduleInPathSummaries) {
1000         LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1001                                      << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1002                                      << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << "  "
1003                                      << mod.moduleLabel << "";
1004       }
1005     }
1006     if (not tr.trigPathSummaries.empty()) {
1007       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1008                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1009                                    << "  Name"
1010                                    << "";
1011     }
1012     for (auto const& p : tr.endPathSummaries) {
1013       LogFwkVerbatim("FwkSummary") << "";
1014       LogFwkVerbatim("FwkSummary") << "TimeReport "
1015                                    << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
1016       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1017                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1018                                    << "  Name"
1019                                    << "";
1020       for (auto const& mod : p.moduleInPathSummaries) {
1021         LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1022                                      << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1023                                      << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << "  "
1024                                      << mod.moduleLabel << "";
1025       }
1026     }
1027     if (not tr.endPathSummaries.empty()) {
1028       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1029                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
1030                                    << "  Name"
1031                                    << "";
1032     }
1033     LogFwkVerbatim("FwkSummary") << "";
1034     LogFwkVerbatim("FwkSummary") << "TimeReport "
1035                                  << "---------- Module Summary ---[Real sec]----";
1036     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1037                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1038                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1039                                  << "  Name"
1040                                  << "";
1041     for (auto const& worker : tr.workerSummaries) {
1042       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1043                                    << std::setw(kColumn1Size) << worker.realTime / totalEvents << " " << std::right
1044                                    << std::setw(kColumn2Size) << worker.realTime / std::max(1, worker.timesRun) << " "
1045                                    << std::right << std::setw(kColumn3Size)
1046                                    << worker.realTime / std::max(1, worker.timesVisited) << "  " << worker.moduleLabel
1047                                    << "";
1048     }
1049     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1050                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1051                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1052                                  << "  Name"
1053                                  << "";
1054 
1055     LogFwkVerbatim("FwkSummary") << "";
1056     LogFwkVerbatim("FwkSummary") << "T---Report end!"
1057                                  << "";
1058     LogFwkVerbatim("FwkSummary") << "";
1059   }
1060 
1061   void Schedule::closeOutputFiles() {
1062     using std::placeholders::_1;
1063     for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::closeFile, _1));
1064     for (auto& worker : allWorkers()) {
1065       worker->respondToCloseOutputFile();
1066     }
1067   }
1068 
1069   void Schedule::openOutputFiles(FileBlock& fb) {
1070     using std::placeholders::_1;
1071     for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1072   }
1073 
1074   void Schedule::writeRunAsync(WaitingTaskHolder task,
1075                                RunPrincipal const& rp,
1076                                ProcessContext const* processContext,
1077                                ActivityRegistry* activityRegistry,
1078                                MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1079     auto token = ServiceRegistry::instance().presentToken();
1080     GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1081                                 LuminosityBlockID(rp.run(), 0),
1082                                 rp.index(),
1083                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1084                                 rp.endTime(),
1085                                 processContext);
1086 
1087     using namespace edm::waiting_task;
1088     chain::first([&](auto nextTask) {
1089       //services can depend on other services
1090       ServiceRegistry::Operate op(token);
1091 
1092       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1093       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1094       }
1095       for (auto& c : all_output_communicators_) {
1096         c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1097       }
1098     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1099       //services can depend on other services
1100       ServiceRegistry::Operate op(token);
1101 
1102       activityRegistry->postGlobalWriteRunSignal_(globalContext);
1103     })) |
1104         chain::runLast(task);
1105   }
1106 
1107   void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
1108                                         ProcessBlockPrincipal const& pbp,
1109                                         ProcessContext const* processContext,
1110                                         ActivityRegistry* activityRegistry) {
1111     auto token = ServiceRegistry::instance().presentToken();
1112     GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1113                                 LuminosityBlockID(),
1114                                 RunIndex::invalidRunIndex(),
1115                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1116                                 Timestamp::invalidTimestamp(),
1117                                 processContext);
1118 
1119     using namespace edm::waiting_task;
1120     chain::first([&](auto nextTask) {
1121       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1122       ServiceRegistry::Operate op(token);
1123       CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1124       }
1125       for (auto& c : all_output_communicators_) {
1126         c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1127       }
1128     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1129       //services can depend on other services
1130       ServiceRegistry::Operate op(token);
1131 
1132       activityRegistry->postWriteProcessBlockSignal_(globalContext);
1133     })) |
1134         chain::runLast(std::move(task));
1135   }
1136 
1137   void Schedule::writeLumiAsync(WaitingTaskHolder task,
1138                                 LuminosityBlockPrincipal const& lbp,
1139                                 ProcessContext const* processContext,
1140                                 ActivityRegistry* activityRegistry) {
1141     auto token = ServiceRegistry::instance().presentToken();
1142     GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1143                                 lbp.id(),
1144                                 lbp.runPrincipal().index(),
1145                                 lbp.index(),
1146                                 lbp.beginTime(),
1147                                 processContext);
1148 
1149     using namespace edm::waiting_task;
1150     chain::first([&](auto nextTask) {
1151       ServiceRegistry::Operate op(token);
1152       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1153       }
1154       for (auto& c : all_output_communicators_) {
1155         c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1156       }
1157     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1158       //services can depend on other services
1159       ServiceRegistry::Operate op(token);
1160 
1161       activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1162     })) |
1163         chain::runLast(task);
1164   }
1165 
1166   bool Schedule::shouldWeCloseOutput() const {
1167     using std::placeholders::_1;
1168     // Return true iff at least one output module returns true.
1169     return (std::find_if(all_output_communicators_.begin(),
1170                          all_output_communicators_.end(),
1171                          std::bind(&OutputModuleCommunicator::shouldWeCloseFile, _1)) !=
1172             all_output_communicators_.end());
1173   }
1174 
1175   void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1176     using std::placeholders::_1;
1177     for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1178   }
1179 
1180   void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1181     using std::placeholders::_1;
1182     for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1183   }
1184 
1185   void Schedule::beginJob(ProductRegistry const& iRegistry,
1186                           eventsetup::ESRecordsToProxyIndices const& iESIndices,
1187                           ProcessBlockHelperBase const& processBlockHelperBase) {
1188     globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase);
1189   }
1190 
1191   void Schedule::beginStream(unsigned int iStreamID) {
1192     assert(iStreamID < streamSchedules_.size());
1193     streamSchedules_[iStreamID]->beginStream();
1194   }
1195 
1196   void Schedule::endStream(unsigned int iStreamID) {
1197     assert(iStreamID < streamSchedules_.size());
1198     streamSchedules_[iStreamID]->endStream();
1199   }
1200 
1201   void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1202                                       unsigned int iStreamID,
1203                                       EventTransitionInfo& info,
1204                                       ServiceToken const& token) {
1205     assert(iStreamID < streamSchedules_.size());
1206     streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1207   }
1208 
1209   bool Schedule::changeModule(std::string const& iLabel,
1210                               ParameterSet const& iPSet,
1211                               const ProductRegistry& iRegistry,
1212                               eventsetup::ESRecordsToProxyIndices const& iIndices) {
1213     Worker* found = nullptr;
1214     for (auto const& worker : allWorkers()) {
1215       if (worker->description()->moduleLabel() == iLabel) {
1216         found = worker;
1217         break;
1218       }
1219     }
1220     if (nullptr == found) {
1221       return false;
1222     }
1223 
1224     auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1225 
1226     globalSchedule_->replaceModule(newMod, iLabel);
1227 
1228     for (auto& s : streamSchedules_) {
1229       s->replaceModule(newMod, iLabel);
1230     }
1231 
1232     {
1233       //Need to updateLookup in order to make getByToken work
1234       auto const processBlockLookup = iRegistry.productLookup(InProcess);
1235       auto const runLookup = iRegistry.productLookup(InRun);
1236       auto const lumiLookup = iRegistry.productLookup(InLumi);
1237       auto const eventLookup = iRegistry.productLookup(InEvent);
1238       found->updateLookup(InProcess, *runLookup);
1239       found->updateLookup(InRun, *runLookup);
1240       found->updateLookup(InLumi, *lumiLookup);
1241       found->updateLookup(InEvent, *eventLookup);
1242       found->updateLookup(iIndices);
1243 
1244       auto const& processName = newMod->moduleDescription().processName();
1245       auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1246       auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1247       auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1248       auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1249       found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1250       found->resolvePutIndicies(InRun, runModuleToIndicies);
1251       found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1252       found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1253     }
1254 
1255     return true;
1256   }
1257 
1258   void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1259     globalSchedule_->deleteModule(iLabel);
1260     for (auto& stream : streamSchedules_) {
1261       stream->deleteModule(iLabel);
1262     }
1263     moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1264   }
1265 
1266   void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1267                                        std::multimap<std::string, std::string> const& referencesToBranches,
1268                                        std::vector<std::string> const& modulesToSkip,
1269                                        edm::ProductRegistry const& preg) {
1270     for (auto& stream : streamSchedules_) {
1271       stream->initializeEarlyDelete(
1272           *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1273     }
1274   }
1275 
1276   std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1277     std::vector<ModuleDescription const*> result;
1278     result.reserve(allWorkers().size());
1279 
1280     for (auto const& worker : allWorkers()) {
1281       ModuleDescription const* p = worker->description();
1282       result.push_back(p);
1283     }
1284     return result;
1285   }
1286 
1287   Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1288 
1289   void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1290     for (auto const& worker : allWorkers()) {
1291       worker->convertCurrentProcessAlias(processName);
1292     }
1293   }
1294 
1295   void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1296     streamSchedules_[0]->availablePaths(oLabelsToFill);
1297   }
1298 
1299   void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1300 
1301   void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1302 
1303   void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1304     streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1305   }
1306 
1307   void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1308                                           std::vector<ModuleDescription const*>& descriptions,
1309                                           unsigned int hint) const {
1310     streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1311   }
1312 
1313   void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1314                                              std::vector<ModuleDescription const*>& descriptions,
1315                                              unsigned int hint) const {
1316     streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1317   }
1318 
1319   void Schedule::fillModuleAndConsumesInfo(
1320       std::vector<ModuleDescription const*>& allModuleDescriptions,
1321       std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1322       std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1323       std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1324       ProductRegistry const& preg) const {
1325     allModuleDescriptions.clear();
1326     moduleIDToIndex.clear();
1327     for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1328       modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1329     }
1330     modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1331 
1332     allModuleDescriptions.reserve(allWorkers().size());
1333     moduleIDToIndex.reserve(allWorkers().size());
1334     for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1335       modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1336     }
1337     modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1338 
1339     std::map<std::string, ModuleDescription const*> labelToDesc;
1340     unsigned int i = 0;
1341     for (auto const& worker : allWorkers()) {
1342       ModuleDescription const* p = worker->description();
1343       allModuleDescriptions.push_back(p);
1344       moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1345       labelToDesc[p->moduleLabel()] = p;
1346       ++i;
1347     }
1348     sort_all(moduleIDToIndex);
1349 
1350     i = 0;
1351     for (auto const& worker : allWorkers()) {
1352       std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1353       for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1354         modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1355       }
1356 
1357       std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1358           modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1359       try {
1360         worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1361       } catch (cms::Exception& ex) {
1362         ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1363                       worker->description()->moduleLabel());
1364         throw;
1365       }
1366       ++i;
1367     }
1368   }
1369 
1370   void Schedule::getTriggerReport(TriggerReport& rep) const {
1371     rep.eventSummary.totalEvents = 0;
1372     rep.eventSummary.totalEventsPassed = 0;
1373     rep.eventSummary.totalEventsFailed = 0;
1374     for (auto& s : streamSchedules_) {
1375       s->getTriggerReport(rep);
1376     }
1377     sort_all(rep.workerSummaries);
1378   }
1379 
1380   void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1381     rep.eventSummary.totalEvents = 0;
1382     rep.eventSummary.cpuTime = 0.;
1383     rep.eventSummary.realTime = 0.;
1384     summaryTimeKeeper_->fillTriggerTimingReport(rep);
1385   }
1386 
1387   int Schedule::totalEvents() const {
1388     int returnValue = 0;
1389     for (auto& s : streamSchedules_) {
1390       returnValue += s->totalEvents();
1391     }
1392     return returnValue;
1393   }
1394 
1395   int Schedule::totalEventsPassed() const {
1396     int returnValue = 0;
1397     for (auto& s : streamSchedules_) {
1398       returnValue += s->totalEventsPassed();
1399     }
1400     return returnValue;
1401   }
1402 
1403   int Schedule::totalEventsFailed() const {
1404     int returnValue = 0;
1405     for (auto& s : streamSchedules_) {
1406       returnValue += s->totalEventsFailed();
1407     }
1408     return returnValue;
1409   }
1410 
1411   void Schedule::clearCounters() {
1412     for (auto& s : streamSchedules_) {
1413       s->clearCounters();
1414     }
1415   }
1416 }  // namespace edm