Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-10-19 23:19:14

0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
0011   of events by modules and paths is contained here or in object held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The StreamSchedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The StreamSchedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
0061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/EventPrincipal.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0068 #include "FWCore/Framework/interface/WorkerManager.h"
0069 #include "FWCore/Framework/interface/Path.h"
0070 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0071 #include "FWCore/Framework/interface/maker/Worker.h"
0072 #include "FWCore/Framework/interface/WorkerRegistry.h"
0073 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0074 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0075 #include "FWCore/MessageLogger/interface/JobReport.h"
0076 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0077 #include "FWCore/ServiceRegistry/interface/Service.h"
0078 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0079 #include "FWCore/Concurrency/interface/FunctorTask.h"
0080 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0081 #include "FWCore/Utilities/interface/Algorithms.h"
0082 #include "FWCore/Utilities/interface/BranchType.h"
0083 #include "FWCore/Utilities/interface/ConvertException.h"
0084 #include "FWCore/Utilities/interface/Exception.h"
0085 #include "FWCore/Utilities/interface/StreamID.h"
0086 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0087 #include "FWCore/Utilities/interface/propagate_const.h"
0088 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0089 
0090 #include <map>
0091 #include <memory>
0092 #include <set>
0093 #include <string>
0094 #include <vector>
0095 #include <sstream>
0096 #include <atomic>
0097 #include <unordered_set>
0098 
0099 namespace edm {
0100 
0101   class ActivityRegistry;
0102   class BranchIDListHelper;
0103   class ExceptionCollector;
0104   class ExceptionToActionTable;
0105   class OutputModuleCommunicator;
0106   class ProcessContext;
0107   class UnscheduledCallProducer;
0108   class WorkerInPath;
0109   class ModuleRegistry;
0110   class TriggerResultInserter;
0111   class PathStatusInserter;
0112   class EndPathStatusInserter;
0113   class PreallocationConfiguration;
0114   class WaitingTaskHolder;
0115 
0116   class ConditionalTaskHelper;
0117 
0118   namespace service {
0119     class TriggerNamesService;
0120   }
0121 
0122   namespace {
0123     template <typename T>
         // Scope guard around the schedule signals of transition traits T:
         // the constructor calls T::preScheduleSignal and the destructor calls
         // T::postScheduleSignal, both only when the ActivityRegistry pointer
         // is non-null.
0124     class StreamScheduleSignalSentry {
0125     public:
           // a       : registry handed to the signal functions; may be null, in
           //           which case no signal is emitted.
           // context : passed unchanged to both signal functions.
0126       StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
0127           : a_(a), context_(context), allowThrow_(false) {
0128         if (a_)
0129           T::preScheduleSignal(a_, context_);
0130       }
           // Declared noexcept(false) so that an exception thrown by
           // T::postScheduleSignal can leave the destructor. That only happens
           // after allowThrow() has been called; otherwise the exception is
           // swallowed here.
0131       ~StreamScheduleSignalSentry() noexcept(false) {
0132         // Caught exception is rethrown (when allowed)
0133         CMS_SA_ALLOW try {
0134           if (a_) {
0135             T::postScheduleSignal(a_, context_);
0136           }
0137         } catch (...) {
0138           if (allowThrow_) {
0139             throw;
0140           }
0141         }
0142       }
0143 
           // Permit the destructor to propagate an exception from the post signal.
0144       void allowThrow() { allowThrow_ = true; }
0145 
0146     private:
0147       // We own none of these resources.
0148       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0149       typename T::Context const* context_;
0150       bool allowThrow_;  // false until allowThrow() is called
0151     };
0152   }  // namespace
0153 
0154   class StreamSchedule {
       // Schedule for a single stream (see streamID_): holds the trigger paths
       // and end paths, the WorkerManager that owns access to the workers, the
       // TriggerResults/path-status inserter workers, the early-delete
       // bookkeeping and the per-stream event counters.
0155   public:
0156     typedef std::vector<std::string> vstring;
0157     typedef std::vector<Path> TrigPaths;
0158     typedef std::vector<Path> NonTrigPaths;
0159     typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0160     typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0161     typedef std::shared_ptr<Worker> WorkerPtr;
0162     typedef std::vector<Worker*> AllWorkers;
0163 
0164     typedef std::vector<Worker*> Workers;
0165 
0166     typedef std::vector<WorkerInPath> PathWorkers;
0167 
0168     StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0169                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0170                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0171                    std::shared_ptr<ModuleRegistry>,
0172                    ParameterSet& proc_pset,
0173                    service::TriggerNamesService const& tns,
0174                    PreallocationConfiguration const& prealloc,
0175                    ProductRegistry& pregistry,
0176                    ExceptionToActionTable const& actions,
0177                    std::shared_ptr<ActivityRegistry> areg,
0178                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0179                    StreamID streamID,
0180                    ProcessContext const* processContext);
0181 
         // Copy construction is disabled.
0182     StreamSchedule(StreamSchedule const&) = delete;
0183 
0184     void processOneEventAsync(
0185         WaitingTaskHolder iTask,
0186         EventTransitionInfo&,
0187         ServiceToken const& token,
0188         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0189 
         // Asynchronously runs the stream transition described by the traits
         // type T; defined inline at the end of this header.
0190     template <typename T>
0191     void processOneStreamAsync(WaitingTaskHolder iTask,
0192                                typename T::TransitionInfoType& transitionInfo,
0193                                ServiceToken const& token,
0194                                bool cleaningUpAfterException = false);
0195 
0196     void beginStream();
0197     void endStream();
0198 
0199     StreamID streamID() const { return streamID_; }
0200 
0201     /// Return a vector allowing const access to all the
0202     /// ModuleDescriptions for this StreamSchedule.
0203 
0204     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0205     /// *** passed to the caller. Do not call delete on these
0206     /// *** pointers!
0207     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0208 
0209     ///adds to oLabelsToFill the labels for all paths in the process
0210     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0211 
0212     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0213     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0214 
0215     void moduleDescriptionsInPath(std::string const& iPathLabel,
0216                                   std::vector<ModuleDescription const*>& descriptions,
0217                                   unsigned int hint) const;
0218 
0219     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0220                                      std::vector<ModuleDescription const*>& descriptions,
0221                                      unsigned int hint) const;
0222 
0223     /// Return the number of events this StreamSchedule has tried to process
0224     /// (includes both successes and failures, including failures due
0225     /// to exceptions during processing).
0226     int totalEvents() const { return total_events_; }
0227 
0228     /// Return the number of events which have been passed by one or
0229     /// more trigger paths.
0230     int totalEventsPassed() const { return total_passed_; }
0231 
0232     /// Return the number of events that have not passed any trigger.
0233     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())
0234     int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0235 
0236     /// Return the trigger report information on paths,
0237     /// modules-in-path, modules-in-endpath, and modules.
0238     void getTriggerReport(TriggerReport& rep) const;
0239 
0240     ///  Clear all the counters in the trigger report.
0241     void clearCounters();
0242 
0243     /// Replace the module labelled iLabel using the module held by iMod.
         /// NOTE(review): the definition is not in this header; confirm the
         /// exact replacement semantics against the implementation.
