Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:03

0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003 
0004 /*
0005 */
0006 
0007 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0008 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0009 #include "FWCore/Framework/interface/EventPrincipal.h"
0010 #include "FWCore/Framework/interface/ExceptionActions.h"
0011 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0012 #include "FWCore/Framework/interface/Frameworkfwd.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0014 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0015 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0016 #include "FWCore/Framework/interface/RunPrincipal.h"
0017 #include "FWCore/Framework/interface/WorkerManager.h"
0018 #include "FWCore/Framework/interface/maker/Worker.h"
0019 #include "FWCore/Framework/interface/WorkerRegistry.h"
0020 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0021 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0022 #include "FWCore/Utilities/interface/Algorithms.h"
0023 #include "FWCore/Utilities/interface/BranchType.h"
0024 #include "FWCore/Utilities/interface/ConvertException.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/Utilities/interface/StreamID.h"
0027 #include "FWCore/Utilities/interface/propagate_const.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0030 
#include <exception>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include "boost/range/adaptor/reversed.hpp"
0038 
0039 namespace edm {
0040 
0041   namespace {
0042     template <typename T>
0043     class GlobalScheduleSignalSentry {
0044     public:
0045       GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
0046           : a_(a), context_(context), allowThrow_(false) {
0047         if (a_)
0048           T::preScheduleSignal(a_, context_);
0049       }
0050       ~GlobalScheduleSignalSentry() noexcept(false) {
0051         // Caught exception is rethrown
0052         CMS_SA_ALLOW try {
0053           if (a_)
0054             T::postScheduleSignal(a_, context_);
0055         } catch (...) {
0056           if (allowThrow_) {
0057             throw;
0058           }
0059         }
0060       }
0061 
0062       void allowThrow() { allowThrow_ = true; }
0063 
0064     private:
0065       // We own none of these resources.
0066       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0067       typename T::Context const* context_;
0068       bool allowThrow_;
0069     };
0070   }  // namespace
0071 
0072   class ActivityRegistry;
0073   class ExceptionCollector;
0074   class ProcessContext;
0075   class PreallocationConfiguration;
0076   class ModuleRegistry;
0077   class TriggerResultInserter;
0078   class PathStatusInserter;
0079   class EndPathStatusInserter;
0080 
0081   class GlobalSchedule {
0082   public:
0083     typedef std::vector<std::string> vstring;
0084     typedef std::vector<Worker*> AllWorkers;
0085     typedef std::shared_ptr<Worker> WorkerPtr;
0086     typedef std::vector<Worker*> Workers;
0087 
0088     GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0089                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0090                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0091                    std::shared_ptr<ModuleRegistry> modReg,
0092                    std::vector<std::string> const& modulesToUse,
0093                    ParameterSet& proc_pset,
0094                    ProductRegistry& pregistry,
0095                    PreallocationConfiguration const& prealloc,
0096                    ExceptionToActionTable const& actions,
0097                    std::shared_ptr<ActivityRegistry> areg,
0098                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0099                    ProcessContext const* processContext);
0100     GlobalSchedule(GlobalSchedule const&) = delete;
0101 
0102     template <typename T>
0103     void processOneGlobalAsync(WaitingTaskHolder holder,
0104                                typename T::TransitionInfoType&,
0105                                ServiceToken const& token,
0106                                bool cleaningUpAfterException = false);
0107 
0108     void beginJob(ProductRegistry const&,
0109                   eventsetup::ESRecordsToProductResolverIndices const&,
0110                   ProcessBlockHelperBase const&);
0111     void endJob(ExceptionCollector& collector);
0112 
0113     /// Return a vector allowing const access to all the
0114     /// ModuleDescriptions for this GlobalSchedule.
0115 
0116     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0117     /// *** passed to the caller. Do not call delete on these
0118     /// *** pointers!
0119     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0120 
0121     /// Return whether each output module has reached its maximum count.
0122     bool terminate() const;
0123 
0124     /// clone the type of module with label iLabel but configure with iPSet.
0125     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0126 
0127     /// Delete the module with label iLabel
0128     void deleteModule(std::string const& iLabel);
0129 
0130     /// returns the collection of pointers to workers
0131     AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0132 
0133   private:
0134     //Sentry class to only send a signal if an
0135     // exception occurs. An exception is identified
0136     // by the destructor being called without first
0137     // calling completedSuccessfully().
0138     class SendTerminationSignalIfException {
0139     public:
0140       SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::GlobalContext const* iContext)
0141           : reg_(iReg), context_(iContext) {}
0142       ~SendTerminationSignalIfException() {
0143         if (reg_) {
0144           reg_->preGlobalEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
0145         }
0146       }
0147       void completedSuccessfully() { reg_ = nullptr; }
0148 
0149     private:
0150       edm::ActivityRegistry* reg_;  // We do not use propagate_const because the registry itself is mutable.
0151       GlobalContext const* context_;
0152     };
0153 
0154     /// returns the action table
0155     ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0156 
0157     std::vector<WorkerManager> workerManagers_;
0158     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0159     std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0160     ProcessContext const* processContext_;
0161     unsigned int numberOfConcurrentLumis_;
0162   };
0163 
0164   template <typename T>
0165   void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0166                                              typename T::TransitionInfoType& transitionInfo,
0167                                              ServiceToken const& token,
0168                                              bool cleaningUpAfterException) {
0169     auto const& principal = transitionInfo.principal();
0170 
0171     // Caught exception is propagated via WaitingTaskHolder
0172     CMS_SA_ALLOW try {
0173       //need the doneTask to own the memory
0174       auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0175 
0176       if (actReg_) {
0177         //Services may depend upon each other
0178         ServiceRegistry::Operate op(token);
0179         T::preScheduleSignal(actReg_.get(), globalContext.get());
0180       }
0181 
0182       ServiceWeakToken weakToken = token;
0183       auto doneTask = make_waiting_task(
0184           [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0185             std::exception_ptr excpt;
0186             if (iPtr) {
0187               excpt = *iPtr;
0188               //add context information to the exception and print message
0189               try {
0190                 convertException::wrap([&]() { std::rethrow_exception(excpt); });
0191               } catch (cms::Exception& ex) {
0192                 //TODO: should add the transition type info
0193                 std::ostringstream ost;
0194                 if (ex.context().empty()) {
0195                   ost << "Processing " << T::transitionName() << " ";
0196                 }
0197                 ServiceRegistry::Operate op(weakToken.lock());
0198                 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0199                 excpt = std::current_exception();
0200               }
0201               if (actReg_) {
0202                 ServiceRegistry::Operate op(weakToken.lock());
0203                 actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
0204               }
0205             }
0206             if (actReg_) {
0207               // Caught exception is propagated via WaitingTaskHolder
0208               CMS_SA_ALLOW try {
0209                 ServiceRegistry::Operate op(weakToken.lock());
0210                 T::postScheduleSignal(actReg_.get(), globalContext.get());
0211               } catch (...) {
0212                 if (not excpt) {
0213                   excpt = std::current_exception();
0214                 }
0215               }
0216             }
0217             iHolder.doneWaiting(excpt);
0218           });
0219       unsigned int managerIndex = principal.index();
0220       if constexpr (T::branchType_ == InRun) {
0221         managerIndex += numberOfConcurrentLumis_;
0222       }
0223       WorkerManager& workerManager = workerManagers_[managerIndex];
0224       workerManager.resetAll();
0225 
0226       ParentContext parentContext(globalContext.get());
0227       //make sure the ProductResolvers know about their
0228       // workers to allow proper data dependency handling
0229       workerManager.setupResolvers(transitionInfo.principal());
0230 
0231       //make sure the task doesn't get run until all workers have beens started
0232       WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0233       auto& aw = workerManager.allWorkers();
0234       for (Worker* worker : boost::adaptors::reverse(aw)) {
0235         worker->doWorkAsync<T>(
0236             holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0237       }
0238     } catch (...) {
0239       iHolder.doneWaiting(std::current_exception());
0240     }
0241   }
0242 }  // namespace edm
0243 
0244 #endif