Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-08-04 22:45:01

0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003 
0004 /*
0005 */
0006 
0007 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0008 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0009 #include "FWCore/Framework/interface/EventPrincipal.h"
0010 #include "FWCore/Framework/interface/ExceptionActions.h"
0011 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0012 #include "FWCore/Framework/interface/Frameworkfwd.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0014 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0015 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0016 #include "FWCore/Framework/interface/RunPrincipal.h"
0017 #include "FWCore/Framework/interface/WorkerManager.h"
0018 #include "FWCore/Framework/interface/maker/Worker.h"
0019 #include "FWCore/Framework/interface/WorkerRegistry.h"
0020 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0021 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0022 #include "FWCore/Utilities/interface/Algorithms.h"
0023 #include "FWCore/Utilities/interface/BranchType.h"
0024 #include "FWCore/Utilities/interface/ConvertException.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/Utilities/interface/StreamID.h"
0027 #include "FWCore/Utilities/interface/propagate_const.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0030 
0031 #include <map>
0032 #include <memory>
0033 #include <set>
0034 #include <string>
0035 #include <vector>
0036 #include <sstream>
0037 #include "boost/range/adaptor/reversed.hpp"
0038 
0039 namespace edm {
0040 
0041   namespace {
0042     template <typename T>
0043     class GlobalScheduleSignalSentry {
0044     public:
0045       GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
0046           : a_(a), context_(context), allowThrow_(false) {
0047         if (a_)
0048           T::preScheduleSignal(a_, context_);
0049       }
0050       ~GlobalScheduleSignalSentry() noexcept(false) {
0051         // Caught exception is rethrown
0052         CMS_SA_ALLOW try {
0053           if (a_)
0054             T::postScheduleSignal(a_, context_);
0055         } catch (...) {
0056           if (allowThrow_) {
0057             throw;
0058           }
0059         }
0060       }
0061 
0062       void allowThrow() { allowThrow_ = true; }
0063 
0064     private:
0065       // We own none of these resources.
0066       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0067       typename T::Context const* context_;
0068       bool allowThrow_;
0069     };
0070   }  // namespace
0071 
0072   class ActivityRegistry;
0073   class ExceptionCollector;
0074   class ProcessContext;
0075   class PreallocationConfiguration;
0076   class ModuleRegistry;
0077   class TriggerResultInserter;
0078   class PathStatusInserter;
0079   class EndPathStatusInserter;
0080 
0081   class GlobalSchedule {
0082   public:
0083     typedef std::vector<std::string> vstring;
0084     typedef std::vector<Worker*> AllWorkers;
0085     typedef std::shared_ptr<Worker> WorkerPtr;
0086     typedef std::vector<Worker*> Workers;
0087 
0088     GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0089                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0090                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0091                    std::shared_ptr<ModuleRegistry> modReg,
0092                    std::vector<std::string> const& modulesToUse,
0093                    ParameterSet& proc_pset,
0094                    ProductRegistry& pregistry,
0095                    PreallocationConfiguration const& prealloc,
0096                    ExceptionToActionTable const& actions,
0097                    std::shared_ptr<ActivityRegistry> areg,
0098                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0099                    ProcessContext const* processContext);
0100     GlobalSchedule(GlobalSchedule const&) = delete;
0101 
0102     template <typename T>
0103     void processOneGlobalAsync(WaitingTaskHolder holder,
0104                                typename T::TransitionInfoType&,
0105                                ServiceToken const& token,
0106                                bool cleaningUpAfterException = false);
0107 
0108     void beginJob(ProductRegistry const&, eventsetup::ESRecordsToProxyIndices const&, ProcessBlockHelperBase const&);
0109     void endJob(ExceptionCollector& collector);
0110 
0111     /// Return a vector allowing const access to all the
0112     /// ModuleDescriptions for this GlobalSchedule.
0113 
0114     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0115     /// *** passed to the caller. Do not call delete on these
0116     /// *** pointers!
0117     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0118 
0119     /// Return whether each output module has reached its maximum count.
0120     bool terminate() const;
0121 
0122     /// clone the type of module with label iLabel but configure with iPSet.
0123     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0124 
0125     /// Delete the module with label iLabel
0126     void deleteModule(std::string const& iLabel);
0127 
0128     /// returns the collection of pointers to workers
0129     AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0130 
0131   private:
0132     //Sentry class to only send a signal if an
0133     // exception occurs. An exception is identified
0134     // by the destructor being called without first
0135     // calling completedSuccessfully().
0136     class SendTerminationSignalIfException {
0137     public:
0138       SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::GlobalContext const* iContext)
0139           : reg_(iReg), context_(iContext) {}
0140       ~SendTerminationSignalIfException() {
0141         if (reg_) {
0142           reg_->preGlobalEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
0143         }
0144       }
0145       void completedSuccessfully() { reg_ = nullptr; }
0146 
0147     private:
0148       edm::ActivityRegistry* reg_;  // We do not use propagate_const because the registry itself is mutable.
0149       GlobalContext const* context_;
0150     };
0151 
0152     /// returns the action table
0153     ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0154 
0155     std::vector<WorkerManager> workerManagers_;
0156     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0157     std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0158     ProcessContext const* processContext_;
0159   };
0160 
0161   template <typename T>
0162   void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0163                                              typename T::TransitionInfoType& transitionInfo,
0164                                              ServiceToken const& token,
0165                                              bool cleaningUpAfterException) {
0166     auto const& principal = transitionInfo.principal();
0167 
0168     // Caught exception is propagated via WaitingTaskHolder
0169     CMS_SA_ALLOW try {
0170       //need the doneTask to own the memory
0171       auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0172 
0173       if (actReg_) {
0174         //Services may depend upon each other
0175         ServiceRegistry::Operate op(token);
0176         T::preScheduleSignal(actReg_.get(), globalContext.get());
0177       }
0178 
0179       ServiceWeakToken weakToken = token;
0180       auto doneTask = make_waiting_task(
0181           [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0182             std::exception_ptr excpt;
0183             if (iPtr) {
0184               excpt = *iPtr;
0185               //add context information to the exception and print message
0186               try {
0187                 convertException::wrap([&]() { std::rethrow_exception(excpt); });
0188               } catch (cms::Exception& ex) {
0189                 //TODO: should add the transition type info
0190                 std::ostringstream ost;
0191                 if (ex.context().empty()) {
0192                   ost << "Processing " << T::transitionName() << " ";
0193                 }
0194                 ServiceRegistry::Operate op(weakToken.lock());
0195                 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0196                 excpt = std::current_exception();
0197               }
0198               if (actReg_) {
0199                 ServiceRegistry::Operate op(weakToken.lock());
0200                 actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
0201               }
0202             }
0203             if (actReg_) {
0204               // Caught exception is propagated via WaitingTaskHolder
0205               CMS_SA_ALLOW try {
0206                 ServiceRegistry::Operate op(weakToken.lock());
0207                 T::postScheduleSignal(actReg_.get(), globalContext.get());
0208               } catch (...) {
0209                 if (not excpt) {
0210                   excpt = std::current_exception();
0211                 }
0212               }
0213             }
0214             iHolder.doneWaiting(excpt);
0215           });
0216       WorkerManager& workerManager = workerManagers_[principal.index()];
0217       workerManager.resetAll();
0218 
0219       ParentContext parentContext(globalContext.get());
0220       //make sure the ProductResolvers know about their
0221       // workers to allow proper data dependency handling
0222       workerManager.setupResolvers(transitionInfo.principal());
0223 
0224       //make sure the task doesn't get run until all workers have beens started
0225       WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0226       auto& aw = workerManager.allWorkers();
0227       for (Worker* worker : boost::adaptors::reverse(aw)) {
0228         worker->doWorkAsync<T>(
0229             holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0230       }
0231     } catch (...) {
0232       iHolder.doneWaiting(std::current_exception());
0233     }
0234   }
0235 }  // namespace edm
0236 
0237 #endif