0244     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0245 
0246     /// Delete the module with label iLabel
0247     void deleteModule(std::string const& iLabel);
0248 
0249     void initializeEarlyDelete(ModuleRegistry& modReg,
0250                                std::vector<std::string> const& branchesToDeleteEarly,
0251                                std::multimap<std::string, std::string> const& referencesToBranches,
0252                                std::vector<std::string> const& modulesToSkip,
0253                                edm::ProductRegistry const& preg);
0254 
0255     /// returns the collection of pointers to workers
0256     AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
0257 
         /// returns the workers that workerManager_ reports as unscheduled
0258     AllWorkers const& unscheduledWorkers() const { return workerManager_.unscheduledWorkers(); }
0259     unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0260 
0261     StreamContext const& context() const { return streamContext_; }
0262 
         // Plain record of four strings describing an alias; it is the mapped
         // type of the aliasMap argument of tryToPlaceConditionalModules().
0263     struct AliasInfo {
0264       std::string friendlyClassName;
0265       std::string instanceLabel;
0266       std::string originalInstanceLabel;
0267       std::string originalModuleLabel;
0268     };
0269 
0270   private:
0271     // Sentry class to only send a signal if an
0272     // exception occurs. An exception is identified
0273     // by the destructor being called without first
0274     // calling completedSuccessfully().
0275     class SendTerminationSignalIfException {
0276     public:
0277       SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::StreamContext const* iContext)
0278           : reg_(iReg), context_(iContext) {}
           // Emits preStreamEarlyTerminationSignal_ unless reg_ is null, i.e.
           // unless completedSuccessfully() was called (or a null registry was
           // supplied at construction).
           // NOTE(review): this destructor is implicitly noexcept, so an
           // exception escaping the signal would call std::terminate; confirm
           // the signal cannot throw.
