Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-13 22:49:49

0001 #include "FWCore/Framework/interface/GlobalSchedule.h"
0002 #include "FWCore/Framework/interface/maker/WorkerMaker.h"
0003 #include "FWCore/Framework/src/TriggerResultInserter.h"
0004 #include "FWCore/Framework/src/PathStatusInserter.h"
0005 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007 
0008 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0009 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0011 #include "FWCore/ParameterSet/interface/Registry.h"
0012 #include "FWCore/Utilities/interface/Algorithms.h"
0013 #include "FWCore/Utilities/interface/Exception.h"
0014 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0015 
0016 #include <algorithm>
0017 #include <cassert>
0018 #include <cstdlib>
0019 #include <functional>
0020 #include <map>
0021 #include <sstream>
0022 
0023 namespace edm {
0024   GlobalSchedule::GlobalSchedule(
0025       std::shared_ptr<TriggerResultInserter> inserter,
0026       std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0027       std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0028       std::shared_ptr<ModuleRegistry> modReg,
0029       std::vector<std::string> const& iModulesToUse,
0030       ParameterSet& proc_pset,
0031       SignallingProductRegistryFiller& pregistry,
0032       PreallocationConfiguration const& prealloc,
0033       ExceptionToActionTable const& actions,
0034       std::shared_ptr<ActivityRegistry> areg,
0035       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0036       ProcessContext const* processContext)
0037       : actReg_(areg),
0038         processContext_(processContext),
0039         numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()),
0040         numberOfConcurrentRuns_(prealloc.numberOfRuns()) {
0041     unsigned int nManagers = prealloc.numberOfLuminosityBlocks() + prealloc.numberOfRuns() +
0042                              numberOfConcurrentProcessBlocks_ + numberOfConcurrentJobs_;
0043     workerManagers_.reserve(nManagers);
0044     for (unsigned int i = 0; i < nManagers; ++i) {
0045       workerManagers_.emplace_back(modReg, areg, actions);
0046     }
0047     for (auto const& moduleLabel : iModulesToUse) {
0048       bool isTracked;
0049       ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0050       if (modpset != nullptr) {  // It will be null for PathStatusInserters, it should
0051                                  // be impossible to be null for anything else
0052         assert(isTracked);
0053 
0054         //side effect keeps this module around
0055         for (auto& wm : workerManagers_) {
0056           wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
0057         }
0058       }
0059     }
0060     if (inserter) {
0061       inserter->doPreallocate(prealloc);
0062       for (auto& wm : workerManagers_) {
0063         auto results_inserter = WorkerPtr(new edm::WorkerT<TriggerResultInserter::ModuleType>(
0064             inserter, inserter->moduleDescription(), &actions));  // propagate_const<T> has no reset() function
0065         results_inserter->setActivityRegistry(actReg_);
0066         wm.addToAllWorkers(results_inserter.get());
0067         extraWorkers_.emplace_back(std::move(results_inserter));
0068       }
0069     }
0070 
0071     for (auto& pathStatusInserter : pathStatusInserters) {
0072       std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
0073       inserterPtr->doPreallocate(prealloc);
0074 
0075       for (auto& wm : workerManagers_) {
0076         WorkerPtr workerPtr(
0077             new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
0078         workerPtr->setActivityRegistry(actReg_);
0079         wm.addToAllWorkers(workerPtr.get());
0080         extraWorkers_.emplace_back(std::move(workerPtr));
0081       }
0082     }
0083 
0084     for (auto& endPathStatusInserter : endPathStatusInserters) {
0085       std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
0086       inserterPtr->doPreallocate(prealloc);
0087       for (auto& wm : workerManagers_) {
0088         WorkerPtr workerPtr(new edm::WorkerT<EndPathStatusInserter::ModuleType>(
0089             inserterPtr, inserterPtr->moduleDescription(), &actions));
0090         workerPtr->setActivityRegistry(actReg_);
0091         wm.addToAllWorkers(workerPtr.get());
0092         extraWorkers_.emplace_back(std::move(workerPtr));
0093       }
0094     }
0095 
0096   }  // GlobalSchedule::GlobalSchedule
0097 
0098   void GlobalSchedule::beginJob(ProductRegistry const& iRegistry,
0099                                 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
0100                                 ProcessBlockHelperBase const& processBlockHelperBase,
0101                                 ProcessContext const& processContext) {
0102     GlobalContext globalContext(GlobalContext::Transition::kBeginJob, processContext_);
0103     unsigned int const managerIndex =
0104         numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0105 
0106     std::exception_ptr exceptionPtr;
0107     CMS_SA_ALLOW try {
0108       try {
0109         convertException::wrap([this, &processContext]() { actReg_->preBeginJobSignal_(processContext); });
0110       } catch (cms::Exception& ex) {
0111         exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
0112         throw;
0113       }
0114       workerManagers_[managerIndex].beginJob(iRegistry, iESIndices, processBlockHelperBase, globalContext);
0115     } catch (...) {
0116       exceptionPtr = std::current_exception();
0117     }
0118 
0119     try {
0120       convertException::wrap([this]() { actReg_->postBeginJobSignal_(); });
0121     } catch (cms::Exception& ex) {
0122       if (!exceptionPtr) {
0123         exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
0124         exceptionPtr = std::current_exception();
0125       }
0126     }
0127     if (exceptionPtr) {
0128       std::rethrow_exception(exceptionPtr);
0129     }
0130   }
0131 
0132   void GlobalSchedule::endJob(ExceptionCollector& collector) {
0133     GlobalContext globalContext(GlobalContext::Transition::kEndJob, processContext_);
0134     unsigned int const managerIndex =
0135         numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0136 
0137     std::exception_ptr exceptionPtr;
0138     CMS_SA_ALLOW try {
0139       try {
0140         convertException::wrap([this]() { actReg_->preEndJobSignal_(); });
0141       } catch (cms::Exception& ex) {
0142         exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
0143         throw;
0144       }
0145       workerManagers_[managerIndex].endJob(collector, globalContext);
0146     } catch (...) {
0147       exceptionPtr = std::current_exception();
0148     }
0149 
0150     try {
0151       convertException::wrap([this]() { actReg_->postEndJobSignal_(); });
0152     } catch (cms::Exception& ex) {
0153       if (!exceptionPtr) {
0154         exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
0155         exceptionPtr = std::current_exception();
0156       }
0157     }
0158     if (exceptionPtr) {
0159       collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
0160     }
0161   }
0162 
0163   void GlobalSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0164     Worker* found = nullptr;
0165     unsigned int const jobManagerIndex =
0166         numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0167     unsigned int managerIndex = 0;
0168     for (auto& wm : workerManagers_) {
0169       for (auto const& worker : wm.allWorkers()) {
0170         if (worker->description()->moduleLabel() == iLabel) {
0171           found = worker;
0172           break;
0173         }
0174       }
0175       if (nullptr == found) {
0176         return;
0177       }
0178 
0179       iMod->replaceModuleFor(found);
0180       if (managerIndex == jobManagerIndex) {
0181         GlobalContext globalContext(GlobalContext::Transition::kBeginJob, processContext_);
0182         found->beginJob(globalContext);
0183       }
0184       ++managerIndex;
0185     }
0186   }
0187 
0188   void GlobalSchedule::deleteModule(std::string const& iLabel) {
0189     for (auto& wm : workerManagers_) {
0190       wm.deleteModuleIfExists(iLabel);
0191     }
0192   }
0193 
0194   void GlobalSchedule::releaseMemoryPostLookupSignal() {
0195     unsigned int const managerIndex =
0196         numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0197     workerManagers_[managerIndex].releaseMemoryPostLookupSignal();
0198   }
0199 
0200   std::vector<ModuleDescription const*> GlobalSchedule::getAllModuleDescriptions() const {
0201     std::vector<ModuleDescription const*> result;
0202     result.reserve(allWorkers().size());
0203 
0204     for (auto const& worker : allWorkers()) {
0205       ModuleDescription const* p = worker->description();
0206       result.push_back(p);
0207     }
0208     return result;
0209   }
0210 
0211   void GlobalSchedule::handleException(GlobalContext const* globalContext,
0212                                        ServiceWeakToken const& weakToken,
0213                                        bool cleaningUpAfterException,
0214                                        std::exception_ptr& excpt) {
0215     //add context information to the exception and print message
0216     try {
0217       convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
0218     } catch (cms::Exception& ex) {
0219       std::ostringstream ost;
0220       // In most cases the exception will already have context at this point,
0221       // but add some context here in those rare cases where it does not.
0222       if (ex.context().empty()) {
0223         exceptionContext(ost, *globalContext);
0224       }
0225       ServiceRegistry::Operate op(weakToken.lock());
0226       addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0227       excpt = std::current_exception();
0228     }
0229     // We are already handling an earlier exception, so ignore it
0230     // if this signal results in another exception being thrown.
0231     CMS_SA_ALLOW try {
0232       if (actReg_) {
0233         ServiceRegistry::Operate op(weakToken.lock());
0234         actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
0235       }
0236     } catch (...) {
0237     }
0238   }
0239 
0240 }  // namespace edm