Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-28 22:48:23

0001 #include "FWCore/Framework/interface/GlobalSchedule.h"
0002 #include "FWCore/Framework/interface/maker/WorkerMaker.h"
0003 #include "FWCore/Framework/src/TriggerResultInserter.h"
0004 #include "FWCore/Framework/src/PathStatusInserter.h"
0005 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007 
0008 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0009 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0011 #include "FWCore/ParameterSet/interface/Registry.h"
0012 #include "FWCore/Utilities/interface/Algorithms.h"
0013 #include "FWCore/Utilities/interface/Exception.h"
0014 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0015 
0016 #include <algorithm>
0017 #include <cassert>
0018 #include <cstdlib>
0019 #include <functional>
0020 #include <map>
0021 #include <sstream>
0022 
0023 namespace edm {
0024   GlobalSchedule::GlobalSchedule(
0025       std::shared_ptr<TriggerResultInserter> inserter,
0026       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0027       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0028       std::shared_ptr<ModuleRegistry> modReg,
0029       std::vector<std::string> const& iModulesToUse,
0030       ParameterSet& proc_pset,
0031       ProductRegistry& pregistry,
0032       PreallocationConfiguration const& prealloc,
0033       ExceptionToActionTable const& actions,
0034       std::shared_ptr<ActivityRegistry> areg,
0035       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0036       ProcessContext const* processContext)
0037       : actReg_(areg),
0038         processContext_(processContext),
0039         numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()),
0040         numberOfConcurrentRuns_(prealloc.numberOfRuns()) {
0041     unsigned int nManagers = prealloc.numberOfLuminosityBlocks() + prealloc.numberOfRuns() +
0042                              numberOfConcurrentProcessBlocks_ + numberOfConcurrentJobs_;
0043     workerManagers_.reserve(nManagers);
0044     for (unsigned int i = 0; i < nManagers; ++i) {
0045       workerManagers_.emplace_back(modReg, areg, actions);
0046     }
0047     for (auto const& moduleLabel : iModulesToUse) {
0048       bool isTracked;
0049       ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0050       if (modpset != nullptr) {  // It will be null for PathStatusInserters, it should
0051                                  // be impossible to be null for anything else
0052         assert(isTracked);
0053 
0054         //side effect keeps this module around
0055         for (auto& wm : workerManagers_) {
0056           wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
0057         }
0058       }
0059     }
0060     if (inserter) {
0061       inserter->doPreallocate(prealloc);
0062       for (auto& wm : workerManagers_) {
0063         auto results_inserter = WorkerPtr(new edm::WorkerT<TriggerResultInserter::ModuleType>(
0064             inserter, inserter->moduleDescription(), &actions));  // propagate_const<T> has no reset() function
0065         results_inserter->setActivityRegistry(actReg_);
0066         wm.addToAllWorkers(results_inserter.get());
0067         extraWorkers_.emplace_back(std::move(results_inserter));
0068       }
0069     }
0070 
0071     for (auto& pathStatusInserter : pathStatusInserters) {
0072       std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
0073       inserterPtr->doPreallocate(prealloc);
0074 
0075       for (auto& wm : workerManagers_) {
0076         WorkerPtr workerPtr(
0077             new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
0078         workerPtr->setActivityRegistry(actReg_);
0079         wm.addToAllWorkers(workerPtr.get());
0080         extraWorkers_.emplace_back(std::move(workerPtr));
0081       }
0082     }
0083 
0084     for (auto& endPathStatusInserter : endPathStatusInserters) {
0085       std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
0086       inserterPtr->doPreallocate(prealloc);
0087       for (auto& wm : workerManagers_) {
0088         WorkerPtr workerPtr(new edm::WorkerT<EndPathStatusInserter::ModuleType>(
0089             inserterPtr, inserterPtr->moduleDescription(), &actions));
0090         workerPtr->setActivityRegistry(actReg_);
0091         wm.addToAllWorkers(workerPtr.get());
0092         extraWorkers_.emplace_back(std::move(workerPtr));
0093       }
0094     }
0095 
0096   }  // GlobalSchedule::GlobalSchedule
0097 
0098   void GlobalSchedule::beginJob(ProductRegistry const& iRegistry,
0099                                 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
0100                                 ProcessBlockHelperBase const& processBlockHelperBase,
0101                                 PathsAndConsumesOfModulesBase const& pathsAndConsumesOfModules,
0102                                 ProcessContext const& processContext) {
0103     GlobalContext globalContext(GlobalContext::Transition::kBeginJob, processContext_);
0104     unsigned int const managerIndex =
0105         numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0106 
0107     std::exception_ptr exceptionPtr;
0108     CMS_SA_ALLOW try {
0109       try {
0110         convertException::wrap([this, &pathsAndConsumesOfModules, &processContext]() {
0111           actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext);
0112         });
0113       } catch (cms::Exception& ex) {
0114         exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
0115         throw;
0116       }
0117       workerManagers_[managerIndex].beginJob(iRegistry, iESIndices, processBlockHelperBase, globalContext);
0118     } catch (...) {
0119       exceptionPtr = std::current_exception();
0120     }
0121 
0122     try {
0123       convertException::wrap([this]() { actReg_->postBeginJobSignal_(); });
0124     } catch (cms::Exception& ex) {
0125       if (!exceptionPtr) {
0126         exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
0127         exceptionPtr = std::current_exception();
0128       }
0129     }
0130     if (exceptionPtr) {
0131       std::rethrow_exception(exceptionPtr);
0132     }
0133   }
0134 
0135   void GlobalSchedule::endJob(ExceptionCollector& collector) {
0136     GlobalContext globalContext(GlobalContext::Transition::kEndJob, processContext_);
0137     unsigned int const managerIndex =
0138         numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0139 
0140     std::exception_ptr exceptionPtr;
0141     CMS_SA_ALLOW try {
0142       try {
0143         convertException::wrap([this]() { actReg_->preEndJobSignal_(); });
0144       } catch (cms::Exception& ex) {
0145         exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
0146         throw;
0147       }
0148       workerManagers_[managerIndex].endJob(collector, globalContext);
0149     } catch (...) {
0150       exceptionPtr = std::current_exception();
0151     }
0152 
0153     try {
0154       convertException::wrap([this]() { actReg_->postEndJobSignal_(); });
0155     } catch (cms::Exception& ex) {
0156       if (!exceptionPtr) {
0157         exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
0158         exceptionPtr = std::current_exception();
0159       }
0160     }
0161     if (exceptionPtr) {
0162       collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
0163     }
0164   }
0165 
0166   void GlobalSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0167     Worker* found = nullptr;
0168     unsigned int const jobManagerIndex =
0169         numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0170     unsigned int managerIndex = 0;
0171     for (auto& wm : workerManagers_) {
0172       for (auto const& worker : wm.allWorkers()) {
0173         if (worker->description()->moduleLabel() == iLabel) {
0174           found = worker;
0175           break;
0176         }
0177       }
0178       if (nullptr == found) {
0179         return;
0180       }
0181 
0182       iMod->replaceModuleFor(found);
0183       if (managerIndex == jobManagerIndex) {
0184         GlobalContext globalContext(GlobalContext::Transition::kBeginJob, processContext_);
0185         found->beginJob(globalContext);
0186       }
0187       ++managerIndex;
0188     }
0189   }
0190 
0191   void GlobalSchedule::deleteModule(std::string const& iLabel) {
0192     for (auto& wm : workerManagers_) {
0193       wm.deleteModuleIfExists(iLabel);
0194     }
0195   }
0196 
0197   std::vector<ModuleDescription const*> GlobalSchedule::getAllModuleDescriptions() const {
0198     std::vector<ModuleDescription const*> result;
0199     result.reserve(allWorkers().size());
0200 
0201     for (auto const& worker : allWorkers()) {
0202       ModuleDescription const* p = worker->description();
0203       result.push_back(p);
0204     }
0205     return result;
0206   }
0207 
0208   void GlobalSchedule::handleException(GlobalContext const* globalContext,
0209                                        ServiceWeakToken const& weakToken,
0210                                        bool cleaningUpAfterException,
0211                                        std::exception_ptr& excpt) {
0212     //add context information to the exception and print message
0213     try {
0214       convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
0215     } catch (cms::Exception& ex) {
0216       std::ostringstream ost;
0217       // In most cases the exception will already have context at this point,
0218       // but add some context here in those rare cases where it does not.
0219       if (ex.context().empty()) {
0220         exceptionContext(ost, *globalContext);
0221       }
0222       ServiceRegistry::Operate op(weakToken.lock());
0223       addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0224       excpt = std::current_exception();
0225     }
0226     // We are already handling an earlier exception, so ignore it
0227     // if this signal results in another exception being thrown.
0228     CMS_SA_ALLOW try {
0229       if (actReg_) {
0230         ServiceRegistry::Operate op(weakToken.lock());
0231         actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
0232       }
0233     } catch (...) {
0234     }
0235   }
0236 
0237 }  // namespace edm