0279       ~SendTerminationSignalIfException() {
0280         if (reg_) {
0281           reg_->preStreamEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
0282         }
0283       }
           // Disarms the sentry by clearing the registry pointer.
0284       void completedSuccessfully() { reg_ = nullptr; }
0285 
0286     private:
0287       edm::ActivityRegistry* reg_;  // We do not use propagate_const because the registry itself is mutable.
0288       StreamContext const* context_;
0289     };
0290 
0291     /// returns the action table
0292     ExceptionToActionTable const& actionTable() const { return workerManager_.actionTable(); }
0293 
0294     void resetAll();
0295 
0296     void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0297 
0298     std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0299 
         // Defined inline below: reports the run and event number of ep as skipped.
0300     void reportSkipped(EventPrincipal const& ep) const;
0301 
0302     std::vector<Worker*> tryToPlaceConditionalModules(
0303         Worker*,
0304         std::unordered_set<std::string>& conditionalModules,
0305         std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0306         std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0307         ParameterSet& proc_pset,
0308         ProductRegistry& preg,
0309         PreallocationConfiguration const* prealloc,
0310         std::shared_ptr<ProcessConfiguration const> processConfiguration);
0311     void fillWorkers(ParameterSet& proc_pset,
0312                      ProductRegistry& preg,
0313                      PreallocationConfiguration const* prealloc,
0314                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0315                      std::string const& name,
0316                      bool ignoreFilters,
0317                      PathWorkers& out,
0318                      std::vector<std::string> const& endPathNames,
0319                      ConditionalTaskHelper const& conditionalTaskHelper);
0320     void fillTrigPath(ParameterSet& proc_pset,
0321                       ProductRegistry& preg,
0322                       PreallocationConfiguration const* prealloc,
0323                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0324                       int bitpos,
0325                       std::string const& name,
0326                       TrigResPtr,
0327                       std::vector<std::string> const& endPathNames,
0328                       ConditionalTaskHelper const& conditionalTaskHelper);
0329     void fillEndPath(ParameterSet& proc_pset,
0330                      ProductRegistry& preg,
0331                      PreallocationConfiguration const* prealloc,
0332                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0333                      int bitpos,
0334                      std::string const& name,
0335                      std::vector<std::string> const& endPathNames,
0336                      ConditionalTaskHelper const& conditionalTaskHelper);
0337 
0338     void addToAllWorkers(Worker* w);
0339 
0340     void resetEarlyDelete();
0341 
         // Const and non-const access to the trigger results pointer held in results_.
0342     TrigResConstPtr results() const { return get_underlying_safe(results_); }
0343     TrigResPtr& results() { return get_underlying_safe(results_); }
0344 
0345     void makePathStatusInserters(
0346         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0347         std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0348         ExceptionToActionTable const& actions);
0349 
0350     WorkerManager workerManager_;
0351     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0352 
0353     edm::propagate_const<TrigResPtr> results_;
0354 
0355     edm::propagate_const<WorkerPtr> results_inserter_;
0356     std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
0357     std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
0358 
0359     TrigPaths trig_paths_;
         // Declared as TrigPaths; NonTrigPaths names the same type (std::vector<Path>).
0360     TrigPaths end_paths_;
0361     std::vector<int> empty_trig_paths_;
0362     std::vector<int> empty_end_paths_;
0363 
0364     // For each branch that has been marked for early deletion
0365     // keep track of how many modules are left that read this data but have
0366     // not yet been run in this event
0367     std::vector<BranchToCount> earlyDeleteBranchToCount_;
0368     // NOTE the following is effectively internal data for each EarlyDeleteHelper
0369     // but putting it into one vector makes for better allocation as well as
0370     // faster iteration when used to reset the earlyDeleteBranchToCount_.
0371     // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
0372     // of this vector correspond to indexes into earlyDeleteBranchToCount_ and so
0373     // tell which EarlyDeleteHelper is associated with which BranchIDs.
0374     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0375     // There is one EarlyDeleteHelper per Module which is reading data that
0376     // has been marked for early deletion
0377     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0378 
         // Counters returned by totalEvents() and totalEventsPassed().
0379     int total_events_;
0380     int total_passed_;
0381     unsigned int number_of_unscheduled_modules_;
0382 
0383     StreamID streamID_;
0384     StreamContext streamContext_;
0385     std::atomic<bool> skippingEvent_;
0386   };
0387 
0388   void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
         // Forwards the run and event number of ep to
         // JobReport::reportSkippedEvent through a Service<JobReport> handle.
