File indexing completed on 2025-01-31 02:19:22
0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
0061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/EventPrincipal.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0068 #include "FWCore/Framework/interface/WorkerManager.h"
0069 #include "FWCore/Framework/interface/Path.h"
0070 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0071 #include "FWCore/Framework/interface/maker/Worker.h"
0072 #include "FWCore/Framework/interface/WorkerRegistry.h"
0073 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0074 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0075 #include "FWCore/MessageLogger/interface/JobReport.h"
0076 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0077 #include "FWCore/ServiceRegistry/interface/Service.h"
0078 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0079 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0080 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0081 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0082 #include "FWCore/Concurrency/interface/FunctorTask.h"
0083 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0084 #include "FWCore/Utilities/interface/Algorithms.h"
0085 #include "FWCore/Utilities/interface/BranchType.h"
0086 #include "FWCore/Utilities/interface/ConvertException.h"
0087 #include "FWCore/Utilities/interface/Exception.h"
0088 #include "FWCore/Utilities/interface/StreamID.h"
0089 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0090 #include "FWCore/Utilities/interface/propagate_const.h"
0091 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0092
0093 #include "oneapi/tbb/task_arena.h"
0094
0095 #include <exception>
0096 #include <map>
0097 #include <memory>
0098 #include <mutex>
0099 #include <set>
0100 #include <string>
0101 #include <vector>
0102 #include <sstream>
0103 #include <atomic>
0104 #include <unordered_set>
0105 #include <utility>
0106
0107 namespace edm {
0108
0109 class BranchIDListHelper;
0110 class ExceptionCollector;
0111 class ExceptionToActionTable;
0112 class OutputModuleCommunicator;
0113 class UnscheduledCallProducer;
0114 class WorkerInPath;
0115 class ModuleRegistry;
0116 class TriggerResultInserter;
0117 class PathStatusInserter;
0118 class EndPathStatusInserter;
0119 class PreallocationConfiguration;
0120 class ConditionalTaskHelper;
0121
0122 namespace service {
0123 class TriggerNamesService;
0124 }
0125
// StreamSchedule: scheduling state for a single stream.
// The class holds three WorkerManager instances (begin/end, runs, and
// lumis-and-events), the trigger-path and end-path containers, the event
// counters, and the StreamID / StreamContext it was built with.
// Only the small inline accessors are defined in the class body; the
// templated members are defined further down in this header and the
// remaining members are defined outside this header.
0126 class StreamSchedule {
0127 public:
// Container and pointer aliases used by the declarations below.
0128 typedef std::vector<std::string> vstring;
0129 typedef std::vector<Path> TrigPaths;
0130 typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0131 typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0132 typedef std::shared_ptr<Worker> WorkerPtr;
0133 typedef std::vector<Worker*> AllWorkers;
0134
// Same underlying type as AllWorkers (std::vector<Worker*>).
0135 typedef std::vector<Worker*> Workers;
0136
0137 typedef std::vector<WorkerInPath> PathWorkers;
0138
// Constructor. The definition is not visible in this header, so the roles of
// the individual arguments are not documented here.
// NOTE(review): the two inserter vectors are taken by non-const reference —
// confirm whether the constructor mutates them.
0139 StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0140 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0141 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0142 std::shared_ptr<ModuleRegistry>,
0143 ParameterSet& proc_pset,
0144 service::TriggerNamesService const& tns,
0145 PreallocationConfiguration const& prealloc,
0146 SignallingProductRegistry& pregistry,
0147 ExceptionToActionTable const& actions,
0148 std::shared_ptr<ActivityRegistry> areg,
0149 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0150 StreamID streamID,
0151 ProcessContext const* processContext);
0152
// Copy construction is disabled.
0153 StreamSchedule(StreamSchedule const&) = delete;
0154
// Event-level asynchronous entry point (defined outside this header).
// presumably iTask is signalled once the event has been processed — confirm.
0155 void processOneEventAsync(
0156 WaitingTaskHolder iTask,
0157 EventTransitionInfo&,
0158 ServiceToken const& token,
0159 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0160
// Asynchronous entry point for the transition described by traits type T.
// T must provide TransitionInfoType; the definition appears later in this
// header. cleaningUpAfterException defaults to false.
0161 template <typename T>
0162 void processOneStreamAsync(WaitingTaskHolder iTask,
0163 typename T::TransitionInfoType& transitionInfo,
0164 ServiceToken const& token,
0165 bool cleaningUpAfterException = false);
0166
// Stream begin/end hooks (defined outside this header). endStream is
// noexcept and receives an ExceptionCollector together with the mutex
// passed alongside it.
0167 void beginStream();
0168 void endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
0169
// Returns the StreamID stored in streamID_.
0170 StreamID streamID() const { return streamID_; }
0171
0172
0173
0174
0175
0176
0177
// Returns pointers to ModuleDescription objects (defined outside this header).
// presumably one entry per module known to this schedule — confirm.
0178 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0179
0180
// Output-parameter query: results are written into oLabelsToFill.
0181 void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0182
0183
// Output-parameter query keyed by a path label; results go to oLabelsToFill.
0184 void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0185
// As above but yielding ModuleDescription pointers for a path.
// NOTE(review): the meaning of `hint` is not visible here — presumably a
// suggested index for locating the path; confirm.
0186 void moduleDescriptionsInPath(std::string const& iPathLabel,
0187 std::vector<ModuleDescription const*>& descriptions,
0188 unsigned int hint) const;
0189
// End-path counterpart of moduleDescriptionsInPath.
0190 void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0191 std::vector<ModuleDescription const*>& descriptions,
0192 unsigned int hint) const;
0193
0194
0195
0196
// Returns the total_events_ counter.
0197 int totalEvents() const { return total_events_; }
0198
0199
0200
// Returns the total_passed_ counter.
0201 int totalEventsPassed() const { return total_passed_; }
0202
0203
0204
// Computed as totalEvents() - totalEventsPassed().
0205 int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0206
0207
0208
// Fills the supplied TriggerReport (defined outside this header).
0209 void getTriggerReport(TriggerReport& rep) const;
0210
0211
// presumably resets the event counters — confirm against the definition.
0212 void clearCounters();
0213
0214
// Module replacement / removal by label (defined outside this header).
0215 void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0216
0217
0218 void deleteModule(std::string const& iLabel);
0219
// Early-delete setup (defined outside this header); see the earlyDelete*
// data members at the end of the class.
0220 void initializeEarlyDelete(ModuleRegistry& modReg,
0221 std::vector<std::string> const& branchesToDeleteEarly,
0222 std::multimap<std::string, std::string> const& referencesToBranches,
0223 std::vector<std::string> const& modulesToSkip,
0224 edm::SignallingProductRegistry const& preg);
0225
0226
// Each accessor forwards to allWorkers() of the matching WorkerManager.
0227 AllWorkers const& allWorkersBeginEnd() const { return workerManagerBeginEnd_.allWorkers(); }
0228 AllWorkers const& allWorkersRuns() const { return workerManagerRuns_.allWorkers(); }
0229 AllWorkers const& allWorkersLumisAndEvents() const { return workerManagerLumisAndEvents_.allWorkers(); }
0230
// Forwards to unscheduledWorkers() of the lumis-and-events WorkerManager.
0231 AllWorkers const& unscheduledWorkersLumisAndEvents() const {
0232 return workerManagerLumisAndEvents_.unscheduledWorkers();
0233 }
// Returns the number_of_unscheduled_modules_ member.
0234 unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0235
// Read-only access to the StreamContext owned by this schedule.
0236 StreamContext const& context() const { return streamContext_; }
0237
// Plain aggregate of four strings; used as the mapped type of the aliasMap
// argument of tryToPlaceConditionalModules below.
0238 struct AliasInfo {
0239 std::string friendlyClassName;
0240 std::string instanceLabel;
0241 std::string originalInstanceLabel;
0242 std::string originalModuleLabel;
0243 };
0244
0245 private:
0246
// Forwards to actionTable() of the lumis-and-events WorkerManager.
0247 ExceptionToActionTable const& actionTable() const { return workerManagerLumisAndEvents_.actionTable(); }
0248
0249 void resetAll();
0250
// Event-processing helpers (defined outside this header).
0251 void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0252
0253 std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0254
// Defined inline after the class in this header.
0255 void reportSkipped(EventPrincipal const& ep) const;
0256
// Construction helpers (defined outside this header).
// NOTE(review): std::unordered_multimap is used in this signature, but
// <unordered_map> is not among the standard headers this file includes
// directly; presumably it arrives transitively — confirm.
0257 std::vector<Worker*> tryToPlaceConditionalModules(
0258 Worker*,
0259 std::unordered_set<std::string>& conditionalModules,
0260 std::unordered_multimap<std::string, edm::ProductDescription const*> const& conditionalModuleBranches,
0261 std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0262 ParameterSet& proc_pset,
0263 SignallingProductRegistry& preg,
0264 PreallocationConfiguration const* prealloc,
0265 std::shared_ptr<ProcessConfiguration const> processConfiguration);
// Writes its result into the PathWorkers output parameter `out`.
0266 void fillWorkers(ParameterSet& proc_pset,
0267 SignallingProductRegistry& preg,
0268 PreallocationConfiguration const* prealloc,
0269 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0270 std::string const& name,
0271 bool ignoreFilters,
0272 PathWorkers& out,
0273 std::vector<std::string> const& endPathNames,
0274 ConditionalTaskHelper const& conditionalTaskHelper,
0275 std::unordered_set<std::string>& allConditionalModules);
0276 void fillTrigPath(ParameterSet& proc_pset,
0277 SignallingProductRegistry& preg,
0278 PreallocationConfiguration const* prealloc,
0279 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0280 int bitpos,
0281 std::string const& name,
0282 TrigResPtr,
0283 std::vector<std::string> const& endPathNames,
0284 ConditionalTaskHelper const& conditionalTaskHelper,
0285 std::unordered_set<std::string>& allConditionalModules);
0286 void fillEndPath(ParameterSet& proc_pset,
0287 SignallingProductRegistry& preg,
0288 PreallocationConfiguration const* prealloc,
0289 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0290 int bitpos,
0291 std::string const& name,
0292 std::vector<std::string> const& endPathNames,
0293 ConditionalTaskHelper const& conditionalTaskHelper,
0294 std::unordered_set<std::string>& allConditionalModules);
0295
0296 void addToAllWorkers(Worker* w);
0297
0298 void resetEarlyDelete();
0299
// Const and non-const access to the trigger results pointer, unwrapped from
// its propagate_const holder via get_underlying_safe.
0300 TrigResConstPtr results() const { return get_underlying_safe(results_); }
0301 TrigResPtr& results() { return get_underlying_safe(results_); }
0302
0303 void makePathStatusInserters(
0304 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0305 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0306 ExceptionToActionTable const& actions);
0307
// Signal helpers templated on the transition traits type; both are defined
// later in this header.
0308 template <typename T>
0309 void preScheduleSignal(StreamContext const*) const;
0310
0311 template <typename T>
0312 void postScheduleSignal(StreamContext const*, std::exception_ptr&) const noexcept;
0313
0314 void handleException(StreamContext const&, bool cleaningUpAfterException, std::exception_ptr&) const noexcept;
0315
// One WorkerManager per group of transitions.
0316 WorkerManager workerManagerBeginEnd_;
0317 WorkerManager workerManagerRuns_;
0318 WorkerManager workerManagerLumisAndEvents_;
// Passed to T::preScheduleSignal / T::postScheduleSignal by the signal helpers.
0319 std::shared_ptr<ActivityRegistry> actReg_;
0320
0321 edm::propagate_const<TrigResPtr> results_;
0322
// Worker handles for the result / path-status inserters.
0323 edm::propagate_const<WorkerPtr> results_inserter_;
0324 std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
0325 std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
0326
0327 TrigPaths trig_paths_;
0328 TrigPaths end_paths_;
// presumably bit positions of paths that contain no modules — confirm.
0329 std::vector<int> empty_trig_paths_;
0330 std::vector<int> empty_end_paths_;
0331
0332
0333
0334
// Early-delete bookkeeping; see initializeEarlyDelete / resetEarlyDelete.
0335 std::vector<BranchToCount> earlyDeleteBranchToCount_;
0336
0337
0338
0339
0340
0341
0342 std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0343
0344
0345 std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0346
// Counters exposed through totalEvents(), totalEventsPassed() and
// numberOfUnscheduledModules().
0347 int total_events_;
0348 int total_passed_;
0349 unsigned int number_of_unscheduled_modules_;
0350
0351 StreamID streamID_;
0352 StreamContext streamContext_;
0353 };
0354
// Reports the event held by `ep` as skipped: obtains a handle to the
// JobReport service and passes it the run and event numbers read from ep.id().
0355 void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0356 Service<JobReport> reportSvc;
0357 reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0358 }
0359
// Starts asynchronous processing of one transition of traits type T on this
// stream.
//
//   iHolder                  - holder notified via doneWaiting() once the
//                              transition (including the post signal) is over
//   transitionInfo           - supplies the principal; a copy is captured by
//                              the worker task
//   token                    - service token, kept as a weak token and locked
//                              inside each task
//   cleaningUpAfterException - forwarded to handleException
//
// The function itself only sets the stream context, builds two tasks and
// schedules the second one; the actual work happens inside the tasks.
0360 template <typename T>
0361 void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
0362 typename T::TransitionInfoType& transitionInfo,
0363 ServiceToken const& token,
0364 bool cleaningUpAfterException) {
0365 auto group = iHolder.group();
0366 auto const& principal = transitionInfo.principal();
0367 T::setStreamContext(streamContext_, principal);
0368
// Completion task: receives a pointer to the exception raised by the work
// (or a null pointer), lets handleException process it, emits the post
// signal, and finally hands the resulting exception_ptr to iHolder.
0369 ServiceWeakToken weakToken = token;
0370 auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
0371 std::exception_ptr const* iPtr) mutable {
0372 std::exception_ptr excpt;
0373 {
// Services are made available only for the duration of this inner scope.
0374 ServiceRegistry::Operate op(weakToken.lock());
0375
0376 if (iPtr) {
0377 excpt = *iPtr;
0378 handleException(streamContext_, cleaningUpAfterException, excpt);
0379 }
// postScheduleSignal may store an exception into excpt if excpt is still empty.
0380 postScheduleSignal<T>(&streamContext_, excpt);
0381 }
0382 iHolder.doneWaiting(excpt);
0383 });
0384
// Worker task: holds doneTask through the WaitingTaskHolder `h`.
0385 auto task =
0386 make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
0387 auto token = weakToken.lock();
0388 ServiceRegistry::Operate op(token);
0389
// The runs manager is used unless T is the lumi transition (InLumi).
0390 WorkerManager* workerManager = &workerManagerRuns_;
0391 if (T::branchType_ == InLumi) {
0392 workerManager = &workerManagerLumisAndEvents_;
0393 }
0394 CMS_SA_ALLOW try {
0395 preScheduleSignal<T>(&streamContext_);
0396 workerManager->resetAll();
0397 } catch (...) {
0398
0399
0400
// A failure in the pre signal or in resetAll is recorded on the holder and
// the workers are not run.
0401 h.presetTaskAsFailed(std::current_exception());
0402 return;
0403 }
0404
0405 workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
0406 });
0407
// Stream 0 runs the task through the task group; every other stream
// enqueues it on the task arena attached to the calling thread.
// NOTE(review): the rationale for treating stream 0 differently is not
// visible in this listing — confirm against the upstream comment.
0408 if (streamID_.value() == 0) {
0409
0410
0411
0412 group->run([task]() {
0413 TaskSentry s{task};
0414 task->execute();
0415 });
0416 } else {
0417 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
0418 arena.enqueue([task]() {
0419 TaskSentry s{task};
0420 task->execute();
0421 });
0422 }
0423 }
0424
// Emits the pre-schedule signal for transition type T by calling
// T::preScheduleSignal with the activity registry and the stream context,
// wrapped in convertException::wrap.
// If a cms::Exception is caught, a fixed context line and the text produced
// by exceptionContext() for *streamContext are added to it, in that order,
// and the same exception is rethrown.
0425 template <typename T>
0426 void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
0427 try {
0428 convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
0429 } catch (cms::Exception& ex) {
0430 std::ostringstream ost;
0431 ex.addContext("Handling pre signal, likely in a service function");
0432 exceptionContext(ost, *streamContext);
0433 ex.addContext(ost.str());
// Rethrow the now-annotated exception to the caller.
0434 throw;
0435 }
0436 }
0437
// Emits the post-schedule signal for transition type T by calling
// T::postScheduleSignal with the activity registry and the stream context,
// wrapped in convertException::wrap.
// The function is noexcept: a cms::Exception caught here is never rethrown.
// It is reported through `excpt` instead, and only when `excpt` is still
// empty, so an exception already stored by the caller takes precedence and
// the later one is dropped.
0438 template <typename T>
0439 void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
0440 std::exception_ptr& excpt) const noexcept {
0441 try {
0442 convertException::wrap([this, streamContext]() { T::postScheduleSignal(actReg_.get(), streamContext); });
0443 } catch (cms::Exception& ex) {
0444 if (not excpt) {
0445 std::ostringstream ost;
0446 ex.addContext("Handling post signal, likely in a service function");
0447 exceptionContext(ost, *streamContext);
0448 ex.addContext(ost.str());
// Capture the annotated exception for the caller.
0449 excpt = std::current_exception();
0450 }
0451 }
0452 }
0453 }
0454
0455 #endif