Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-09-12 10:00:30

0001 #include "FWCore/Framework/interface/Schedule.h"
0002 #include "FWCore/Framework/src/ScheduleBuilder.h"
0003 
0004 #include "DataFormats/Common/interface/setIsMergeable.h"
0005 #include "DataFormats/Common/interface/TriggerResults.h"
0006 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0007 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0008 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0009 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0010 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0011 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0012 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0013 #include "FWCore/Framework/interface/ConsumesCollector.h"
0014 #include "FWCore/Framework/interface/EDConsumerBase.h"
0015 #include "FWCore/Framework/src/OutputModuleDescription.h"
0016 #include "FWCore/Framework/interface/TriggerNamesService.h"
0017 #include "FWCore/Framework/src/TriggerReport.h"
0018 #include "FWCore/Framework/src/TriggerTimingReport.h"
0019 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0020 #include "FWCore/Framework/src/ModuleHolderFactory.h"
0021 #include "FWCore/Framework/interface/OutputModuleCommunicator.h"
0022 #include "FWCore/Framework/interface/maker/ModuleHolder.h"
0023 #include "FWCore/Framework/interface/ModuleRegistry.h"
0024 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0025 #include "FWCore/Framework/src/TriggerResultInserter.h"
0026 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0027 #include "FWCore/Framework/src/PathStatusInserter.h"
0028 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0029 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0030 #include "FWCore/Framework/interface/ComponentDescription.h"
0031 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0032 #include "FWCore/Concurrency/interface/chain_first.h"
0033 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0034 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0035 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0036 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0037 #include "FWCore/ServiceRegistry/interface/ModuleConsumesInfo.h"
0038 #include "FWCore/Utilities/interface/Algorithms.h"
0039 #include "FWCore/Utilities/interface/ConvertException.h"
0040 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0041 #include "FWCore/Utilities/interface/TypeID.h"
0042 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0043 
0044 #include <array>
0045 #include <algorithm>
0046 #include <cassert>
0047 #include <cstdlib>
0048 #include <functional>
0049 #include <iomanip>
0050 #include <list>
0051 #include <map>
0052 #include <set>
0053 #include <exception>
0054 #include <sstream>
0055 
0056 #include "make_shared_noexcept_false.h"
0057 #include "processEDAliases.h"
0058 
0059 namespace edm {
0060 
0061   class ModuleMakerBase;
0062 
0063   namespace {
0064     using std::placeholders::_1;
0065 
0066     bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
0067       return std::binary_search(v.begin(), v.end(), s);
0068     }
0069 
0070     typedef std::vector<std::string> vstring;
0071 
0072     void processSwitchProducers(ParameterSet const& proc_pset,
0073                                 std::string const& processName,
0074                                 SignallingProductRegistryFiller& preg) {
0075       // Update Switch ProductDescriptions for the chosen case
0076       struct BranchesCases {
0077         BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
0078         std::vector<BranchKey> chosenBranches;
0079         std::vector<std::string> caseLabels;
0080       };
0081       std::map<std::string, BranchesCases> switchMap;
0082       for (auto& prod : preg.productListUpdator()) {
0083         if (prod.second.isSwitchAlias()) {
0084           auto it = switchMap.find(prod.second.moduleLabel());
0085           if (it == switchMap.end()) {
0086             auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
0087             auto inserted = switchMap.emplace(prod.second.moduleLabel(),
0088                                               switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
0089             assert(inserted.second);
0090             it = inserted.first;
0091           }
0092 
0093           bool found = false;
0094           for (auto const& productIter : preg.registry().productList()) {
0095             BranchKey const& branchKey = productIter.first;
0096             // The alias-for product must be in the same process as
0097             // the SwitchProducer (earlier processes
0098             // may contain products with same type, module label, and
0099             // instance name)
0100             if (branchKey.processName() != processName) {
0101               continue;
0102             }
0103 
0104             ProductDescription const& desc = productIter.second;
0105             if (desc.branchType() == prod.second.branchType() and
0106                 desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
0107                 branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
0108                 branchKey.productInstanceName() == prod.second.productInstanceName()) {
0109               prod.second.setSwitchAliasForBranch(desc);
0110               if (!prod.second.transient()) {
0111                 it->second.chosenBranches.push_back(prod.first);  // with moduleLabel of the Switch
0112               }
0113               found = true;
0114             }
0115           }
0116           if (not found) {
0117             Exception ex(errors::LogicError);
0118             ex << "Trying to find a ProductDescription to be aliased-for by SwitchProducer with\n"
0119                << "  friendly class name = " << prod.second.friendlyClassName() << "\n"
0120                << "  module label = " << prod.second.moduleLabel() << "\n"
0121                << "  product instance name = " << prod.second.productInstanceName() << "\n"
0122                << "  process name = " << processName
0123                << "\n\nbut did not find any. Please contact a framework developer.";
0124             ex.addContext("Calling Schedule.cc:processSwitchProducers()");
0125             throw ex;
0126           }
0127         }
0128       }
0129       if (switchMap.empty())
0130         return;
0131 
0132       for (auto& elem : switchMap) {
0133         std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
0134       }
0135 
0136       auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
0137         std::map<std::string, std::vector<BranchKey>> caseBranches;
0138         for (auto const& item : preg.registry().productList()) {
0139           if (item.first.processName() != processName)
0140             continue;
0141 
0142           if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
0143               found != caseLabels.end()) {
0144             caseBranches[*found].push_back(item.first);
0145           }
0146         }
0147 
0148         for (auto const& caseLabel : caseLabels) {
0149           ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
0150           auto& branches = caseBranches[caseLabel];
0151           std::sort(branches.begin(), branches.end());
0152           for (auto const& branch : branches) {
0153             ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
0154           }
0155         }
0156       };
0157 
0158       // Check that non-chosen cases declare exactly the same non-transient branches
0159       // Also set the alias-for branches to transient
0160       std::vector<bool> foundBranches;
0161       for (auto const& switchItem : switchMap) {
0162         auto const& switchLabel = switchItem.first;
0163         auto const& chosenBranches = switchItem.second.chosenBranches;
0164         auto const& caseLabels = switchItem.second.caseLabels;
0165         foundBranches.resize(chosenBranches.size());
0166         for (auto const& caseLabel : caseLabels) {
0167           std::fill(foundBranches.begin(), foundBranches.end(), false);
0168           for (auto& nonConstItem : preg.productListUpdator()) {
0169             auto const& item = nonConstItem;
0170             if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
0171               // Check that products which are not transient in the dictionary are consistent between
0172               // all the cases of a SwitchProducer.
0173               if (!item.second.transient()) {
0174                 auto range = std::equal_range(chosenBranches.begin(),
0175                                               chosenBranches.end(),
0176                                               BranchKey(item.first.friendlyClassName(),
0177                                                         switchLabel,
0178                                                         item.first.productInstanceName(),
0179                                                         item.first.processName()));
0180                 if (range.first == range.second) {
0181                   Exception ex(errors::Configuration);
0182                   ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
0183                      << item.first << " that is not produced by the chosen case "
0184                      << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0185                             .getUntrackedParameter<std::string>("@chosen_case")
0186                      << " and that product is not transient. "
0187                      << "If the intention is to produce only a subset of the non-transient products listed below, each "
0188                         "case with more non-transient products needs to be replaced with an EDAlias to only the "
0189                         "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0190                   addProductsToException(caseLabels, ex);
0191                   throw ex;
0192                 }
0193                 assert(std::distance(range.first, range.second) == 1);
0194                 foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
0195               }
0196 
0197               // Set the alias-for branch as transient so it gets fully ignored in output.
0198               // I tried first to implicitly drop all branches with
0199               // '@' in ProductSelector, but that gave problems on
0200               // input (those branches would be implicitly dropped on
0201               // input as well, leading to the SwitchProducer branches
0202               // do be dropped as dependent ones, as the alias
0203               // detection logic in RootFile says that the
0204               // SwitchProducer branches are not alias branches)
0205               nonConstItem.second.setTransient(true);
0206 
0207               // Check that there are no BranchAliases for any of the cases
0208               auto const& bd = item.second;
0209               if (not bd.branchAliases().empty()) {
0210                 auto ex = Exception(errors::UnimplementedFeature)
0211                           << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
0212                              "aliases for SwitchProducer with label "
0213                           << switchLabel << " for case " << caseLabel << ":";
0214                 for (auto const& branchAlias : bd.branchAliases()) {
0215                   ex << " " << branchAlias;
0216                 }
0217                 throw ex;
0218               }
0219             }
0220           }
0221 
0222           for (size_t i = 0; i < chosenBranches.size(); i++) {
0223             if (not foundBranches[i]) {
0224               auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
0225                                      .getUntrackedParameter<std::string>("@chosen_case");
0226               Exception ex(errors::Configuration);
0227               ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
0228                  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
0229                  << chosenLabel << " and that product is not transient. "
0230                  << "If the intention is to produce only a subset of the non-transient products listed below, each "
0231                     "case with more non-transient products needs to be replaced with an EDAlias to only the "
0232                     "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
0233               addProductsToException(caseLabels, ex);
0234               throw ex;
0235             }
0236           }
0237         }
0238       }
0239     }
0240 
0241     void reduceParameterSet(ParameterSet& proc_pset,
0242                             vstring const& end_path_name_list,
0243                             vstring& modulesInConfig,
0244                             std::set<std::string> const& usedModuleLabels,
0245                             std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
0246       // Before calculating the ParameterSetID of the top level ParameterSet or
0247       // saving it in the registry drop from the top level ParameterSet all
0248       // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
0249       // production is not enabled also drop all the EDFilters and EDProducers
0250       // that are not scheduled. Drop the ParameterSet used to configure the module
0251       // itself. Also drop the other traces of these labels in the top level
0252       // ParameterSet: Remove that labels from @all_modules and from all the
0253       // end paths. If this makes any end paths empty, then remove the end path
0254       // name from @end_paths, and @paths.
0255 
0256       // First make a list of labels to drop
0257       vstring outputModuleLabels;
0258       std::string edmType;
0259       std::string const moduleEdmType("@module_edm_type");
0260       std::string const outputModule("OutputModule");
0261       std::string const edAnalyzer("EDAnalyzer");
0262       std::string const edFilter("EDFilter");
0263       std::string const edProducer("EDProducer");
0264 
0265       std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
0266 
0267       //need a list of all modules on paths in order to determine
0268       // if an EDAnalyzer only appears on an end path
0269       vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
0270       std::set<std::string> modulesOnPaths;
0271       {
0272         std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
0273         for (auto const& endPath : end_path_name_list) {
0274           noEndPaths.erase(endPath);
0275         }
0276         {
0277           vstring labels;
0278           for (auto const& path : noEndPaths) {
0279             labels = proc_pset.getParameter<vstring>(path);
0280             modulesOnPaths.insert(labels.begin(), labels.end());
0281           }
0282         }
0283       }
0284       //Initially fill labelsToBeDropped with all module mentioned in
0285       // the configuration but which are not being used by the system
0286       std::vector<std::string> labelsToBeDropped;
0287       labelsToBeDropped.reserve(modulesInConfigSet.size());
0288       std::set_difference(modulesInConfigSet.begin(),
0289                           modulesInConfigSet.end(),
0290                           usedModuleLabels.begin(),
0291                           usedModuleLabels.end(),
0292                           std::back_inserter(labelsToBeDropped));
0293 
0294       const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
0295       for (auto const& modLabel : usedModuleLabels) {
0296         // Do nothing for modules that do not have a ParameterSet. Modules of type
0297         // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
0298         if (proc_pset.existsAs<ParameterSet>(modLabel)) {
0299           edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
0300           if (edmType == outputModule) {
0301             outputModuleLabels.push_back(modLabel);
0302             labelsToBeDropped.push_back(modLabel);
0303           }
0304           if (edmType == edAnalyzer) {
0305             if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
0306               labelsToBeDropped.push_back(modLabel);
0307             }
0308           }
0309         }
0310       }
0311       //labelsToBeDropped must be sorted
0312       std::inplace_merge(
0313           labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
0314 
0315       // drop the parameter sets used to configure the modules
0316       for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
0317 
0318       // drop the labels from @all_modules
0319       vstring::iterator endAfterRemove =
0320           std::remove_if(modulesInConfig.begin(),
0321                          modulesInConfig.end(),
0322                          std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
0323       modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
0324       proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
0325 
0326       // drop the labels from all end paths
0327       vstring endPathsToBeDropped;
0328       vstring labels;
0329       for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
0330            iEndPath != endEndPath;
0331            ++iEndPath) {
0332         labels = proc_pset.getParameter<vstring>(*iEndPath);
0333         vstring::iterator iSave = labels.begin();
0334         vstring::iterator iBegin = labels.begin();
0335 
0336         for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
0337           if (binary_search_string(labelsToBeDropped, *iLabel)) {
0338             if (binary_search_string(outputModuleLabels, *iLabel)) {
0339               outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
0340             }
0341           } else {
0342             if (iSave != iLabel) {
0343               iSave->swap(*iLabel);
0344             }
0345             ++iSave;
0346           }
0347         }
0348         labels.erase(iSave, labels.end());
0349         if (labels.empty()) {
0350           // remove empty end paths and save their names
0351           proc_pset.eraseSimpleParameter(*iEndPath);
0352           endPathsToBeDropped.push_back(*iEndPath);
0353         } else {
0354           proc_pset.addParameter<vstring>(*iEndPath, labels);
0355         }
0356       }
0357       sort_all(endPathsToBeDropped);
0358 
0359       // remove empty end paths from @paths
0360       endAfterRemove = std::remove_if(scheduledPaths.begin(),
0361                                       scheduledPaths.end(),
0362                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0363       scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
0364       proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
0365 
0366       // remove empty end paths from @end_paths
0367       vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
0368       endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
0369                                       scheduledEndPaths.end(),
0370                                       std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
0371       scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
0372       proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
0373     }
0374 
0375     template <typename F>
0376     auto doCleanup(F&& iF) {
0377       auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
0378         CMS_SA_ALLOW try { f(); } catch (...) {
0379         }
0380         if (iPtr) {
0381           iTask.doneWaiting(*iPtr);
0382         }
0383       };
0384       return wrapped;
0385     }
0386   }  // namespace
0387   // -----------------------------
0388 
0389   typedef std::vector<std::string> vstring;
0390 
0391   // -----------------------------
0392 
0393   Schedule::Schedule(ParameterSet& proc_pset,
0394                      service::TriggerNamesService const& tns,
0395                      SignallingProductRegistryFiller& preg,
0396                      ExceptionToActionTable const& actions,
0397                      std::shared_ptr<ActivityRegistry> areg,
0398                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0399                      PreallocationConfiguration const& prealloc,
0400                      ProcessContext const* processContext,
0401                      ModuleTypeResolverMaker const* resolverMaker)
0402       :  //Only create a resultsInserter if there is a trigger path
0403         moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
0404         all_output_communicators_(),
0405         preallocConfig_(prealloc),
0406         pathNames_(&tns.getTrigPaths()),
0407         endPathNames_(&tns.getEndPaths()),
0408         wantSummary_(tns.wantSummary()) {
0409     ScheduleBuilder builder(
0410         *moduleRegistry_, proc_pset, *pathNames_, *endPathNames_, prealloc, preg, *areg, processConfiguration);
0411     resultsInserter_ = std::move(builder.resultsInserter_);
0412     assert(builder.pathStatusInserters_.size() == builder.pathNameAndModules_.size());
0413     pathStatusInserters_ = std::move(builder.pathStatusInserters_);
0414     endPathStatusInserters_ = std::move(builder.endPathStatusInserters_);
0415 
0416     std::vector<StreamSchedule::PathInfo> paths;
0417     paths.reserve(builder.pathNameAndModules_.size());
0418     unsigned int index = 0;
0419     for (auto& path : builder.pathNameAndModules_) {
0420       paths.emplace_back(path.first, std::move(path.second), get_underlying_safe(pathStatusInserters_[index]));
0421       ++index;
0422     }
0423     std::vector<StreamSchedule::EndPathInfo> endpaths;
0424     endpaths.reserve(builder.endpathNameAndModules_.size());
0425     if (builder.endpathNameAndModules_.size() == 1) {
0426       assert(endPathStatusInserters_.empty());
0427       auto& path = builder.endpathNameAndModules_.front();
0428       endpaths.emplace_back(path.first, std::move(path.second), std::shared_ptr<EndPathStatusInserter>());
0429     } else {
0430       assert(endPathStatusInserters_.size() == builder.endpathNameAndModules_.size());
0431       index = 0;
0432       for (auto& path : builder.endpathNameAndModules_) {
0433         endpaths.emplace_back(path.first, std::move(path.second), get_underlying_safe(endPathStatusInserters_[index]));
0434         ++index;
0435       }
0436     }
0437 
0438     assert(0 < prealloc.numberOfStreams());
0439     streamSchedules_.reserve(prealloc.numberOfStreams());
0440     for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
0441       streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(paths,
0442                                                                                endpaths,
0443                                                                                builder.unscheduledModules_,
0444                                                                                resultsInserter(),
0445                                                                                moduleRegistrySharedPtr(),
0446                                                                                actions,
0447                                                                                areg,
0448                                                                                StreamID{i},
0449                                                                                processContext));
0450     }
0451 
0452     globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
0453                                                        pathStatusInserters_,
0454                                                        endPathStatusInserters_,
0455                                                        moduleRegistrySharedPtr(),
0456                                                        builder.allNeededModules_,
0457                                                        prealloc,
0458                                                        actions,
0459                                                        areg,
0460                                                        processContext);
0461   }
0462 
0463   void Schedule::finishSetup(ParameterSet& proc_pset,
0464                              service::TriggerNamesService const& tns,
0465                              SignallingProductRegistryFiller& preg,
0466                              BranchIDListHelper& branchIDListHelper,
0467                              ProcessBlockHelperBase& processBlockHelper,
0468                              ThinnedAssociationsHelper& thinnedAssociationsHelper,
0469                              std::shared_ptr<ActivityRegistry> areg,
0470                              std::shared_ptr<ProcessConfiguration> processConfiguration,
0471                              PreallocationConfiguration const& prealloc,
0472                              ProcessContext const* processContext) {
0473     //TriggerResults is not in the top level ParameterSet so the call to
0474     // reduceParameterSet would fail to find it. Just remove it up front.
0475     const std::string kTriggerResults("TriggerResults");
0476 
0477     std::set<std::string> usedModuleLabels;
0478     moduleRegistry_->forAllModuleHolders([&usedModuleLabels, &kTriggerResults](maker::ModuleHolder* iHolder) {
0479       if (iHolder->moduleDescription().moduleLabel() != kTriggerResults) {
0480         usedModuleLabels.insert(iHolder->moduleDescription().moduleLabel());
0481       }
0482     });
0483     std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
0484     std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
0485     reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
0486     {
0487       std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
0488       detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
0489     }
0490 
0491     // At this point all ProductDescriptions are created. Mark now the
0492     // ones of unscheduled workers to be on-demand.
0493     {
0494       auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
0495       if (not unsched.empty()) {
0496         std::set<std::string> unscheduledModules;
0497         std::transform(unsched.begin(),
0498                        unsched.end(),
0499                        std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
0500                        [](auto worker) { return worker->description()->moduleLabel(); });
0501         preg.setUnscheduledProducts(unscheduledModules);
0502       }
0503     }
0504 
0505     processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
0506     proc_pset.registerIt();
0507     processConfiguration->setParameterSetID(proc_pset.id());
0508     processConfiguration->setProcessConfigurationID();
0509 
0510     // This is used for a little sanity-check to make sure no code
0511     // modifications alter the number of workers at a later date.
0512     size_t all_workers_count = moduleRegistry_->maxModuleID();
0513 
0514     moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
0515       auto comm = iHolder->createOutputModuleCommunicator();
0516       if (comm) {
0517         all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
0518       }
0519     });
0520     // Now that the output communicators are filled in, set any output limits or information.
0521     limitOutput(proc_pset, branchIDListHelper.branchIDLists());
0522 
0523     // Sanity check: make sure nobody has added a module after we've
0524     // already relied on the ModuleRegistry being full.
0525     assert(all_workers_count == moduleRegistry_->maxModuleID());
0526 
0527     branchIDListHelper.updateFromRegistry(preg.registry());
0528 
0529     moduleRegistry_->forAllModuleHolders([&preg, &thinnedAssociationsHelper](auto& iHolder) {
0530       iHolder->registerThinnedAssociations(preg.registry(), thinnedAssociationsHelper);
0531     });
0532 
0533     processBlockHelper.updateForNewProcess(preg.registry(), processConfiguration->processName());
0534 
0535     // The output modules consume products in kept branches.
0536     // So we must set this up before freezing.
0537     for (auto& c : all_output_communicators_) {
0538       c->selectProducts(preg.registry(), thinnedAssociationsHelper, processBlockHelper);
0539     }
0540 
0541     for (auto& product : preg.productListUpdator()) {
0542       setIsMergeable(product.second);
0543     }
0544 
0545     {
0546       // We now get a collection of types that may be consumed.
0547       std::set<TypeID> productTypesConsumed;
0548       std::set<TypeID> elementTypesConsumed;
0549 
0550       moduleRegistry_->forAllModuleHolders(
0551           [&productTypesConsumed, &elementTypesConsumed](maker::ModuleHolder* iHolder) {
0552             for (auto const& consumesInfo : iHolder->moduleConsumesInfos()) {
0553               if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
0554                 productTypesConsumed.emplace(consumesInfo.type());
0555               } else {
0556                 elementTypesConsumed.emplace(consumesInfo.type());
0557               }
0558             }
0559           });
0560       // The RandomNumberGeneratorService is not a module, yet it consumes.
0561       {
0562         Service<RandomNumberGenerator> rng;
0563         if (rng.isAvailable()) {
0564           //if the service doesn't consume anything, the consumer will be nullptr
0565           auto consumer = rng->consumer();
0566           if (consumer) {
0567             for (auto const& consumesInfo : consumer->moduleConsumesInfos()) {
0568               productTypesConsumed.emplace(consumesInfo.type());
0569             }
0570           }
0571         }
0572       }
0573       preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
0574     }
0575 
0576     for (auto& c : all_output_communicators_) {
0577       c->setEventSelectionInfo(outputModulePathPositions, preg.registry().anyProductProduced());
0578     }
0579 
0580     if (wantSummary_) {
0581       std::vector<const ModuleDescription*> modDesc;
0582       const auto& workers = allWorkers();
0583       modDesc.reserve(workers.size());
0584 
0585       std::transform(workers.begin(),
0586                      workers.end(),
0587                      std::back_inserter(modDesc),
0588                      [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
0589 
0590       // propagate_const<T> has no reset() function
0591       summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
0592       auto timeKeeperPtr = summaryTimeKeeper_.get();
0593 
0594       areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
0595 
0596       areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
0597       areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0598       areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0599       areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
0600       areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
0601       areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
0602 
0603       areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
0604       areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
0605 
0606       areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
0607       areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
0608 
0609       areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
0610       areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
0611       //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
0612       //timeKeeperPtr->startModuleEvent(iContext,iMod);
0613       //});
0614     }
0615 
0616   }  // Schedule::Schedule
0617 
0618   void Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
0619     // Reads the 'output' entry of the untracked 'maxEvents' parameter set and configures every
0620     // output module communicator with the resulting limit. 'output' may appear either as one
0621     // int (handed to every output module) or as a parameter set holding one int per output
0622     // module label; supplying both forms is a configuration error.
0623     std::string const outputKey("output");
0624     ParameterSet const& maxEvents = proc_pset.getUntrackedParameterSet("maxEvents");
0625 
0626     // Form 1: a single integer limit (stays at -1 when the int form is absent).
0627     int sharedLimit = -1;
0628     std::vector<std::string> intNames = maxEvents.getParameterNamesForType<int>(false);
0629     bool const hasIntForm = search_all(intNames, outputKey);
0630     if (hasIntForm) {
0631       sharedLimit = maxEvents.getUntrackedParameter<int>(outputKey);
0632     }
0633 
0634     // Form 2: a nested parameter set looked up by output module label.
0635     ParameterSet const* perModuleLimits = nullptr;
0636     std::vector<std::string> psetNames;
0637     maxEvents.getParameterSetNames(psetNames, false);
0638     bool const hasPSetForm = search_all(psetNames, outputKey);
0639     if (hasPSetForm) {
0640       perModuleLimits = &maxEvents.getUntrackedParameterSet(outputKey);
0641     }
0642 
0643     if (hasIntForm && hasPSetForm) {
0644       throw Exception(errors::Configuration)
0645           << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
0646     }
0647 
0648     // A non-empty per-module set means every output module must have its own entry.
0649     bool const usePerModuleLimits = (perModuleLimits != nullptr) && !perModuleLimits->empty();
0650     for (auto& communicator : all_output_communicators_) {
0651       OutputModuleDescription desc(branchIDLists, sharedLimit);
0652       if (usePerModuleLimits) {
0653         std::string const& label = communicator->description().moduleLabel();
0654         try {
0655           desc.maxEvents_ = perModuleLimits->getUntrackedParameter<int>(label);
0656         } catch (Exception const&) {
0657           // Re-report a failed lookup as a configuration error naming the module.
0658           throw Exception(errors::Configuration)
0659               << "\nNo entry in 'maxEvents' for output module label '" << label << "'.\n";
0660         }
0661       }
0662       communicator->configure(desc);
0663     }
0664   }
0656 
0657   bool Schedule::terminate() const {
0658     // True only when there is at least one output module and every one of them reports
0659     // that its limit has been reached; in that case an informational message is logged.
0660     if (all_output_communicators_.empty()) {
0661       return false;
0662     }
0663     // all_of stops at the first output module that has not reached its output event count.
0664     bool const everyLimitReached =
0665         std::all_of(all_output_communicators_.begin(),
0666                     all_output_communicators_.end(),
0667                     [](auto const& communicator) { return communicator->limitReached(); });
0668     if (everyLimitReached) {
0669       LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
0670                                        << "has reached its configured limit.\n";
0671     }
0672     return everyLimitReached;
0673   }
0671 
0672   void Schedule::endJob(ExceptionCollector& collector) {
0673     // Runs endJob on the global schedule. The framework summary is only produced when that
0674     // step left the collector clean and a summary was requested; a cms::Exception raised
0675     // while writing the summary is added to the collector rather than propagated.
0676     globalSchedule_->endJob(collector, *moduleRegistry_);
0677 
0678     if (collector.hasThrown() or not wantSummary_) {
0679       return;
0680     }
0681 
0682     try {
0683       convertException::wrap([this]() { sendFwkSummaryToMessageLogger(); });
0684     } catch (cms::Exception const& summaryFailure) {
0685       collector.addException(summaryFailure);
0686     }
0687   }
0686 
0687   void Schedule::sendFwkSummaryToMessageLogger() const {
         // Writes the end-of-job framework summary to the "FwkSummary" message category.
         // Two reports are emitted in order: the trigger report (lines prefixed "TrigReport",
         // filled by getTriggerReport) and the timing report (lines prefixed "TimeReport",
         // filled by getTriggerTimingReport). Each report has an event summary, a path and
         // end-path summary, per-path module tables and an overall module summary.

0688     //Function to loop over items in a container and periodically
0689     // flush to the message logger.
         // iMessage(logger, element) streams one row into the current logger. After every
         // 10th row the logger object is replaced by a fresh one; otherwise a newline is
         // appended so that the next row starts on its own line within the same message.
0690     auto logForEach = [](auto const& iContainer, auto iMessage) {
0691       auto logger = LogFwkVerbatim("FwkSummary");
0692       int count = 0;
0693       for (auto const& element : iContainer) {
0694         iMessage(logger, element);
0695         if (++count == 10) {
0696           logger = LogFwkVerbatim("FwkSummary");
0697           count = 0;
0698         } else {
0699           logger << "\n";
0700         }
0701       }
0702     };
0703 
         // The trigger report lives in its own scope so that its 'tr' does not clash with
         // the timing report's 'tr' declared further down.
0704     {
0705       TriggerReport tr;
0706       getTriggerReport(tr);
0707 
0708       // The trigger report (pass/fail etc.):
0709 
0710       LogFwkVerbatim("FwkSummary") << "";
0711       LogFwkVerbatim("FwkSummary") << "TrigReport "
0712                                    << "---------- Event  Summary ------------";
           // Without any trigger path summaries every event is reported as passed.
0713       if (!tr.trigPathSummaries.empty()) {
0714         LogFwkVerbatim("FwkSummary") << "TrigReport"
0715                                      << " Events total = " << tr.eventSummary.totalEvents
0716                                      << " passed = " << tr.eventSummary.totalEventsPassed
0717                                      << " failed = " << tr.eventSummary.totalEventsFailed << "";
0718       } else {
0719         LogFwkVerbatim("FwkSummary") << "TrigReport"
0720                                      << " Events total = " << tr.eventSummary.totalEvents
0721                                      << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
0722       }
0723 
0724       LogFwkVerbatim("FwkSummary") << "";
0725       LogFwkVerbatim("FwkSummary") << "TrigReport "
0726                                    << "---------- Path   Summary ------------";
0727       LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0728                                    << " " << std::right << std::setw(10) << "Executed"
0729                                    << " " << std::right << std::setw(10) << "Passed"
0730                                    << " " << std::right << std::setw(10) << "Failed"
0731                                    << " " << std::right << std::setw(10) << "Error"
0732                                    << " "
0733                                    << "Name"
0734                                    << "";
           // The "Trig Bit#" column is two width-5 fields: a literal 1 for trigger paths
           // (0 for end paths below) followed by the bit position.
0735       logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
0736         logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
0737                << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0738                << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0739                << p.timesExcept << " " << p.name;
0740       });
0741 
0742       /*
0743       std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
0744       std::vector<int>::const_iterator epe = empty_trig_paths_.end();
0745       std::vector<std::string>::const_iterator  epn = empty_trig_path_names_.begin();
0746       for (; epi != epe; ++epi, ++epn) {
0747 
0748         LogFwkVerbatim("FwkSummary") << "TrigReport "
0749         << std::right << std::setw(5) << 1
0750         << std::right << std::setw(5) << *epi << " "
0751         << std::right << std::setw(10) << totalEvents() << " "
0752         << std::right << std::setw(10) << totalEvents() << " "
0753         << std::right << std::setw(10) << 0 << " "
0754         << std::right << std::setw(10) << 0 << " "
0755         << *epn << "";
0756       }
0757        */
0758 
0759       LogFwkVerbatim("FwkSummary") << "\n"
0760                                    << "TrigReport "
0761                                    << "-------End-Path   Summary ------------\n"
0762                                    << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0763                                    << " " << std::right << std::setw(10) << "Executed"
0764                                    << " " << std::right << std::setw(10) << "Passed"
0765                                    << " " << std::right << std::setw(10) << "Failed"
0766                                    << " " << std::right << std::setw(10) << "Error"
0767                                    << " "
0768                                    << "Name";
0769       logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
0770         logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
0771                << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
0772                << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
0773                << p.timesExcept << " " << p.name;
0774       });
0775 
           // One table per trigger path listing the modules in that path.
