File indexing completed on 2025-06-29 22:58:02
0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
0061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/EventPrincipal.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0068 #include "FWCore/Framework/interface/WorkerManager.h"
0069 #include "FWCore/Framework/interface/Path.h"
0070 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0071 #include "FWCore/Framework/interface/maker/Worker.h"
0072 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0074 #include "FWCore/MessageLogger/interface/JobReport.h"
0075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0076 #include "FWCore/ServiceRegistry/interface/Service.h"
0077 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0078 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0079 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0080 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0081 #include "FWCore/Concurrency/interface/FunctorTask.h"
0082 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0083 #include "FWCore/Utilities/interface/Algorithms.h"
0084 #include "FWCore/Utilities/interface/BranchType.h"
0085 #include "FWCore/Utilities/interface/ConvertException.h"
0086 #include "FWCore/Utilities/interface/Exception.h"
0087 #include "FWCore/Utilities/interface/StreamID.h"
0088 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0089 #include "FWCore/Utilities/interface/propagate_const.h"
0090 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0091
0092 #include "oneapi/tbb/task_arena.h"
0093
0094 #include <exception>
0095 #include <map>
0096 #include <memory>
0097 #include <mutex>
0098 #include <set>
0099 #include <string>
0100 #include <vector>
0101 #include <sstream>
0102 #include <atomic>
0103 #include <unordered_set>
0104 #include <utility>
0105
0106 namespace edm {
0107 // Forward declarations; the full definitions live in other headers. NOTE(review): UnscheduledCallProducer.h is also included above, so that forward declaration looks redundant — confirm.
0108 class BranchIDListHelper;
0109 class ExceptionCollector;
0110 class ExceptionToActionTable;
0111 class OutputModuleCommunicator;
0112 class UnscheduledCallProducer;
0113 class WorkerInPath;
0114 class ModuleRegistry;
0115 class TriggerResultInserter;
0116 class PathStatusInserter;
0117 class EndPathStatusInserter;
0118 class PreallocationConfiguration;
0119 class ConditionalTaskHelper;
0120
0121 namespace service {
0122 class TriggerNamesService;
0123 }
0124 // Schedule for a single stream (see streamID_): holds two WorkerManagers (runs vs. lumis-and-events), the trigger-path and end-path lists, and the per-stream event counters.
0125 class StreamSchedule {
0126 public:
0127 typedef std::vector<std::string> vstring;
0128 typedef std::vector<Path> TrigPaths;
0129 typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0130 typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0131 typedef std::vector<Worker*> AllWorkers;
0132 // NOTE(review): Workers has the same underlying type as AllWorkers (std::vector<Worker*>).
0133 typedef std::vector<Worker*> Workers;
0134
0135 typedef std::vector<WorkerInPath> PathWorkers;
0136 // Constructor. The role of each argument is defined in the .cc file; presumably the paths and workers are built from proc_pset and tns — TODO confirm.
0137 StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0138 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0139 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0140 std::shared_ptr<ModuleRegistry>,
0141 ParameterSet& proc_pset,
0142 service::TriggerNamesService const& tns,
0143 PreallocationConfiguration const& prealloc,
0144 SignallingProductRegistryFiller& pregistry,
0145 ExceptionToActionTable const& actions,
0146 std::shared_ptr<ActivityRegistry> areg,
0147 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0148 StreamID streamID,
0149 ProcessContext const* processContext);
0150 // Copy construction is disabled.
0151 StreamSchedule(StreamSchedule const&) = delete;
0152 // Starts asynchronous processing of one event; iTask is presumably notified on completion — confirm in the .cc file.
0153 void processOneEventAsync(
0154 WaitingTaskHolder iTask,
0155 EventTransitionInfo&,
0156 ServiceToken const& token,
0157 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0158 // Starts the stream transition described by T (template defined later in this header); iTask is notified with any resulting exception.
0159 template <typename T>
0160 void processOneStreamAsync(WaitingTaskHolder iTask,
0161 typename T::TransitionInfoType& transitionInfo,
0162 ServiceToken const& token,
0163 bool cleaningUpAfterException = false);
0164 // Stream begin/end hooks. endStream is noexcept and receives an ExceptionCollector plus its mutex, presumably so failures are collected instead of thrown — TODO confirm.
0165 void beginStream(ModuleRegistry& iModuleRegistry);
0166 void endStream(ModuleRegistry& iModuleRegistry, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
0167 // Identifier of the stream this schedule serves.
0168 StreamID streamID() const { return streamID_; }
0169 // Returns pointers-to-const for the ModuleDescriptions known to this schedule.
0170 // NOTE(review): ownership presumably stays with the schedule; callers only observe — confirm.
0171
0172
0173
0174
0175
0176 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0177
0178 // Fills oLabelsToFill with path labels (out-parameter; whether it is appended to or overwritten is defined in the .cc file).
0179 void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0180
0181 // Fills oLabelsToFill with the labels of the modules in path iPathLabel (ordering is defined in the .cc file).
0182 void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0183 // Fills descriptions for the modules in the named path; the meaning of hint is not visible here — presumably a first guess for the path index, TODO confirm.
0184 void moduleDescriptionsInPath(std::string const& iPathLabel,
0185 std::vector<ModuleDescription const*>& descriptions,
0186 unsigned int hint) const;
0187 // End-path counterpart of moduleDescriptionsInPath (same parameter list).
0188 void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0189 std::vector<ModuleDescription const*>& descriptions,
0190 unsigned int hint) const;
0191
0192
0193
0194 // Current value of the total_events_ counter.
0195 int totalEvents() const { return total_events_; }
0196
0197
0198 // Current value of the total_passed_ counter.
0199 int totalEventsPassed() const { return total_passed_; }
0200
0201
0202 // Derived count: totalEvents() minus totalEventsPassed().
0203 int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0204
0205
0206 // Fills rep with trigger report information; the exact content is defined in the .cc file.
0207 void getTriggerReport(TriggerReport& rep) const;
0208
0209 // Resets counters; which ones exactly is defined in the .cc file.
0210 void clearCounters();
0211
0212 // Presumably swaps in iMod for the module currently labelled iLabel — TODO confirm in the .cc file.
0213 void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0214
0215 // Presumably removes the module labelled iLabel from this schedule — TODO confirm in the .cc file.
0216 void deleteModule(std::string const& iLabel);
0217 // Configures early deletion for the listed branches; presumably populates the earlyDelete* members below — TODO confirm.
0218 void initializeEarlyDelete(ModuleRegistry& modReg,
0219 std::vector<std::string> const& branchesToDeleteEarly,
0220 std::multimap<std::string, std::string> const& referencesToBranches,
0221 std::vector<std::string> const& modulesToSkip,
0222 edm::ProductRegistry const& preg);
0223
0224 // Worker lists, forwarded from the matching WorkerManager.
0225 AllWorkers const& allWorkersRuns() const { return workerManagerRuns_.allWorkers(); }
0226 AllWorkers const& allWorkersLumisAndEvents() const { return workerManagerLumisAndEvents_.allWorkers(); }
0227
0228 AllWorkers const& unscheduledWorkersLumisAndEvents() const {
0229 return workerManagerLumisAndEvents_.unscheduledWorkers();
0230 }
0231 unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0232 // Read-only access to this stream's StreamContext.
0233 StreamContext const& context() const { return streamContext_; }
0234 // Four strings describing one alias; used as the mapped type of aliasMap in tryToPlaceConditionalModules.
0235 struct AliasInfo {
0236 std::string friendlyClassName;
0237 std::string instanceLabel;
0238 std::string originalInstanceLabel;
0239 std::string originalModuleLabel;
0240 };
0241
0242 private:
0243 // Action table, forwarded from workerManagerLumisAndEvents_.
0244 ExceptionToActionTable const& actionTable() const { return workerManagerLumisAndEvents_.actionTable(); }
0245
0246 void resetAll();
0247
0248 void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0249
0250 std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0251 // Reports ep's run and event number to the JobReport service as skipped (defined inline after the class).
0252 void reportSkipped(EventPrincipal const& ep) const;
0253 // Helpers with "fill"/"tryToPlace" names are presumably used while building paths in the constructor; their behaviour is defined in the .cc file.
0254 std::vector<Worker*> tryToPlaceConditionalModules(
0255 Worker*,
0256 std::unordered_set<std::string>& conditionalModules,
0257 std::unordered_multimap<std::string, edm::ProductDescription const*> const& conditionalModuleBranches,
0258 std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0259 ParameterSet& proc_pset,
0260 SignallingProductRegistryFiller& preg,
0261 PreallocationConfiguration const* prealloc,
0262 std::shared_ptr<ProcessConfiguration const> processConfiguration);
0263 PathWorkers fillWorkers(ParameterSet& proc_pset,
0264 SignallingProductRegistryFiller& preg,
0265 PreallocationConfiguration const* prealloc,
0266 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0267 std::string const& name,
0268 bool ignoreFilters,
0269 std::vector<std::string> const& endPathNames,
0270 ConditionalTaskHelper const& conditionalTaskHelper,
0271 std::unordered_set<std::string>& allConditionalModules);
0272 void fillTrigPath(ParameterSet& proc_pset,
0273 SignallingProductRegistryFiller& preg,
0274 PreallocationConfiguration const* prealloc,
0275 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0276 int bitpos,
0277 std::string const& name,
0278 TrigResPtr,
0279 std::vector<std::string> const& endPathNames,
0280 ConditionalTaskHelper const& conditionalTaskHelper,
0281 std::unordered_set<std::string>& allConditionalModules);
0282 void fillEndPath(ParameterSet& proc_pset,
0283 SignallingProductRegistryFiller& preg,
0284 PreallocationConfiguration const* prealloc,
0285 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0286 int bitpos,
0287 std::string const& name,
0288 std::vector<std::string> const& endPathNames,
0289 ConditionalTaskHelper const& conditionalTaskHelper,
0290 std::unordered_set<std::string>& allConditionalModules);
0291
0292 void addToAllWorkers(Worker* w);
0293
0294 void resetEarlyDelete();
0295 // Const and non-const access to the trigger results held in results_.
0296 TrigResConstPtr results() const { return get_underlying_safe(results_); }
0297 TrigResPtr& results() { return get_underlying_safe(results_); }
0298
0299 void makePathStatusInserters(
0300 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0301 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0302 ExceptionToActionTable const& actions);
0303 // Emits T's pre-schedule signal via actReg_; a cms::Exception gains context and is rethrown (defined after the class).
0304 template <typename T>
0305 void preScheduleSignal(StreamContext const*) const;
0306 // Emits T's post-schedule signal; a failure is stored in the exception_ptr only when it is currently empty (defined after the class).
0307 template <typename T>
0308 void postScheduleSignal(StreamContext const*, std::exception_ptr&) const noexcept;
0309
0310 void handleException(StreamContext const&, bool cleaningUpAfterException, std::exception_ptr&) const noexcept;
0311
0312 std::vector<unsigned int> moduleBeginStreamFailed_;
0313 WorkerManager workerManagerRuns_;
0314 WorkerManager workerManagerLumisAndEvents_;
0315 std::shared_ptr<ActivityRegistry> actReg_;
0316
0317 edm::propagate_const<TrigResPtr> results_;
0318
0319 edm::propagate_const<Worker*> results_inserter_;
0320 std::vector<edm::propagate_const<Worker*>> pathStatusInserterWorkers_;
0321 std::vector<edm::propagate_const<Worker*>> endPathStatusInserterWorkers_;
0322
0323 TrigPaths trig_paths_;
0324 TrigPaths end_paths_;
0325 std::vector<int> empty_trig_paths_;
0326 std::vector<int> empty_end_paths_;
0327
0328
0329
0330 // NOTE(review): BranchToCount is declared elsewhere; presumably a per-branch count used for early deletion — confirm.
0331 std::vector<BranchToCount> earlyDeleteBranchToCount_;
0332
0333
0334
0335
0336
0337 // Presumably indices into earlyDeleteBranchToCount_ used by the EarlyDeleteHelpers — TODO confirm in the .cc file.
0338 std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0339
0340
0341 std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0342 // Counters exposed through totalEvents(), totalEventsPassed() and numberOfUnscheduledModules().
0343 int total_events_;
0344 int total_passed_;
0345 unsigned int number_of_unscheduled_modules_;
0346
0347 StreamID streamID_;
0348 StreamContext streamContext_;
0349 };
0350
0351 inline void StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0352 // Forward the run and event number of ep to the JobReport service as a skipped event.
0353 Service<JobReport>()->reportSkippedEvent(ep.id().run(), ep.id().event());
0354 }
0355 // Runs the stream transition described by T as a task; iHolder is notified through doneWaiting() with the resulting exception_ptr (empty when nothing failed).
0356 template <typename T>
0357 void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
0358 typename T::TransitionInfoType& transitionInfo,
0359 ServiceToken const& token,
0360 bool cleaningUpAfterException) {
0361 auto group = iHolder.group();
0362 auto const& principal = transitionInfo.principal();
0363 T::setStreamContext(streamContext_, principal);
0364 // The lambdas below capture weakToken by value and call lock() on it when they actually run.
0365 ServiceWeakToken weakToken = token;
0366 auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
0367 std::exception_ptr const* iPtr) mutable {
0368 std::exception_ptr excpt;
0369 {
0370 ServiceRegistry::Operate op(weakToken.lock());
0371 // A non-null iPtr carries a failure from the preceding work; it goes through handleException before the post signal is sent.
0372 if (iPtr) {
0373 excpt = *iPtr;
0374 handleException(streamContext_, cleaningUpAfterException, excpt);
0375 }
0376 postScheduleSignal<T>(&streamContext_, excpt);
0377 }
0378 iHolder.doneWaiting(excpt);
0379 });
0380 // The task that starts the transition; it holds doneTask through the WaitingTaskHolder h and works on its own copy (info) of transitionInfo.
0381 auto task =
0382 make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
0383 auto token = weakToken.lock();
0384 ServiceRegistry::Operate op(token);
0385 // Run-level workers by default; T::branchType_ == InLumi selects the lumi/event workers instead.
0386 WorkerManager* workerManager = &workerManagerRuns_;
0387 if (T::branchType_ == InLumi) {
0388 workerManager = &workerManagerLumisAndEvents_;
0389 }
0390 CMS_SA_ALLOW try {
0391 preScheduleSignal<T>(&streamContext_);
0392 workerManager->resetAll();
0393 } catch (...) {
0394 // The pre signal or resetAll failed: record the exception on h and skip the workers.
0395 // NOTE(review): doneTask presumably still runs once h is released, so the post signal is still sent — confirm.
0396
0397 h.presetTaskAsFailed(std::current_exception());
0398 return;
0399 }
0400
0401 workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
0402 });
0403
0404 if (streamID_.value() == 0) {
0405 // Stream 0 runs the task through the task group taken from iHolder; every other stream enqueues it on the attached arena.
0406 // NOTE(review): the rationale for treating stream 0 differently is not visible here (the original comment is missing).
0407
0408 group->run([task]() {
0409 TaskSentry s{task};
0410 task->execute();
0411 });
0412 } else {
0413 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
0414 arena.enqueue([task]() {
0415 TaskSentry s{task};
0416 task->execute();
0417 });
0418 }
0419 }
0420
0421 template <typename T>
0422 void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
0423 try {  // Emit T's pre-schedule signal through the activity registry.
0424 convertException::wrap([&]() { T::preScheduleSignal(actReg_.get(), streamContext); });
0425 } catch (cms::Exception& failure) {
0426 std::ostringstream where;  // receives the description written by exceptionContext
0427 failure.addContext("Handling pre signal, likely in a service function");
0428 exceptionContext(where, *streamContext);
0429 failure.addContext(where.str());
0430 throw;  // rethrow the same exception, now carrying the added context
0431 }
0432 }
0433
0434 template <typename T>
0435 void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
0436 std::exception_ptr& excpt) const noexcept {
0437 try {  // Emit T's post-schedule signal through the activity registry.
0438 convertException::wrap([&]() { T::postScheduleSignal(actReg_.get(), streamContext); });
0439 } catch (cms::Exception& failure) {
0440 // An exception already stored in excpt takes precedence; the post-signal failure is then dropped.
0441 if (excpt) return;
0442 std::ostringstream where;
0443 failure.addContext("Handling post signal, likely in a service function");
0444 exceptionContext(where, *streamContext);
0445 failure.addContext(where.str());
0446 excpt = std::current_exception();
0447 }
0448 }
0449 }
0450
0451 #endif