0389     Service<JobReport> reportSvc;
0390     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0391   }
0392 
0393   template <typename T>
       // Runs one stream transition, described by the traits type T, asynchronously.
       //
       // Two tasks are built:
       //  * doneTask - adds context to and prints any reported exception, emits the
       //               early-termination signal in that case, emits
       //               T::postScheduleSignal, and finally calls
       //               iHolder.doneWaiting() with the resulting exception (if any).
       //  * task     - emits T::preScheduleSignal, resets the workers, then starts
       //               every end path, every trigger path and the WorkerManager,
       //               handing each of them a holder (h) that wraps doneTask.
       //
       // iHolder                  : notified through doneWaiting() when the transition is finished.
       // transitionInfo           : supplies the principal; a copy is captured by the task.
       // token                    : converted to a ServiceWeakToken that the tasks lock() when they run.
       // cleaningUpAfterException : forwarded to addContextAndPrintException().
0394   void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
0395                                              typename T::TransitionInfoType& transitionInfo,
0396                                              ServiceToken const& token,
0397                                              bool cleaningUpAfterException) {
0398     auto const& principal = transitionInfo.principal();
0399     T::setStreamContext(streamContext_, principal);
0400 
         // The id is captured by value so the completion lambda can name the
         // transition in the exception context.
0401     auto id = principal.id();
0402     ServiceWeakToken weakToken = token;
0403     auto doneTask = make_waiting_task(
0404         [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
0405           std::exception_ptr excpt;
               // A non-null iPtr carries the exception to report.
0406           if (iPtr) {
0407             excpt = *iPtr;
0408             //add context information to the exception and print message
0409             try {
0410               convertException::wrap([&]() { std::rethrow_exception(excpt); });
0411             } catch (cms::Exception& ex) {
0412               //TODO: should add the transition type info
0413               std::ostringstream ost;
                   // Only describe the transition when the exception has no context yet.
0414               if (ex.context().empty()) {
0415                 ost << "Processing " << T::transitionName() << " " << id;
0416               }
0417               ServiceRegistry::Operate op(weakToken.lock());
0418               addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
                   // Keep the exception as it is after the call above.
0419               excpt = std::current_exception();
0420             }
0421 
0422             ServiceRegistry::Operate op(weakToken.lock());
0423             actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
0424           }
0425           // Caught exception is propagated via WaitingTaskHolder
0426           CMS_SA_ALLOW try {
0427             ServiceRegistry::Operate op(weakToken.lock());
0428             T::postScheduleSignal(actReg_.get(), &streamContext_);
0429           } catch (...) {
                 // An earlier exception takes precedence over one from the post signal.
0430             if (not excpt) {
0431               excpt = std::current_exception();
0432             }
0433           }
0434           iHolder.doneWaiting(excpt);
0435         });
0436 
         // h is a holder for doneTask in iHolder's task group; info is a by-value
         // copy of transitionInfo.
0437     auto task = make_functor_task(
0438         [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
0439           auto token = weakToken.lock();
0440           ServiceRegistry::Operate op(token);
0441           // Caught exception is propagated via WaitingTaskHolder
0442           CMS_SA_ALLOW try {
0443             T::preScheduleSignal(actReg_.get(), &streamContext_);
0444 
0445             workerManager_.resetAll();
0446           } catch (...) {
                 // Nothing has been started yet: report the failure and stop.
0447             h.doneWaiting(std::current_exception());
0448             return;
0449           }
0450 
               // End paths are started first, then trigger paths, then the WorkerManager.
0451           for (auto& p : end_paths_) {
0452             p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
0453           }
0454 
0455           for (auto& p : trig_paths_) {
0456             p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
0457           }
0458 
0459           workerManager_.processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
0460         });
0461 
0462     if (streamID_.value() == 0) {
0463       //Enqueueing will start another thread if there is only
0464       // one thread in the job. Having stream == 0 run the task through
0465       // the task group avoids starting up another thread when there is only one stream.
0466       iHolder.group()->run([task]() {
0467         TaskSentry s{task};
0468         task->execute();
0469       });
0470     } else {
           // All other streams enqueue the task on the arena attached to the calling thread.
0471       oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
0472       arena.enqueue([task]() {
0473         TaskSentry s{task};
0474         task->execute();
0475       });
0476     }
0477   }
0478 }  // namespace edm
0479 
0480 #endif