0776       for (auto const& p : tr.trigPathSummaries) {
0777         LogFwkVerbatim("FwkSummary") << "\n"
0778                                      << "TrigReport "
0779                                      << "---------- Modules in Path: " << p.name << " ------------\n"
0780                                      << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0781                                      << " " << std::right << std::setw(10) << "Visited"
0782                                      << " " << std::right << std::setw(10) << "Passed"
0783                                      << " " << std::right << std::setw(10) << "Failed"
0784                                      << " " << std::right << std::setw(10) << "Error"
0785                                      << " "
0786                                      << "Name";
0787 
0788         logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
0789           logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
0790                  << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0791                  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0792                  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0793         });
0794       }
0795 
           // One table per end path. Unlike the trigger-path tables above, the position
           // column here comes from a local running counter, not from mod.bitPosition.
0796       for (auto const& p : tr.endPathSummaries) {
0797         LogFwkVerbatim("FwkSummary") << "\n"
0798                                      << "TrigReport "
0799                                      << "------ Modules in End-Path: " << p.name << " ------------\n"
0800                                      << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
0801                                      << " " << std::right << std::setw(10) << "Visited"
0802                                      << " " << std::right << std::setw(10) << "Passed"
0803                                      << " " << std::right << std::setw(10) << "Failed"
0804                                      << " " << std::right << std::setw(10) << "Error"
0805                                      << " "
0806                                      << "Name";
0807 
0808         unsigned int bitpos = 0;
0809         logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
0810           logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
0811                  << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
0812                  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
0813                  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
0814           ++bitpos;
0815         });
0816       }
0817 
0818       LogFwkVerbatim("FwkSummary") << "\n"
0819                                    << "TrigReport "
0820                                    << "---------- Module Summary ------------\n"
0821                                    << "TrigReport " << std::right << std::setw(10) << "Visited"
0822                                    << " " << std::right << std::setw(10) << "Executed"
0823                                    << " " << std::right << std::setw(10) << "Passed"
0824                                    << " " << std::right << std::setw(10) << "Failed"
0825                                    << " " << std::right << std::setw(10) << "Error"
0826                                    << " "
0827                                    << "Name";
0828 
0829       logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
0830         logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
0831                << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
0832                << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
0833                << worker.timesExcept << " " << worker.moduleLabel;
0834       });
0835       LogFwkVerbatim("FwkSummary") << "";
0836     }
0837     // The timing report (CPU and Real Time):
0838     TriggerTimingReport tr;
0839     getTriggerTimingReport(tr);
0840 
         // Clamp to at least 1 so the per-event divisions below never divide by zero.
