Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-04-08 00:35:12

0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
0011   of events by modules and paths is contained here or in objects held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The StreamSchedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The StreamSchedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
0061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/EventPrincipal.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0068 #include "FWCore/Framework/interface/WorkerManager.h"
0069 #include "FWCore/Framework/interface/EDProducer.h"
0070 #include "FWCore/Framework/interface/Path.h"
0071 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0072 #include "FWCore/Framework/interface/maker/Worker.h"
0073 #include "FWCore/Framework/interface/WorkerRegistry.h"
0074 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0075 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0076 #include "FWCore/MessageLogger/interface/JobReport.h"
0077 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0078 #include "FWCore/ServiceRegistry/interface/Service.h"
0079 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0080 #include "FWCore/Concurrency/interface/FunctorTask.h"
0081 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0082 #include "FWCore/Utilities/interface/Algorithms.h"
0083 #include "FWCore/Utilities/interface/BranchType.h"
0084 #include "FWCore/Utilities/interface/ConvertException.h"
0085 #include "FWCore/Utilities/interface/Exception.h"
0086 #include "FWCore/Utilities/interface/StreamID.h"
0087 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0088 #include "FWCore/Utilities/interface/propagate_const.h"
0089 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0090 
0091 #include <map>
0092 #include <memory>
0093 #include <set>
0094 #include <string>
0095 #include <vector>
0096 #include <sstream>
0097 #include <atomic>
0098 #include <unordered_set>
0099 
0100 namespace edm {
0101 
0102   class ActivityRegistry;
0103   class BranchIDListHelper;
0104   class ExceptionCollector;
0105   class ExceptionToActionTable;
0106   class OutputModuleCommunicator;
0107   class ProcessContext;
0108   class UnscheduledCallProducer;
0109   class WorkerInPath;
0110   class ModuleRegistry;
0111   class TriggerResultInserter;
0112   class PathStatusInserter;
0113   class EndPathStatusInserter;
0114   class PreallocationConfiguration;
0115   class WaitingTaskHolder;
0116 
0117   namespace service {
0118     class TriggerNamesService;
0119   }
0120 
0121   namespace {
0122     template <typename T>
0123     class StreamScheduleSignalSentry {
0124     public:
0125       StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
0126           : a_(a), context_(context), allowThrow_(false) {
0127         if (a_)
0128           T::preScheduleSignal(a_, context_);
0129       }
0130       ~StreamScheduleSignalSentry() noexcept(false) {
0131         // Caught exception is rethrown (when allowed)
0132         CMS_SA_ALLOW try {
0133           if (a_) {
0134             T::postScheduleSignal(a_, context_);
0135           }
0136         } catch (...) {
0137           if (allowThrow_) {
0138             throw;
0139           }
0140         }
0141       }
0142 
0143       void allowThrow() { allowThrow_ = true; }
0144 
0145     private:
0146       // We own none of these resources.
0147       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0148       typename T::Context const* context_;
0149       bool allowThrow_;
0150     };
0151   }  // namespace
0152 
0153   class StreamSchedule {
0154   public:
0155     typedef std::vector<std::string> vstring;
0156     typedef std::vector<Path> TrigPaths;
0157     typedef std::vector<Path> NonTrigPaths;
0158     typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0159     typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0160     typedef std::shared_ptr<Worker> WorkerPtr;
0161     typedef std::vector<Worker*> AllWorkers;
0162 
0163     typedef std::vector<Worker*> Workers;
0164 
0165     typedef std::vector<WorkerInPath> PathWorkers;
0166 
0167     StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0168                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0169                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0170                    std::shared_ptr<ModuleRegistry>,
0171                    ParameterSet& proc_pset,
0172                    service::TriggerNamesService const& tns,
0173                    PreallocationConfiguration const& prealloc,
0174                    ProductRegistry& pregistry,
0175                    BranchIDListHelper& branchIDListHelper,
0176                    ExceptionToActionTable const& actions,
0177                    std::shared_ptr<ActivityRegistry> areg,
0178                    std::shared_ptr<ProcessConfiguration> processConfiguration,
0179                    StreamID streamID,
0180                    ProcessContext const* processContext);
0181 
0182     StreamSchedule(StreamSchedule const&) = delete;
0183 
0184     void processOneEventAsync(
0185         WaitingTaskHolder iTask,
0186         EventTransitionInfo&,
0187         ServiceToken const& token,
0188         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0189 
0190     template <typename T>
0191     void processOneStreamAsync(WaitingTaskHolder iTask,
0192                                typename T::TransitionInfoType& transitionInfo,
0193                                ServiceToken const& token,
0194                                bool cleaningUpAfterException = false);
0195 
0196     void beginStream();
0197     void endStream();
0198 
0199     StreamID streamID() const { return streamID_; }
0200 
0201     /// Return a vector allowing const access to all the
0202     /// ModuleDescriptions for this StreamSchedule.
0203 
0204     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0205     /// *** passed to the caller. Do not call delete on these
0206     /// *** pointers!
0207     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0208 
0209     ///adds to oLabelsToFill the labels for all paths in the process
0210     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0211 
0212     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0213     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0214 
0215     void moduleDescriptionsInPath(std::string const& iPathLabel,
0216                                   std::vector<ModuleDescription const*>& descriptions,
0217                                   unsigned int hint) const;
0218 
0219     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0220                                      std::vector<ModuleDescription const*>& descriptions,
0221                                      unsigned int hint) const;
0222 
0223     /// Return the number of events this StreamSchedule has tried to process
0224     /// (inclues both successes and failures, including failures due
0225     /// to exceptions during processing).
0226     int totalEvents() const { return total_events_; }
0227 
0228     /// Return the number of events which have been passed by one or
0229     /// more trigger paths.
0230     int totalEventsPassed() const { return total_passed_; }
0231 
0232     /// Return the number of events that have not passed any trigger.
0233     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0234     int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0235 
0236     /// Return the trigger report information on paths,
0237     /// modules-in-path, modules-in-endpath, and modules.
0238     void getTriggerReport(TriggerReport& rep) const;
0239 
0240     ///  Clear all the counters in the trigger report.
0241     void clearCounters();
0242 
0243     /// clone the type of module with label iLabel but configure with iPSet.
0244     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0245 
0246     /// Delete the module with label iLabel
0247     void deleteModule(std::string const& iLabel);
0248 
0249     void initializeEarlyDelete(ModuleRegistry& modReg,
0250                                std::vector<std::string> const& branchesToDeleteEarly,
0251                                edm::ProductRegistry const& preg);
0252 
0253     /// returns the collection of pointers to workers
0254     AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
0255 
0256     unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0257 
0258     StreamContext const& context() const { return streamContext_; }
0259 
0260   private:
0261     //Sentry class to only send a signal if an
0262     // exception occurs. An exception is identified
0263     // by the destructor being called without first
0264     // calling completedSuccessfully().
0265     class SendTerminationSignalIfException {
0266     public:
0267       SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::StreamContext const* iContext)
0268           : reg_(iReg), context_(iContext) {}
0269       ~SendTerminationSignalIfException() {
0270         if (reg_) {
0271           reg_->preStreamEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
0272         }
0273       }
0274       void completedSuccessfully() { reg_ = nullptr; }
0275 
0276     private:
0277       edm::ActivityRegistry* reg_;  // We do not use propagate_const because the registry itself is mutable.
0278       StreamContext const* context_;
0279     };
0280 
0281     /// returns the action table
0282     ExceptionToActionTable const& actionTable() const { return workerManager_.actionTable(); }
0283 
0284     void resetAll();
0285 
0286     void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0287 
0288     std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0289 
0290     void reportSkipped(EventPrincipal const& ep) const;
0291 
0292     struct AliasInfo {
0293       std::string friendlyClassName;
0294       std::string instanceLabel;
0295       std::string originalInstanceLabel;
0296       std::string originalModuleLabel;
0297     };
0298     std::vector<Worker*> tryToPlaceConditionalModules(
0299         Worker*,
0300         std::unordered_set<std::string>& conditionalModules,
0301         std::multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0302         std::multimap<std::string, AliasInfo> const& aliasMap,
0303         ParameterSet& proc_pset,
0304         ProductRegistry& preg,
0305         PreallocationConfiguration const* prealloc,
0306         std::shared_ptr<ProcessConfiguration const> processConfiguration);
0307     void fillWorkers(ParameterSet& proc_pset,
0308                      ProductRegistry& preg,
0309                      PreallocationConfiguration const* prealloc,
0310                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0311                      std::string const& name,
0312                      bool ignoreFilters,
0313                      PathWorkers& out,
0314                      std::vector<std::string> const& endPathNames);
0315     void fillTrigPath(ParameterSet& proc_pset,
0316                       ProductRegistry& preg,
0317                       PreallocationConfiguration const* prealloc,
0318                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0319                       int bitpos,
0320                       std::string const& name,
0321                       TrigResPtr,
0322                       std::vector<std::string> const& endPathNames);
0323     void fillEndPath(ParameterSet& proc_pset,
0324                      ProductRegistry& preg,
0325                      PreallocationConfiguration const* prealloc,
0326                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0327                      int bitpos,
0328                      std::string const& name,
0329                      std::vector<std::string> const& endPathNames);
0330 
0331     void addToAllWorkers(Worker* w);
0332 
0333     void resetEarlyDelete();
0334 
0335     TrigResConstPtr results() const { return get_underlying_safe(results_); }
0336     TrigResPtr& results() { return get_underlying_safe(results_); }
0337 
0338     void makePathStatusInserters(
0339         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0340         std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0341         ExceptionToActionTable const& actions);
0342 
0343     WorkerManager workerManager_;
0344     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0345 
0346     edm::propagate_const<TrigResPtr> results_;
0347 
0348     edm::propagate_const<WorkerPtr> results_inserter_;
0349     std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
0350     std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
0351 
0352     TrigPaths trig_paths_;
0353     TrigPaths end_paths_;
0354     std::vector<int> empty_trig_paths_;
0355     std::vector<int> empty_end_paths_;
0356 
0357     //For each branch that has been marked for early deletion
0358     // keep track of how many modules are left that read this data but have
0359     // not yet been run in this event
0360     std::vector<BranchToCount> earlyDeleteBranchToCount_;
0361     //NOTE the following is effectively internal data for each EarlyDeleteHelper
0362     // but putting it into one vector makes for better allocation as well as
0363     // faster iteration when used to reset the earlyDeleteBranchToCount_
0364     // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
0365     // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
0366     // tell which EarlyDeleteHelper is associated with which BranchIDs.
0367     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0368     //There is one EarlyDeleteHelper per Module which are reading data that
0369     // has been marked for early deletion
0370     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0371 
0372     int total_events_;
0373     int total_passed_;
0374     unsigned int number_of_unscheduled_modules_;
0375 
0376     StreamID streamID_;
0377     StreamContext streamContext_;
0378     std::atomic<bool> skippingEvent_;
0379   };
0380 
0381   void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0382     Service<JobReport> reportSvc;
0383     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0384   }
0385 
0386   template <typename T>
0387   void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
0388                                              typename T::TransitionInfoType& transitionInfo,
0389                                              ServiceToken const& token,
0390                                              bool cleaningUpAfterException) {
0391     auto const& principal = transitionInfo.principal();
0392     T::setStreamContext(streamContext_, principal);
0393 
0394     auto id = principal.id();
0395     ServiceWeakToken weakToken = token;
0396     auto doneTask = make_waiting_task(
0397         [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
0398           std::exception_ptr excpt;
0399           if (iPtr) {
0400             excpt = *iPtr;
0401             //add context information to the exception and print message
0402             try {
0403               convertException::wrap([&]() { std::rethrow_exception(excpt); });
0404             } catch (cms::Exception& ex) {
0405               //TODO: should add the transition type info
0406               std::ostringstream ost;
0407               if (ex.context().empty()) {
0408                 ost << "Processing " << T::transitionName() << " " << id;
0409               }
0410               ServiceRegistry::Operate op(weakToken.lock());
0411               addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0412               excpt = std::current_exception();
0413             }
0414 
0415             ServiceRegistry::Operate op(weakToken.lock());
0416             actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
0417           }
0418           // Caught exception is propagated via WaitingTaskHolder
0419           CMS_SA_ALLOW try {
0420             ServiceRegistry::Operate op(weakToken.lock());
0421             T::postScheduleSignal(actReg_.get(), &streamContext_);
0422           } catch (...) {
0423             if (not excpt) {
0424               excpt = std::current_exception();
0425             }
0426           }
0427           iHolder.doneWaiting(excpt);
0428         });
0429 
0430     auto task = make_functor_task(
0431         [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
0432           auto token = weakToken.lock();
0433           ServiceRegistry::Operate op(token);
0434           // Caught exception is propagated via WaitingTaskHolder
0435           CMS_SA_ALLOW try {
0436             T::preScheduleSignal(actReg_.get(), &streamContext_);
0437 
0438             workerManager_.resetAll();
0439           } catch (...) {
0440             h.doneWaiting(std::current_exception());
0441             return;
0442           }
0443 
0444           for (auto& p : end_paths_) {
0445             p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
0446           }
0447 
0448           for (auto& p : trig_paths_) {
0449             p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
0450           }
0451 
0452           workerManager_.processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
0453         });
0454 
0455     if (streamID_.value() == 0) {
0456       //Enqueueing will start another thread if there is only
0457       // one thread in the job. Having stream == 0 use spawn
0458       // avoids starting up another thread when there is only one stream.
0459       iHolder.group()->run([task]() {
0460         TaskSentry s{task};
0461         task->execute();
0462       });
0463     } else {
0464       oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
0465       arena.enqueue([task]() {
0466         TaskSentry s{task};
0467         task->execute();
0468       });
0469     }
0470   }
0471 }  // namespace edm
0472 
0473 #endif