Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-13 22:49:47

0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003 
0004 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0005 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0006 #include "FWCore/Framework/interface/EventPrincipal.h"
0007 #include "FWCore/Framework/interface/ExceptionActions.h"
0008 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0009 #include "FWCore/Framework/interface/Frameworkfwd.h"
0010 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0012 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0013 #include "FWCore/Framework/interface/RunPrincipal.h"
0014 #include "FWCore/Framework/interface/WorkerManager.h"
0015 #include "FWCore/Framework/interface/maker/Worker.h"
0016 #include "FWCore/Framework/interface/WorkerRegistry.h"
0017 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0018 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0019 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0021 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0023 #include "FWCore/Utilities/interface/Algorithms.h"
0024 #include "FWCore/Utilities/interface/BranchType.h"
0025 #include "FWCore/Utilities/interface/ConvertException.h"
0026 #include "FWCore/Utilities/interface/Exception.h"
0027 #include "FWCore/Utilities/interface/StreamID.h"
0028 #include "FWCore/Utilities/interface/propagate_const.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0031 
0032 #include <exception>
0033 #include <map>
0034 #include <memory>
0035 #include <set>
0036 #include <string>
0037 #include <vector>
0038 #include <sstream>
0039 #include "boost/range/adaptor/reversed.hpp"
0040 
0041 namespace edm {
0042 
0043   class ExceptionCollector;
0044   class PreallocationConfiguration;
0045   class ModuleRegistry;
0046   class TriggerResultInserter;
0047   class PathStatusInserter;
0048   class EndPathStatusInserter;
0049 
0050   class GlobalSchedule {
0051   public:
0052     typedef std::vector<std::string> vstring;
0053     typedef std::vector<Worker*> AllWorkers;
0054     typedef std::shared_ptr<Worker> WorkerPtr;
0055     typedef std::vector<Worker*> Workers;
0056 
0057     GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0058                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0059                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0060                    std::shared_ptr<ModuleRegistry> modReg,
0061                    std::vector<std::string> const& modulesToUse,
0062                    ParameterSet& proc_pset,
0063                    SignallingProductRegistryFiller& pregistry,
0064                    PreallocationConfiguration const& prealloc,
0065                    ExceptionToActionTable const& actions,
0066                    std::shared_ptr<ActivityRegistry> areg,
0067                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0068                    ProcessContext const* processContext);
0069     GlobalSchedule(GlobalSchedule const&) = delete;
0070 
0071     template <typename T>
0072     void processOneGlobalAsync(WaitingTaskHolder holder,
0073                                typename T::TransitionInfoType&,
0074                                ServiceToken const& token,
0075                                bool cleaningUpAfterException = false);
0076 
0077     void beginJob(ProductRegistry const&,
0078                   eventsetup::ESRecordsToProductResolverIndices const&,
0079                   ProcessBlockHelperBase const&,
0080                   ProcessContext const&);
0081     void endJob(ExceptionCollector& collector);
0082 
0083     /// Return a vector allowing const access to all the
0084     /// ModuleDescriptions for this GlobalSchedule.
0085 
0086     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0087     /// *** passed to the caller. Do not call delete on these
0088     /// *** pointers!
0089     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0090 
0091     /// Return whether each output module has reached its maximum count.
0092     bool terminate() const;
0093 
0094     /// clone the type of module with label iLabel but configure with iPSet.
0095     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0096 
0097     /// Delete the module with label iLabel
0098     void deleteModule(std::string const& iLabel);
0099 
0100     /// returns the collection of pointers to workers
0101     AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0102 
0103     void releaseMemoryPostLookupSignal();
0104 
0105   private:
0106     /// returns the action table
0107     ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0108 
0109     template <typename T>
0110     void preScheduleSignal(GlobalContext const*, ServiceToken const&);
0111 
0112     template <typename T>
0113     void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);
0114 
0115     void handleException(GlobalContext const*,
0116                          ServiceWeakToken const&,
0117                          bool cleaningUpAfterException,
0118                          std::exception_ptr&);
0119 
0120     std::vector<WorkerManager> workerManagers_;
0121     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0122     std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0123     ProcessContext const* processContext_;
0124 
0125     // The next 4 variables use the same naming convention, even though we have no intention
0126     // to ever have concurrent ProcessBlocks or Jobs. They are all related to the number of
0127     // WorkerManagers needed for global transitions.
0128     unsigned int numberOfConcurrentLumis_;
0129     unsigned int numberOfConcurrentRuns_;
0130     static constexpr unsigned int numberOfConcurrentProcessBlocks_ = 1;
0131     static constexpr unsigned int numberOfConcurrentJobs_ = 1;
0132   };
0133 
0134   template <typename T>
0135   void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0136                                              typename T::TransitionInfoType& transitionInfo,
0137                                              ServiceToken const& token,
0138                                              bool cleaningUpAfterException) {
0139     auto const& principal = transitionInfo.principal();
0140 
0141     // Caught exception is propagated via WaitingTaskHolder
0142     CMS_SA_ALLOW try {
0143       //need the doneTask to own the memory
0144       auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0145 
0146       ServiceWeakToken weakToken = token;
0147       auto doneTask = make_waiting_task(
0148           [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0149             std::exception_ptr excpt;
0150             if (iPtr) {
0151               excpt = *iPtr;
0152               // add context information to the exception and print message
0153               handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
0154             }
0155             postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
0156             iHolder.doneWaiting(excpt);
0157           });
0158 
0159       //make sure the task doesn't get run until all workers have beens started
0160       WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0161 
0162       CMS_SA_ALLOW try {
0163         preScheduleSignal<T>(globalContext.get(), token);
0164 
0165         unsigned int managerIndex = principal.index();
0166         if constexpr (T::branchType_ == InRun) {
0167           managerIndex += numberOfConcurrentLumis_;
0168         } else if constexpr (T::branchType_ == InProcess) {
0169           managerIndex += (numberOfConcurrentLumis_ + numberOfConcurrentRuns_);
0170         }
0171         WorkerManager& workerManager = workerManagers_[managerIndex];
0172         workerManager.resetAll();
0173 
0174         ParentContext parentContext(globalContext.get());
0175         // make sure the ProductResolvers know about their
0176         // workers to allow proper data dependency handling
0177         workerManager.setupResolvers(transitionInfo.principal());
0178 
0179         auto& aw = workerManager.allWorkers();
0180         for (Worker* worker : boost::adaptors::reverse(aw)) {
0181           worker->doWorkAsync<T>(
0182               holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0183         }
0184       } catch (...) {
0185         holdForLoop.doneWaiting(std::current_exception());
0186       }
0187     } catch (...) {
0188       iHolder.doneWaiting(std::current_exception());
0189     }
0190   }
0191 
0192   template <typename T>
0193   void GlobalSchedule::preScheduleSignal(GlobalContext const* globalContext, ServiceToken const& token) {
0194     if (actReg_) {
0195       try {
0196         ServiceRegistry::Operate op(token);
0197         convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
0198       } catch (cms::Exception& ex) {
0199         exceptionContext(ex, *globalContext, "Handling pre signal, likely in a service function");
0200         throw;
0201       }
0202     }
0203   }
0204 
0205   template <typename T>
0206   void GlobalSchedule::postScheduleSignal(GlobalContext const* globalContext,
0207                                           ServiceWeakToken const& weakToken,
0208                                           std::exception_ptr& excpt) {
0209     if (actReg_) {
0210       try {
0211         convertException::wrap([this, &weakToken, globalContext]() {
0212           ServiceRegistry::Operate op(weakToken.lock());
0213           T::postScheduleSignal(actReg_.get(), globalContext);
0214         });
0215       } catch (cms::Exception& ex) {
0216         if (not excpt) {
0217           exceptionContext(ex, *globalContext, "Handling post signal, likely in a service function");
0218           excpt = std::current_exception();
0219         }
0220       }
0221     }
0222   }
0223 
0224 }  // namespace edm
0225 
0226 #endif