0841     const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
0842 
0843     LogFwkVerbatim("FwkSummary") << "TimeReport "
0844                                  << "---------- Event  Summary ---[sec]----";
0845     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0846                                  << "       event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
0847     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0848                                  << "      event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
0849     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0850                                  << "     sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
         // NOTE(review): realTime is used as a divisor without a guard here, unlike the
         // event and visit counts; confirm it cannot be zero (or that a non-finite value
         // in the printout is acceptable).
0851     LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
0852                                  << " efficiency CPU/Real/thread = "
0853                                  << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
0854                                         preallocConfig_.numberOfThreads();
0855 
         // Column widths shared by all timing tables below.
0856     constexpr int kColumn1Size = 10;
0857     constexpr int kColumn2Size = 12;
0858     constexpr int kColumn3Size = 12;
0859     LogFwkVerbatim("FwkSummary") << "";
0860     LogFwkVerbatim("FwkSummary") << "TimeReport "
0861                                  << "---------- Path   Summary ---[Real sec]----";
0862     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0863                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0864                                  << "  Name";
0865     logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
           // Same clamping as totalEvents: a path that never ran divides by 1.
0866       const int timesRun = std::max(1, p.timesRun);
0867       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0868              << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0869              << "  " << p.name;
0870     });
         // The column header is repeated after the rows of each timing table.
