Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-28 22:48:22

0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003 
0004 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0005 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0006 #include "FWCore/Framework/interface/EventPrincipal.h"
0007 #include "FWCore/Framework/interface/ExceptionActions.h"
0008 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0009 #include "FWCore/Framework/interface/Frameworkfwd.h"
0010 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0012 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0013 #include "FWCore/Framework/interface/RunPrincipal.h"
0014 #include "FWCore/Framework/interface/WorkerManager.h"
0015 #include "FWCore/Framework/interface/maker/Worker.h"
0016 #include "FWCore/Framework/interface/WorkerRegistry.h"
0017 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0018 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0019 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0021 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0022 #include "FWCore/Utilities/interface/Algorithms.h"
0023 #include "FWCore/Utilities/interface/BranchType.h"
0024 #include "FWCore/Utilities/interface/ConvertException.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/Utilities/interface/StreamID.h"
0027 #include "FWCore/Utilities/interface/propagate_const.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0030 
0031 #include <exception>
0032 #include <map>
0033 #include <memory>
0034 #include <set>
0035 #include <string>
0036 #include <vector>
0037 #include <sstream>
0038 #include "boost/range/adaptor/reversed.hpp"
0039 
0040 namespace edm {
0041 
0042   class ExceptionCollector;
0043   class PreallocationConfiguration;
0044   class ModuleRegistry;
0045   class TriggerResultInserter;
0046   class PathStatusInserter;
0047   class EndPathStatusInserter;
0048 
0049   class GlobalSchedule {
0050   public:
0051     typedef std::vector<std::string> vstring;
0052     typedef std::vector<Worker*> AllWorkers;
0053     typedef std::shared_ptr<Worker> WorkerPtr;
0054     typedef std::vector<Worker*> Workers;
0055 
0056     GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0057                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0058                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0059                    std::shared_ptr<ModuleRegistry> modReg,
0060                    std::vector<std::string> const& modulesToUse,
0061                    ParameterSet& proc_pset,
0062                    ProductRegistry& pregistry,
0063                    PreallocationConfiguration const& prealloc,
0064                    ExceptionToActionTable const& actions,
0065                    std::shared_ptr<ActivityRegistry> areg,
0066                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0067                    ProcessContext const* processContext);
0068     GlobalSchedule(GlobalSchedule const&) = delete;
0069 
0070     template <typename T>
0071     void processOneGlobalAsync(WaitingTaskHolder holder,
0072                                typename T::TransitionInfoType&,
0073                                ServiceToken const& token,
0074                                bool cleaningUpAfterException = false);
0075 
0076     void beginJob(ProductRegistry const&,
0077                   eventsetup::ESRecordsToProductResolverIndices const&,
0078                   ProcessBlockHelperBase const&,
0079                   PathsAndConsumesOfModulesBase const&,
0080                   ProcessContext const&);
0081     void endJob(ExceptionCollector& collector);
0082 
0083     /// Return a vector allowing const access to all the
0084     /// ModuleDescriptions for this GlobalSchedule.
0085 
0086     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0087     /// *** passed to the caller. Do not call delete on these
0088     /// *** pointers!
0089     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0090 
0091     /// Return whether each output module has reached its maximum count.
0092     bool terminate() const;
0093 
0094     /// clone the type of module with label iLabel but configure with iPSet.
0095     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0096 
0097     /// Delete the module with label iLabel
0098     void deleteModule(std::string const& iLabel);
0099 
0100     /// returns the collection of pointers to workers
0101     AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0102 
0103   private:
0104     /// returns the action table
0105     ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0106 
0107     template <typename T>
0108     void preScheduleSignal(GlobalContext const*, ServiceToken const&);
0109 
0110     template <typename T>
0111     void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);
0112 
0113     void handleException(GlobalContext const*,
0114                          ServiceWeakToken const&,
0115                          bool cleaningUpAfterException,
0116                          std::exception_ptr&);
0117 
0118     std::vector<WorkerManager> workerManagers_;
0119     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0120     std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0121     ProcessContext const* processContext_;
0122 
0123     // The next 4 variables use the same naming convention, even though we have no intention
0124     // to ever have concurrent ProcessBlocks or Jobs. They are all related to the number of
0125     // WorkerManagers needed for global transitions.
0126     unsigned int numberOfConcurrentLumis_;
0127     unsigned int numberOfConcurrentRuns_;
0128     static constexpr unsigned int numberOfConcurrentProcessBlocks_ = 1;
0129     static constexpr unsigned int numberOfConcurrentJobs_ = 1;
0130   };
0131 
0132   template <typename T>
0133   void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0134                                              typename T::TransitionInfoType& transitionInfo,
0135                                              ServiceToken const& token,
0136                                              bool cleaningUpAfterException) {
0137     auto const& principal = transitionInfo.principal();
0138 
0139     // Caught exception is propagated via WaitingTaskHolder
0140     CMS_SA_ALLOW try {
0141       //need the doneTask to own the memory
0142       auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0143 
0144       ServiceWeakToken weakToken = token;
0145       auto doneTask = make_waiting_task(
0146           [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0147             std::exception_ptr excpt;
0148             if (iPtr) {
0149               excpt = *iPtr;
0150               // add context information to the exception and print message
0151               handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
0152             }
0153             postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
0154             iHolder.doneWaiting(excpt);
0155           });
0156 
0157       //make sure the task doesn't get run until all workers have beens started
0158       WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0159 
0160       CMS_SA_ALLOW try {
0161         preScheduleSignal<T>(globalContext.get(), token);
0162 
0163         unsigned int managerIndex = principal.index();
0164         if constexpr (T::branchType_ == InRun) {
0165           managerIndex += numberOfConcurrentLumis_;
0166         } else if constexpr (T::branchType_ == InProcess) {
0167           managerIndex += (numberOfConcurrentLumis_ + numberOfConcurrentRuns_);
0168         }
0169         WorkerManager& workerManager = workerManagers_[managerIndex];
0170         workerManager.resetAll();
0171 
0172         ParentContext parentContext(globalContext.get());
0173         // make sure the ProductResolvers know about their
0174         // workers to allow proper data dependency handling
0175         workerManager.setupResolvers(transitionInfo.principal());
0176 
0177         auto& aw = workerManager.allWorkers();
0178         for (Worker* worker : boost::adaptors::reverse(aw)) {
0179           worker->doWorkAsync<T>(
0180               holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0181         }
0182       } catch (...) {
0183         holdForLoop.doneWaiting(std::current_exception());
0184       }
0185     } catch (...) {
0186       iHolder.doneWaiting(std::current_exception());
0187     }
0188   }
0189 
0190   template <typename T>
0191   void GlobalSchedule::preScheduleSignal(GlobalContext const* globalContext, ServiceToken const& token) {
0192     if (actReg_) {
0193       try {
0194         ServiceRegistry::Operate op(token);
0195         convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
0196       } catch (cms::Exception& ex) {
0197         exceptionContext(ex, *globalContext, "Handling pre signal, likely in a service function");
0198         throw;
0199       }
0200     }
0201   }
0202 
0203   template <typename T>
0204   void GlobalSchedule::postScheduleSignal(GlobalContext const* globalContext,
0205                                           ServiceWeakToken const& weakToken,
0206                                           std::exception_ptr& excpt) {
0207     if (actReg_) {
0208       try {
0209         convertException::wrap([this, &weakToken, globalContext]() {
0210           ServiceRegistry::Operate op(weakToken.lock());
0211           T::postScheduleSignal(actReg_.get(), globalContext);
0212         });
0213       } catch (cms::Exception& ex) {
0214         if (not excpt) {
0215           exceptionContext(ex, *globalContext, "Handling post signal, likely in a service function");
0216           excpt = std::current_exception();
0217         }
0218       }
0219     }
0220   }
0221 
0222 }  // namespace edm
0223 
0224 #endif