Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-04 04:34:52

0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003 
0004 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0005 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0006 #include "FWCore/Framework/interface/EventPrincipal.h"
0007 #include "FWCore/Framework/interface/ExceptionActions.h"
0008 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0009 #include "FWCore/Framework/interface/Frameworkfwd.h"
0010 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0012 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0013 #include "FWCore/Framework/interface/RunPrincipal.h"
0014 #include "FWCore/Framework/interface/WorkerManager.h"
0015 #include "FWCore/Framework/interface/maker/Worker.h"
0016 #include "FWCore/Framework/interface/WorkerRegistry.h"
0017 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0018 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0019 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0021 #include "FWCore/Utilities/interface/Algorithms.h"
0022 #include "FWCore/Utilities/interface/BranchType.h"
0023 #include "FWCore/Utilities/interface/ConvertException.h"
0024 #include "FWCore/Utilities/interface/Exception.h"
0025 #include "FWCore/Utilities/interface/StreamID.h"
0026 #include "FWCore/Utilities/interface/propagate_const.h"
0027 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0028 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0029 
0030 #include <exception>
0031 #include <map>
0032 #include <memory>
0033 #include <set>
0034 #include <string>
0035 #include <vector>
0036 #include <sstream>
0037 #include "boost/range/adaptor/reversed.hpp"
0038 
0039 namespace edm {
0040 
0041   class ActivityRegistry;
0042   class ExceptionCollector;
0043   class ProcessContext;
0044   class PreallocationConfiguration;
0045   class ModuleRegistry;
0046   class TriggerResultInserter;
0047   class PathStatusInserter;
0048   class EndPathStatusInserter;
0049 
0050   class GlobalSchedule {
0051   public:
0052     typedef std::vector<std::string> vstring;
0053     typedef std::vector<Worker*> AllWorkers;
0054     typedef std::shared_ptr<Worker> WorkerPtr;
0055     typedef std::vector<Worker*> Workers;
0056 
0057     GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0058                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0059                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0060                    std::shared_ptr<ModuleRegistry> modReg,
0061                    std::vector<std::string> const& modulesToUse,
0062                    ParameterSet& proc_pset,
0063                    ProductRegistry& pregistry,
0064                    PreallocationConfiguration const& prealloc,
0065                    ExceptionToActionTable const& actions,
0066                    std::shared_ptr<ActivityRegistry> areg,
0067                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0068                    ProcessContext const* processContext);
0069     GlobalSchedule(GlobalSchedule const&) = delete;
0070 
0071     template <typename T>
0072     void processOneGlobalAsync(WaitingTaskHolder holder,
0073                                typename T::TransitionInfoType&,
0074                                ServiceToken const& token,
0075                                bool cleaningUpAfterException = false);
0076 
0077     void beginJob(ProductRegistry const&,
0078                   eventsetup::ESRecordsToProductResolverIndices const&,
0079                   ProcessBlockHelperBase const&);
0080     void endJob(ExceptionCollector& collector);
0081 
0082     /// Return a vector allowing const access to all the
0083     /// ModuleDescriptions for this GlobalSchedule.
0084 
0085     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0086     /// *** passed to the caller. Do not call delete on these
0087     /// *** pointers!
0088     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0089 
0090     /// Return whether each output module has reached its maximum count.
0091     bool terminate() const;
0092 
0093     /// clone the type of module with label iLabel but configure with iPSet.
0094     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0095 
0096     /// Delete the module with label iLabel
0097     void deleteModule(std::string const& iLabel);
0098 
0099     /// returns the collection of pointers to workers
0100     AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0101 
0102   private:
0103     /// returns the action table
0104     ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0105 
0106     template <typename T>
0107     void preScheduleSignal(GlobalContext const*, ServiceToken const&);
0108 
0109     template <typename T>
0110     void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);
0111 
0112     void handleException(GlobalContext const*,
0113                          ServiceWeakToken const&,
0114                          bool cleaningUpAfterException,
0115                          std::exception_ptr&);
0116 
0117     std::vector<WorkerManager> workerManagers_;
0118     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0119     std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0120     ProcessContext const* processContext_;
0121     unsigned int numberOfConcurrentLumis_;
0122   };
0123 
0124   template <typename T>
0125   void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0126                                              typename T::TransitionInfoType& transitionInfo,
0127                                              ServiceToken const& token,
0128                                              bool cleaningUpAfterException) {
0129     auto const& principal = transitionInfo.principal();
0130 
0131     // Caught exception is propagated via WaitingTaskHolder
0132     CMS_SA_ALLOW try {
0133       //need the doneTask to own the memory
0134       auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0135 
0136       ServiceWeakToken weakToken = token;
0137       auto doneTask = make_waiting_task(
0138           [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0139             std::exception_ptr excpt;
0140             if (iPtr) {
0141               excpt = *iPtr;
0142               // add context information to the exception and print message
0143               handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
0144             }
0145             postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
0146             iHolder.doneWaiting(excpt);
0147           });
0148 
0149       //make sure the task doesn't get run until all workers have beens started
0150       WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0151 
0152       CMS_SA_ALLOW try {
0153         preScheduleSignal<T>(globalContext.get(), token);
0154 
0155         unsigned int managerIndex = principal.index();
0156         if constexpr (T::branchType_ == InRun) {
0157           managerIndex += numberOfConcurrentLumis_;
0158         }
0159         WorkerManager& workerManager = workerManagers_[managerIndex];
0160         workerManager.resetAll();
0161 
0162         ParentContext parentContext(globalContext.get());
0163         // make sure the ProductResolvers know about their
0164         // workers to allow proper data dependency handling
0165         workerManager.setupResolvers(transitionInfo.principal());
0166 
0167         auto& aw = workerManager.allWorkers();
0168         for (Worker* worker : boost::adaptors::reverse(aw)) {
0169           worker->doWorkAsync<T>(
0170               holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0171         }
0172       } catch (...) {
0173         holdForLoop.doneWaiting(std::current_exception());
0174       }
0175     } catch (...) {
0176       iHolder.doneWaiting(std::current_exception());
0177     }
0178   }
0179 
0180   template <typename T>
0181   void GlobalSchedule::preScheduleSignal(GlobalContext const* globalContext, ServiceToken const& token) {
0182     if (actReg_) {
0183       try {
0184         ServiceRegistry::Operate op(token);
0185         convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
0186       } catch (cms::Exception& ex) {
0187         std::ostringstream ost;
0188         ex.addContext("Handling pre signal, likely in a service function");
0189         exceptionContext(ost, *globalContext);
0190         ex.addContext(ost.str());
0191         throw;
0192       }
0193     }
0194   }
0195 
0196   template <typename T>
0197   void GlobalSchedule::postScheduleSignal(GlobalContext const* globalContext,
0198                                           ServiceWeakToken const& weakToken,
0199                                           std::exception_ptr& excpt) {
0200     if (actReg_) {
0201       try {
0202         convertException::wrap([this, &weakToken, globalContext]() {
0203           ServiceRegistry::Operate op(weakToken.lock());
0204           T::postScheduleSignal(actReg_.get(), globalContext);
0205         });
0206       } catch (cms::Exception& ex) {
0207         if (not excpt) {
0208           std::ostringstream ost;
0209           ex.addContext("Handling post signal, likely in a service function");
0210           exceptionContext(ost, *globalContext);
0211           ex.addContext(ost.str());
0212           excpt = std::current_exception();
0213         }
0214       }
0215     }
0216   }
0217 
0218 }  // namespace edm
0219 
0220 #endif