0871     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0872                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0873                                  << "  Name";
0874 
0875     LogFwkVerbatim("FwkSummary") << "\n"
0876                                  << "TimeReport "
0877                                  << "-------End-Path   Summary ---[Real sec]----\n"
0878                                  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0879                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0880                                  << "  Name";
0881     logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
0882       const int timesRun = std::max(1, p.timesRun);
0883 
0884       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0885              << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
0886              << "  " << p.name;
0887     });
0888     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0889                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0890                                  << "  Name";
0891 
         // Per-path module timing tables; the trailing header is printed once after all of
         // them, and only if there was at least one trigger path.
0892     for (auto const& p : tr.trigPathSummaries) {
0893       LogFwkVerbatim("FwkSummary") << "\n"
0894                                    << "TimeReport "
0895                                    << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
0896                                    << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0897                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0898                                    << "  Name";
0899       logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
0900         logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0901                << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
0902                << mod.realTime / std::max(1, mod.timesVisited) << "  " << mod.moduleLabel;
0903       });
0904     }
0905     if (not tr.trigPathSummaries.empty()) {
0906       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0907                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0908                                    << "  Name";
0909     }
         // Same layout for the modules of each end path.
0910     for (auto const& p : tr.endPathSummaries) {
0911       LogFwkVerbatim("FwkSummary") << "\n"
0912                                    << "TimeReport "
0913                                    << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
0914                                    << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0915                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0916                                    << "  Name";
0917       logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
0918         logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0919                << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
0920                << mod.realTime / std::max(1, mod.timesVisited) << "  " << mod.moduleLabel;
0921       });
0922     }
0923     if (not tr.endPathSummaries.empty()) {
0924       LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0925                                    << " " << std::right << std::setw(kColumn2Size) << "per visit"
0926                                    << "  Name";
0927     }
         // Overall per-module timing: real time per event, per execution and per visit.
