Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:06

0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
0011   of events by modules and paths is contained here or in objects held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The StreamSchedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The StreamSchedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
#include "DataFormats/Common/interface/HLTGlobalStatus.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/Framework/interface/ExceptionActions.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/Framework/interface/ExceptionHelpers.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/OccurrenceTraits.h"
#include "FWCore/Framework/interface/UnscheduledCallProducer.h"
#include "FWCore/Framework/interface/WorkerManager.h"
#include "FWCore/Framework/interface/Path.h"
#include "FWCore/Framework/interface/TransitionInfoTypes.h"
#include "FWCore/Framework/interface/maker/Worker.h"
#include "FWCore/Framework/interface/WorkerRegistry.h"
#include "FWCore/Framework/interface/EarlyDeleteHelper.h"
#include "FWCore/MessageLogger/interface/ExceptionMessages.h"
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
#include "FWCore/Utilities/interface/ConvertException.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include <atomic>
#include <exception>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
0098 
0099 namespace edm {
0100 
0101   class ActivityRegistry;
0102   class BranchIDListHelper;
0103   class ExceptionCollector;
0104   class ExceptionToActionTable;
0105   class OutputModuleCommunicator;
0106   class ProcessContext;
0107   class UnscheduledCallProducer;
0108   class WorkerInPath;
0109   class ModuleRegistry;
0110   class TriggerResultInserter;
0111   class PathStatusInserter;
0112   class EndPathStatusInserter;
0113   class PreallocationConfiguration;
0114   class WaitingTaskHolder;
0115 
0116   class ConditionalTaskHelper;
0117 
0118   namespace service {
0119     class TriggerNamesService;
0120   }
0121 
0122   namespace {
0123     template <typename T>
0124     class StreamScheduleSignalSentry {
0125     public:
0126       StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
0127           : a_(a), context_(context), allowThrow_(false) {
0128         if (a_)
0129           T::preScheduleSignal(a_, context_);
0130       }
0131       ~StreamScheduleSignalSentry() noexcept(false) {
0132         // Caught exception is rethrown (when allowed)
0133         CMS_SA_ALLOW try {
0134           if (a_) {
0135             T::postScheduleSignal(a_, context_);
0136           }
0137         } catch (...) {
0138           if (allowThrow_) {
0139             throw;
0140           }
0141         }
0142       }
0143 
0144       void allowThrow() { allowThrow_ = true; }
0145 
0146     private:
0147       // We own none of these resources.
0148       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0149       typename T::Context const* context_;
0150       bool allowThrow_;
0151     };
0152   }  // namespace
0153 
0154   class StreamSchedule {
0155   public:
0156     typedef std::vector<std::string> vstring;
0157     typedef std::vector<Path> TrigPaths;
0158     typedef std::vector<Path> NonTrigPaths;
0159     typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0160     typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0161     typedef std::shared_ptr<Worker> WorkerPtr;
0162     typedef std::vector<Worker*> AllWorkers;
0163 
0164     typedef std::vector<Worker*> Workers;
0165 
0166     typedef std::vector<WorkerInPath> PathWorkers;
0167 
0168     StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0169                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0170                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0171                    std::shared_ptr<ModuleRegistry>,
0172                    ParameterSet& proc_pset,
0173                    service::TriggerNamesService const& tns,
0174                    PreallocationConfiguration const& prealloc,
0175                    ProductRegistry& pregistry,
0176                    ExceptionToActionTable const& actions,
0177                    std::shared_ptr<ActivityRegistry> areg,
0178                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0179                    StreamID streamID,
0180                    ProcessContext const* processContext);
0181 
0182     StreamSchedule(StreamSchedule const&) = delete;
0183 
0184     void processOneEventAsync(
0185         WaitingTaskHolder iTask,
0186         EventTransitionInfo&,
0187         ServiceToken const& token,
0188         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0189 
0190     template <typename T>
0191     void processOneStreamAsync(WaitingTaskHolder iTask,
0192                                typename T::TransitionInfoType& transitionInfo,
0193                                ServiceToken const& token,
0194                                bool cleaningUpAfterException = false);
0195 
0196     void beginStream();
0197     void endStream();
0198 
0199     StreamID streamID() const { return streamID_; }
0200 
0201     /// Return a vector allowing const access to all the
0202     /// ModuleDescriptions for this StreamSchedule.
0203 
0204     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0205     /// *** passed to the caller. Do not call delete on these
0206     /// *** pointers!
0207     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0208 
0209     ///adds to oLabelsToFill the labels for all paths in the process
0210     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0211 
0212     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0213     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0214 
0215     void moduleDescriptionsInPath(std::string const& iPathLabel,
0216                                   std::vector<ModuleDescription const*>& descriptions,
0217                                   unsigned int hint) const;
0218 
0219     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0220                                      std::vector<ModuleDescription const*>& descriptions,
0221                                      unsigned int hint) const;
0222 
0223     /// Return the number of events this StreamSchedule has tried to process
0224     /// (inclues both successes and failures, including failures due
0225     /// to exceptions during processing).
0226     int totalEvents() const { return total_events_; }
0227 
0228     /// Return the number of events which have been passed by one or
0229     /// more trigger paths.
0230     int totalEventsPassed() const { return total_passed_; }
0231 
0232     /// Return the number of events that have not passed any trigger.
0233     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0234     int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0235 
0236     /// Return the trigger report information on paths,
0237     /// modules-in-path, modules-in-endpath, and modules.
0238     void getTriggerReport(TriggerReport& rep) const;
0239 
0240     ///  Clear all the counters in the trigger report.
0241     void clearCounters();
0242 
0243     /// clone the type of module with label iLabel but configure with iPSet.
0244     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0245 
0246     /// Delete the module with label iLabel
0247     void deleteModule(std::string const& iLabel);
0248 
0249     void initializeEarlyDelete(ModuleRegistry& modReg,
0250                                std::vector<std::string> const& branchesToDeleteEarly,
0251                                std::multimap<std::string, std::string> const& referencesToBranches,
0252                                std::vector<std::string> const& modulesToSkip,
0253                                edm::ProductRegistry const& preg);
0254 
0255     /// returns the collection of pointers to workers
0256     AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
0257 
0258     AllWorkers const& unscheduledWorkers() const { return workerManager_.unscheduledWorkers(); }
0259     unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0260 
0261     StreamContext const& context() const { return streamContext_; }
0262 
0263     struct AliasInfo {
0264       std::string friendlyClassName;
0265       std::string instanceLabel;
0266       std::string originalInstanceLabel;
0267       std::string originalModuleLabel;
0268     };
0269 
0270   private:
0271     //Sentry class to only send a signal if an
0272     // exception occurs. An exception is identified
0273     // by the destructor being called without first
0274     // calling completedSuccessfully().
0275     class SendTerminationSignalIfException {
0276     public:
0277       SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::StreamContext const* iContext)
0278           : reg_(iReg), context_(iContext) {}
0279       ~SendTerminationSignalIfException() {
0280         if (reg_) {
0281           reg_->preStreamEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
0282         }
0283       }
0284       void completedSuccessfully() { reg_ = nullptr; }
0285 
0286     private:
0287       edm::ActivityRegistry* reg_;  // We do not use propagate_const because the registry itself is mutable.
0288       StreamContext const* context_;
0289     };
0290 
0291     /// returns the action table
0292     ExceptionToActionTable const& actionTable() const { return workerManager_.actionTable(); }
0293 
0294     void resetAll();
0295 
0296     void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0297 
0298     std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0299 
0300     void reportSkipped(EventPrincipal const& ep) const;
0301 
0302     std::vector<Worker*> tryToPlaceConditionalModules(
0303         Worker*,
0304         std::unordered_set<std::string>& conditionalModules,
0305         std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0306         std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0307         ParameterSet& proc_pset,
0308         ProductRegistry& preg,
0309         PreallocationConfiguration const* prealloc,
0310         std::shared_ptr<ProcessConfiguration const> processConfiguration);
0311     void fillWorkers(ParameterSet& proc_pset,
0312                      ProductRegistry& preg,
0313                      PreallocationConfiguration const* prealloc,
0314                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0315                      std::string const& name,
0316                      bool ignoreFilters,
0317                      PathWorkers& out,
0318                      std::vector<std::string> const& endPathNames,
0319                      ConditionalTaskHelper const& conditionalTaskHelper,
0320                      std::unordered_set<std::string>& allConditionalModules);
0321     void fillTrigPath(ParameterSet& proc_pset,
0322                       ProductRegistry& preg,
0323                       PreallocationConfiguration const* prealloc,
0324                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0325                       int bitpos,
0326                       std::string const& name,
0327                       TrigResPtr,
0328                       std::vector<std::string> const& endPathNames,
0329                       ConditionalTaskHelper const& conditionalTaskHelper,
0330                       std::unordered_set<std::string>& allConditionalModules);
0331     void fillEndPath(ParameterSet& proc_pset,
0332                      ProductRegistry& preg,
0333                      PreallocationConfiguration const* prealloc,
0334                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0335                      int bitpos,
0336                      std::string const& name,
0337                      std::vector<std::string> const& endPathNames,
0338                      ConditionalTaskHelper const& conditionalTaskHelper,
0339                      std::unordered_set<std::string>& allConditionalModules);
0340 
0341     void addToAllWorkers(Worker* w);
0342 
0343     void resetEarlyDelete();
0344 
0345     TrigResConstPtr results() const { return get_underlying_safe(results_); }
0346     TrigResPtr& results() { return get_underlying_safe(results_); }
0347 
0348     void makePathStatusInserters(
0349         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0350         std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0351         ExceptionToActionTable const& actions);
0352 
0353     WorkerManager workerManager_;
0354     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0355 
0356     edm::propagate_const<TrigResPtr> results_;
0357 
0358     edm::propagate_const<WorkerPtr> results_inserter_;
0359     std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
0360     std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
0361 
0362     TrigPaths trig_paths_;
0363     TrigPaths end_paths_;
0364     std::vector<int> empty_trig_paths_;
0365     std::vector<int> empty_end_paths_;
0366 
0367     //For each branch that has been marked for early deletion
0368     // keep track of how many modules are left that read this data but have
0369     // not yet been run in this event
0370     std::vector<BranchToCount> earlyDeleteBranchToCount_;
0371     //NOTE the following is effectively internal data for each EarlyDeleteHelper
0372     // but putting it into one vector makes for better allocation as well as
0373     // faster iteration when used to reset the earlyDeleteBranchToCount_
0374     // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
0375     // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
0376     // tell which EarlyDeleteHelper is associated with which BranchIDs.
0377     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0378     //There is one EarlyDeleteHelper per Module which are reading data that
0379     // has been marked for early deletion
0380     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0381 
0382     int total_events_;
0383     int total_passed_;
0384     unsigned int number_of_unscheduled_modules_;
0385 
0386     StreamID streamID_;
0387     StreamContext streamContext_;
0388   };
0389 
0390   void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0391     Service<JobReport> reportSvc;
0392     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0393   }
0394 
0395   template <typename T>
0396   void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
0397                                              typename T::TransitionInfoType& transitionInfo,
0398                                              ServiceToken const& token,
0399                                              bool cleaningUpAfterException) {
0400     auto const& principal = transitionInfo.principal();
0401     T::setStreamContext(streamContext_, principal);
0402 
0403     auto id = principal.id();
0404     ServiceWeakToken weakToken = token;
0405     auto doneTask = make_waiting_task(
0406         [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
0407           std::exception_ptr excpt;
0408           if (iPtr) {
0409             excpt = *iPtr;
0410             //add context information to the exception and print message
0411             try {
0412               convertException::wrap([&]() { std::rethrow_exception(excpt); });
0413             } catch (cms::Exception& ex) {
0414               //TODO: should add the transition type info
0415               std::ostringstream ost;
0416               if (ex.context().empty()) {
0417                 ost << "Processing " << T::transitionName() << " " << id;
0418               }
0419               ServiceRegistry::Operate op(weakToken.lock());
0420               addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0421               excpt = std::current_exception();
0422             }
0423 
0424             ServiceRegistry::Operate op(weakToken.lock());
0425             actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
0426           }
0427           // Caught exception is propagated via WaitingTaskHolder
0428           CMS_SA_ALLOW try {
0429             ServiceRegistry::Operate op(weakToken.lock());
0430             T::postScheduleSignal(actReg_.get(), &streamContext_);
0431           } catch (...) {
0432             if (not excpt) {
0433               excpt = std::current_exception();
0434             }
0435           }
0436           iHolder.doneWaiting(excpt);
0437         });
0438 
0439     auto task = make_functor_task(
0440         [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
0441           auto token = weakToken.lock();
0442           ServiceRegistry::Operate op(token);
0443           // Caught exception is propagated via WaitingTaskHolder
0444           CMS_SA_ALLOW try {
0445             T::preScheduleSignal(actReg_.get(), &streamContext_);
0446 
0447             workerManager_.resetAll();
0448           } catch (...) {
0449             h.doneWaiting(std::current_exception());
0450             return;
0451           }
0452 
0453           for (auto& p : end_paths_) {
0454             p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
0455           }
0456 
0457           for (auto& p : trig_paths_) {
0458             p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
0459           }
0460 
0461           workerManager_.processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
0462         });
0463 
0464     if (streamID_.value() == 0) {
0465       //Enqueueing will start another thread if there is only
0466       // one thread in the job. Having stream == 0 use spawn
0467       // avoids starting up another thread when there is only one stream.
0468       iHolder.group()->run([task]() {
0469         TaskSentry s{task};
0470         task->execute();
0471       });
0472     } else {
0473       oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
0474       arena.enqueue([task]() {
0475         TaskSentry s{task};
0476         task->execute();
0477       });
0478     }
0479   }
0480 }  // namespace edm
0481 
0482 #endif