0928     LogFwkVerbatim("FwkSummary") << "\n"
0929                                  << "TimeReport "
0930                                  << "---------- Module Summary ---[Real sec]----\n"
0931                                  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0932                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0933                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
0934                                  << "  Name";
0935     logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
0936       logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
0937              << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
0938              << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
0939              << worker.realTime / std::max(1, worker.timesVisited) << "  " << worker.moduleLabel;
0940     });
0941     LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
0942                                  << " " << std::right << std::setw(kColumn2Size) << "per exec"
0943                                  << " " << std::right << std::setw(kColumn3Size) << "per visit"
0944                                  << "  Name";
0945 
0946     LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
0947   }
0948 
0949   void Schedule::closeOutputFiles() {
0950     // First close the file of every output module communicator, then give every module
0951     // holder in the registry the chance to respond to the close.
0952     for (auto& communicator : all_output_communicators_) {
0953       communicator->closeFile();
0954     }
0955     auto notifyHolder = [](maker::ModuleHolder* holder) { holder->respondToCloseOutputFile(); };
0956     moduleRegistry_->forAllModuleHolders(notifyHolder);
0957   }
0954 
0955   void Schedule::openOutputFiles(FileBlock& fb) {
0956     // Every output module communicator is handed the file block as a const reference.
0957     FileBlock const& block = fb;
0958     for (auto& communicator : all_output_communicators_) {
0959       communicator->openFile(block);
0960     }
0961   }
0959 
0960   void Schedule::writeRunAsync(WaitingTaskHolder task,
0961                                RunPrincipal const& rp,
0962                                ProcessContext const* processContext,
0963                                ActivityRegistry* activityRegistry,
0964                                MergeableRunProductMetadata const* mergeableRunProductMetadata) {
0965     // Asks every output module communicator to write the run, bracketed by the
0966     // pre/postGlobalWriteRun signals of the activity registry. 'task' is handed to the
0967     // end of the chain (chain::runLast).
0968     auto token = ServiceRegistry::instance().presentToken();
0969     GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
0970                                 LuminosityBlockID(rp.run(), 0),
0971                                 rp.index(),
0972                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
0973                                 rp.endTime(),
0974                                 processContext);
0975 
0976     using namespace edm::waiting_task;
0977     // NOTE(review): this first stage captures the locals above by reference, so it relies
0978     // on being invoked before this function returns -- confirm against chain::runLast.
0979     chain::first([&](auto nextTask) {
0980       //services can depend on other services
0981       ServiceRegistry::Operate op(token);
0982 
0983       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
0984       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
0985       }
0986       for (auto& c : all_output_communicators_) {
0987         c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
0988       }
0989     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
0990       //services can depend on other services
0991       ServiceRegistry::Operate op(token);
0992 
0993       activityRegistry->postGlobalWriteRunSignal_(globalContext);
0994     })) |
0995         // 'task' is a by-value parameter that is not used again: move it instead of
0996         // copying it, as writeProcessBlockAsync already does.
0997         chain::runLast(std::move(task));
0998   }
0992 
0993   void Schedule::writeProcessBlockAsync(WaitingTaskHolder task,
0994                                         ProcessBlockPrincipal const& pbp,
0995                                         ProcessContext const* processContext,
0996                                         ActivityRegistry* activityRegistry) {
0997     // Asks every output module communicator to write the process block, bracketed by the
0998     // pre/postWriteProcessBlock signals. A process block has no run, lumi or timestamp,
0999     // so the context is built from the corresponding invalid/default values.
1000     auto serviceToken = ServiceRegistry::instance().presentToken();
1001     GlobalContext context(GlobalContext::Transition::kWriteProcessBlock,
1002                           LuminosityBlockID(),
1003                           RunIndex::invalidRunIndex(),
1004                           LuminosityBlockIndex::invalidLuminosityBlockIndex(),
1005                           Timestamp::invalidTimestamp(),
1006                           processContext);
1007 
1008     using namespace edm::waiting_task;
1009     chain::first([&](auto holder) {
1010       //services can depend on other services
1011       ServiceRegistry::Operate operate(serviceToken);
1012 
1013       // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1014       CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(context); } catch (...) {
1015       }
1016       for (auto& communicator : all_output_communicators_) {
1017         communicator->writeProcessBlockAsync(holder, pbp, processContext, activityRegistry);
1018       }
1019     }) | chain::then(doCleanup([activityRegistry, context, serviceToken]() {
1020       //services can depend on other services
1021       ServiceRegistry::Operate operate(serviceToken);
1022 
1023       activityRegistry->postWriteProcessBlockSignal_(context);
1024     })) |
1025         chain::runLast(std::move(task));
1026   }
1022 
1023   void Schedule::writeLumiAsync(WaitingTaskHolder task,
1024                                 LuminosityBlockPrincipal const& lbp,
1025                                 ProcessContext const* processContext,
1026                                 ActivityRegistry* activityRegistry) {
1027     // Asks every output module communicator to write the luminosity block, bracketed by
1028     // the pre/postGlobalWriteLumi signals of the activity registry. 'task' is handed to
1029     // the end of the chain (chain::runLast).
1030     auto token = ServiceRegistry::instance().presentToken();
1031     GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1032                                 lbp.id(),
1033                                 lbp.runPrincipal().index(),
1034                                 lbp.index(),
1035                                 lbp.beginTime(),
1036                                 processContext);
1037 
1038     using namespace edm::waiting_task;
1039     // NOTE(review): this first stage captures the locals above by reference, so it relies
1040     // on being invoked before this function returns -- confirm against chain::runLast.
1041     chain::first([&](auto nextTask) {
1042       //services can depend on other services
1043       ServiceRegistry::Operate op(token);
1044       // The pre-signal is best effort: anything it throws is deliberately swallowed.
1045       CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1046       }
1047       for (auto& c : all_output_communicators_) {
1048         c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1049       }
1050     }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1051       //services can depend on other services
1052       ServiceRegistry::Operate op(token);
1053 
1054       activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1055     })) |
1056         // 'task' is a by-value parameter that is not used again: move it instead of
1057         // copying it, as writeProcessBlockAsync already does.
1058         chain::runLast(std::move(task));
1059   }
1051 
1052   bool Schedule::shouldWeCloseOutput() const {
1053     // Return true iff at least one output module returns true; the scan stops at the
1054     // first output module that does.
1055     for (auto const& communicator : all_output_communicators_) {
1056       if (communicator->shouldWeCloseFile()) {
1057         return true;
1058       }
1059     }
1060     return false;
1061   }
1060 
1061   void Schedule::respondToOpenInputFile(FileBlock const& fb) {
1062     // Forward the newly opened input file block to every module holder in the registry.
1063     auto notifyHolder = [&fb](maker::ModuleHolder* holder) { holder->respondToOpenInputFile(fb); };
1064     moduleRegistry_->forAllModuleHolders(notifyHolder);
1065   }
1064 
1065   void Schedule::respondToCloseInputFile(FileBlock const& fb) {
1066     // Forward the closing input file block to every module holder in the registry.
1067     auto notifyHolder = [&fb](maker::ModuleHolder* holder) { holder->respondToCloseInputFile(fb); };
1068     moduleRegistry_->forAllModuleHolders(notifyHolder);
1069   }
1068 
1069   void Schedule::beginJob(ProductRegistry const& iRegistry,
1070                           eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
1071                           ProcessBlockHelperBase const& processBlockHelperBase,
1072                           std::string const& iProcessName) {
1073     // Module initialization is finished first; only then does the global schedule run
1074     // beginJob over the same module registry.
1075     auto& modules = *moduleRegistry_;
1076     finishModulesInitialization(modules, iRegistry, iESIndices, processBlockHelperBase, iProcessName);
1077     globalSchedule_->beginJob(modules);
1078   }
1076 
1077   void Schedule::beginStream(unsigned int streamID) {
1078     // streamID indexes streamSchedules_ and must be in range.
1079     assert(streamID < streamSchedules_.size());
1080     auto& streamSchedule = streamSchedules_[streamID];
1081     streamSchedule->beginStream(*moduleRegistry_);
1082   }
1081 
1082   void Schedule::endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1083     // streamID indexes streamSchedules_ and must be in range; the collector and its
1084     // mutex are passed straight through to the selected stream schedule.
1085     assert(streamID < streamSchedules_.size());
1086     auto& streamSchedule = streamSchedules_[streamID];
1087     streamSchedule->endStream(*moduleRegistry_, collector, collectorMutex);
1088   }
1086 
1087   void Schedule::processOneEventAsync(WaitingTaskHolder iTask,
1088                                       unsigned int iStreamID,
1089                                       EventTransitionInfo& info,
1090                                       ServiceToken const& token) {
1091     // Delegates the event to the stream schedule selected by iStreamID (which must be
1092     // in range), handing over ownership of the task holder.
1093     assert(iStreamID < streamSchedules_.size());
1094     auto& streamSchedule = streamSchedules_[iStreamID];
1095     streamSchedule->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1096   }
1094 
1095   bool Schedule::changeModule(std::string const& iLabel,
1096                               ParameterSet const& iPSet,
1097                               const SignallingProductRegistryFiller& iRegistry,
1098                               eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
1099     // Replaces the module labelled iLabel with one built from iPSet, installs it in the
1100     // global schedule and in every stream schedule, and re-resolves its product lookups
1101     // and put indices against iRegistry / iIndices.
1102     // Returns false, without changing anything, when no worker carries that label.
1103     Worker* found = nullptr;
1104     for (auto const& worker : allWorkers()) {
1105       if (worker->description()->moduleLabel() == iLabel) {
1106         found = worker;
1107         break;
1108       }
1109     }
1110     if (nullptr == found) {
1111       return false;
1112     }
1113 
1114     auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1115 
1116     globalSchedule_->replaceModule(newMod, iLabel);
1117 
1118     for (auto& s : streamSchedules_) {
1119       s->replaceModule(newMod, iLabel);
1120     }
1121 
1122     {
1123       //Need to updateLookup in order to make getByToken work
1124       auto const processBlockLookup = iRegistry.registry().productLookup(InProcess);
1125       auto const runLookup = iRegistry.registry().productLookup(InRun);
1126       auto const lumiLookup = iRegistry.registry().productLookup(InLumi);
1127       auto const eventLookup = iRegistry.registry().productLookup(InEvent);
1128       // Each branch type is updated with the lookup fetched for that same branch type.
1129       // (InProcess was previously given the run lookup, leaving processBlockLookup
1130       // unused for this step.)
1131       newMod->updateLookup(InProcess, *processBlockLookup);
1132       newMod->updateLookup(InRun, *runLookup);
1133       newMod->updateLookup(InLumi, *lumiLookup);
1134       newMod->updateLookup(InEvent, *eventLookup);
1135       newMod->updateLookup(iIndices);
1136 
1137       // Resolve the indices of the products the new module puts, per branch type, using
1138       // the entries registered for the module's own process name.
1139       auto const& processName = newMod->moduleDescription().processName();
1140       auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1141       auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1142       auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1143       auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1144       newMod->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1145       newMod->resolvePutIndicies(InRun, runModuleToIndicies);
1146       newMod->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1147       newMod->resolvePutIndicies(InEvent, eventModuleToIndicies);
1148     }
1149 
1150     return true;
1151   }
1143 
1144   void Schedule::deleteModule(std::string const& iLabel, ActivityRegistry* areg) {
1145     // The module is dropped from the global schedule and from each stream schedule
1146     // before it is removed from the registry, which receives the activity registry's
1147     // pre/post module-destruction signals.
1148     globalSchedule_->deleteModule(iLabel);
1149     for (auto& streamSchedule : streamSchedules_) {
1150       streamSchedule->deleteModule(iLabel);
1151     }
1152     auto& preDestruction = areg->preModuleDestructionSignal_;
1153     auto& postDestruction = areg->postModuleDestructionSignal_;
1154     moduleRegistry_->deleteModule(iLabel, preDestruction, postDestruction);
1155   }
1151 
1152   void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1153                                        std::multimap<std::string, std::string> const& referencesToBranches,
1154                                        std::vector<std::string> const& modulesToSkip,
1155                                        edm::ProductRegistry const& preg) {
1156     for (auto& stream : streamSchedules_) {
1157       stream->initializeEarlyDelete(*moduleRegistry_, branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1158     }
1159   }
1160 
1161   std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1162     std::vector<ModuleDescription const*> result;
1163     result.reserve(moduleRegistry_->maxModuleID() + 1);
1164 
1165     moduleRegistry_->forAllModuleHolders([&](auto const* iHolder) { result.push_back(&iHolder->moduleDescription()); });
1166     return result;
1167   }
1168   Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1169 
1170   void Schedule::convertCurrentProcessAlias(std::string const& processName) {
1171     moduleRegistry_->forAllModuleHolders([&](auto& iHolder) { iHolder->convertCurrentProcessAlias(processName); });
1172   }
1173 
1174   void Schedule::releaseMemoryPostLookupSignal() {
1175     moduleRegistry_->forAllModuleHolders([&](auto& iHolder) { iHolder->releaseMemoryPostLookupSignal(); });
1176   }
1177 
1178   void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1179     streamSchedules_[0]->availablePaths(oLabelsToFill);
1180   }
1181 
1182   void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1183 
1184   void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1185 
1186   void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1187     streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1188   }
1189 
1190   void Schedule::moduleDescriptionsInPath(std::string const& iPathLabel,
1191                                           std::vector<ModuleDescription const*>& descriptions,
1192                                           unsigned int hint) const {
1193     streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1194   }
1195 
1196   void Schedule::moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
1197                                              std::vector<ModuleDescription const*>& descriptions,
1198                                              unsigned int hint) const {
1199     streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1200   }
1201 
1202   void Schedule::getTriggerReport(TriggerReport& rep) const {
1203     rep.eventSummary.totalEvents = 0;
1204     rep.eventSummary.totalEventsPassed = 0;
1205     rep.eventSummary.totalEventsFailed = 0;
1206     for (auto& s : streamSchedules_) {
1207       s->getTriggerReport(rep);
1208     }
1209     sort_all(rep.workerSummaries);
1210   }
1211 
1212   void Schedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
1213     rep.eventSummary.totalEvents = 0;
1214     rep.eventSummary.cpuTime = 0.;
1215     rep.eventSummary.realTime = 0.;
1216     summaryTimeKeeper_->fillTriggerTimingReport(rep);
1217   }
1218 
1219   int Schedule::totalEvents() const {
1220     int returnValue = 0;
1221     for (auto& s : streamSchedules_) {
1222       returnValue += s->totalEvents();
1223     }
1224     return returnValue;
1225   }
1226 
1227   int Schedule::totalEventsPassed() const {
1228     int returnValue = 0;
1229     for (auto& s : streamSchedules_) {
1230       returnValue += s->totalEventsPassed();
1231     }
1232     return returnValue;
1233   }
1234 
1235   int Schedule::totalEventsFailed() const {
1236     int returnValue = 0;
1237     for (auto& s : streamSchedules_) {
1238       returnValue += s->totalEventsFailed();
1239     }
1240     return returnValue;
1241   }
1242 
1243   void Schedule::clearCounters() {
1244     for (auto& s : streamSchedules_) {
1245       s->clearCounters();
1246     }
1247   }
1248 }  // namespace edm