File indexing completed on 2024-06-13 03:23:42
0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
0061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/EventPrincipal.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0068 #include "FWCore/Framework/interface/WorkerManager.h"
0069 #include "FWCore/Framework/interface/Path.h"
0070 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0071 #include "FWCore/Framework/interface/maker/Worker.h"
0072 #include "FWCore/Framework/interface/WorkerRegistry.h"
0073 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0074 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0075 #include "FWCore/MessageLogger/interface/JobReport.h"
0076 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0077 #include "FWCore/ServiceRegistry/interface/Service.h"
0078 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0079 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0080 #include "FWCore/Concurrency/interface/FunctorTask.h"
0081 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0082 #include "FWCore/Utilities/interface/Algorithms.h"
0083 #include "FWCore/Utilities/interface/BranchType.h"
0084 #include "FWCore/Utilities/interface/ConvertException.h"
0085 #include "FWCore/Utilities/interface/Exception.h"
0086 #include "FWCore/Utilities/interface/StreamID.h"
0087 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0088 #include "FWCore/Utilities/interface/propagate_const.h"
0089 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0090
0091 #include <map>
0092 #include <memory>
0093 #include <set>
0094 #include <string>
0095 #include <vector>
0096 #include <sstream>
0097 #include <atomic>
0098 #include <unordered_set>
0099 #include <utility>
0100
0101 namespace edm {
0102
0103 class BranchIDListHelper;
0104 class ExceptionCollector;
0105 class ExceptionToActionTable;
0106 class OutputModuleCommunicator;
0107 class UnscheduledCallProducer;
0108 class WorkerInPath;
0109 class ModuleRegistry;
0110 class TriggerResultInserter;
0111 class PathStatusInserter;
0112 class EndPathStatusInserter;
0113 class PreallocationConfiguration;
0114 class WaitingTaskHolder;
0115
0116 class ConditionalTaskHelper;
0117
0118 namespace service {
0119 class TriggerNamesService;
0120 }
0121
0122 class StreamSchedule {
0123 public:
0124 typedef std::vector<std::string> vstring;
0125 typedef std::vector<Path> TrigPaths;
0126 typedef std::vector<Path> NonTrigPaths;
0127 typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0128 typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0129 typedef std::shared_ptr<Worker> WorkerPtr;
0130 typedef std::vector<Worker*> AllWorkers;
0131
0132 typedef std::vector<Worker*> Workers;
0133
0134 typedef std::vector<WorkerInPath> PathWorkers;
0135
0136 StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0137 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0138 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0139 std::shared_ptr<ModuleRegistry>,
0140 ParameterSet& proc_pset,
0141 service::TriggerNamesService const& tns,
0142 PreallocationConfiguration const& prealloc,
0143 ProductRegistry& pregistry,
0144 ExceptionToActionTable const& actions,
0145 std::shared_ptr<ActivityRegistry> areg,
0146 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0147 StreamID streamID,
0148 ProcessContext const* processContext);
0149
0150 StreamSchedule(StreamSchedule const&) = delete;
0151
0152 void processOneEventAsync(
0153 WaitingTaskHolder iTask,
0154 EventTransitionInfo&,
0155 ServiceToken const& token,
0156 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0157
0158 template <typename T>
0159 void processOneStreamAsync(WaitingTaskHolder iTask,
0160 typename T::TransitionInfoType& transitionInfo,
0161 ServiceToken const& token,
0162 bool cleaningUpAfterException = false);
0163
0164 void beginStream();
0165 void endStream();
0166
0167 StreamID streamID() const { return streamID_; }
0168
0169
0170
0171
0172
0173
0174
0175 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0176
0177
0178 void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0179
0180
0181 void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0182
0183 void moduleDescriptionsInPath(std::string const& iPathLabel,
0184 std::vector<ModuleDescription const*>& descriptions,
0185 unsigned int hint) const;
0186
0187 void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0188 std::vector<ModuleDescription const*>& descriptions,
0189 unsigned int hint) const;
0190
0191
0192
0193
0194 int totalEvents() const { return total_events_; }
0195
0196
0197
0198 int totalEventsPassed() const { return total_passed_; }
0199
0200
0201
0202 int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0203
0204
0205
0206 void getTriggerReport(TriggerReport& rep) const;
0207
0208
0209 void clearCounters();
0210
0211
0212 void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0213
0214
0215 void deleteModule(std::string const& iLabel);
0216
0217 void initializeEarlyDelete(ModuleRegistry& modReg,
0218 std::vector<std::string> const& branchesToDeleteEarly,
0219 std::multimap<std::string, std::string> const& referencesToBranches,
0220 std::vector<std::string> const& modulesToSkip,
0221 edm::ProductRegistry const& preg);
0222
0223
0224 AllWorkers const& allWorkersBeginEnd() const { return workerManagerBeginEnd_.allWorkers(); }
0225 AllWorkers const& allWorkersRuns() const { return workerManagerRuns_.allWorkers(); }
0226 AllWorkers const& allWorkersLumisAndEvents() const { return workerManagerLumisAndEvents_.allWorkers(); }
0227
0228 AllWorkers const& unscheduledWorkersLumisAndEvents() const {
0229 return workerManagerLumisAndEvents_.unscheduledWorkers();
0230 }
0231 unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0232
0233 StreamContext const& context() const { return streamContext_; }
0234
0235 struct AliasInfo {
0236 std::string friendlyClassName;
0237 std::string instanceLabel;
0238 std::string originalInstanceLabel;
0239 std::string originalModuleLabel;
0240 };
0241
0242 private:
0243
0244 ExceptionToActionTable const& actionTable() const { return workerManagerLumisAndEvents_.actionTable(); }
0245
0246 void resetAll();
0247
0248 void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0249
0250 std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0251
0252 void reportSkipped(EventPrincipal const& ep) const;
0253
0254 std::vector<Worker*> tryToPlaceConditionalModules(
0255 Worker*,
0256 std::unordered_set<std::string>& conditionalModules,
0257 std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0258 std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0259 ParameterSet& proc_pset,
0260 ProductRegistry& preg,
0261 PreallocationConfiguration const* prealloc,
0262 std::shared_ptr<ProcessConfiguration const> processConfiguration);
0263 void fillWorkers(ParameterSet& proc_pset,
0264 ProductRegistry& preg,
0265 PreallocationConfiguration const* prealloc,
0266 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0267 std::string const& name,
0268 bool ignoreFilters,
0269 PathWorkers& out,
0270 std::vector<std::string> const& endPathNames,
0271 ConditionalTaskHelper const& conditionalTaskHelper,
0272 std::unordered_set<std::string>& allConditionalModules);
0273 void fillTrigPath(ParameterSet& proc_pset,
0274 ProductRegistry& preg,
0275 PreallocationConfiguration const* prealloc,
0276 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0277 int bitpos,
0278 std::string const& name,
0279 TrigResPtr,
0280 std::vector<std::string> const& endPathNames,
0281 ConditionalTaskHelper const& conditionalTaskHelper,
0282 std::unordered_set<std::string>& allConditionalModules);
0283 void fillEndPath(ParameterSet& proc_pset,
0284 ProductRegistry& preg,
0285 PreallocationConfiguration const* prealloc,
0286 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0287 int bitpos,
0288 std::string const& name,
0289 std::vector<std::string> const& endPathNames,
0290 ConditionalTaskHelper const& conditionalTaskHelper,
0291 std::unordered_set<std::string>& allConditionalModules);
0292
0293 void addToAllWorkers(Worker* w);
0294
0295 void resetEarlyDelete();
0296
0297 TrigResConstPtr results() const { return get_underlying_safe(results_); }
0298 TrigResPtr& results() { return get_underlying_safe(results_); }
0299
0300 void makePathStatusInserters(
0301 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0302 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0303 ExceptionToActionTable const& actions);
0304
0305 template <typename T>
0306 void preScheduleSignal(StreamContext const*) const;
0307
0308 template <typename T>
0309 void postScheduleSignal(StreamContext const*, ServiceWeakToken const&, std::exception_ptr&) const noexcept;
0310
0311 void handleException(StreamContext const&,
0312 ServiceWeakToken const&,
0313 bool cleaningUpAfterException,
0314 std::exception_ptr&) const noexcept;
0315
0316 WorkerManager workerManagerBeginEnd_;
0317 WorkerManager workerManagerRuns_;
0318 WorkerManager workerManagerLumisAndEvents_;
0319 std::shared_ptr<ActivityRegistry> actReg_;
0320
0321 edm::propagate_const<TrigResPtr> results_;
0322
0323 edm::propagate_const<WorkerPtr> results_inserter_;
0324 std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
0325 std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
0326
0327 TrigPaths trig_paths_;
0328 TrigPaths end_paths_;
0329 std::vector<int> empty_trig_paths_;
0330 std::vector<int> empty_end_paths_;
0331
0332
0333
0334
0335 std::vector<BranchToCount> earlyDeleteBranchToCount_;
0336
0337
0338
0339
0340
0341
0342 std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0343
0344
0345 std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0346
0347 int total_events_;
0348 int total_passed_;
0349 unsigned int number_of_unscheduled_modules_;
0350
0351 StreamID streamID_;
0352 StreamContext streamContext_;
0353 };
0354
0355 void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0356 Service<JobReport> reportSvc;
0357 reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0358 }
0359
0360 template <typename T>
0361 void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
0362 typename T::TransitionInfoType& transitionInfo,
0363 ServiceToken const& token,
0364 bool cleaningUpAfterException) {
0365 auto group = iHolder.group();
0366 auto const& principal = transitionInfo.principal();
0367 T::setStreamContext(streamContext_, principal);
0368
0369 ServiceWeakToken weakToken = token;
0370 auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
0371 std::exception_ptr const* iPtr) mutable {
0372 std::exception_ptr excpt;
0373 if (iPtr) {
0374 excpt = *iPtr;
0375 handleException(streamContext_, weakToken, cleaningUpAfterException, excpt);
0376 }
0377 postScheduleSignal<T>(&streamContext_, weakToken, excpt);
0378 iHolder.doneWaiting(excpt);
0379 });
0380
0381 auto task =
0382 make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
0383 auto token = weakToken.lock();
0384 ServiceRegistry::Operate op(token);
0385
0386 WorkerManager* workerManager = &workerManagerRuns_;
0387 if (T::branchType_ == InLumi) {
0388 workerManager = &workerManagerLumisAndEvents_;
0389 }
0390 CMS_SA_ALLOW try {
0391 preScheduleSignal<T>(&streamContext_);
0392 workerManager->resetAll();
0393 } catch (...) {
0394 h.doneWaiting(std::current_exception());
0395 return;
0396 }
0397
0398 workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
0399 });
0400
0401 if (streamID_.value() == 0) {
0402
0403
0404
0405 group->run([task]() {
0406 TaskSentry s{task};
0407 task->execute();
0408 });
0409 } else {
0410 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
0411 arena.enqueue([task]() {
0412 TaskSentry s{task};
0413 task->execute();
0414 });
0415 }
0416 }
0417
0418 template <typename T>
0419 void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
0420 try {
0421 convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
0422 } catch (cms::Exception& ex) {
0423 std::ostringstream ost;
0424 ex.addContext("Handling pre signal, likely in a service function");
0425 exceptionContext(ost, *streamContext);
0426 ex.addContext(ost.str());
0427 throw;
0428 }
0429 }
0430
0431 template <typename T>
0432 void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
0433 ServiceWeakToken const& weakToken,
0434 std::exception_ptr& excpt) const noexcept {
0435 try {
0436 convertException::wrap([this, &weakToken, streamContext]() {
0437 ServiceRegistry::Operate op(weakToken.lock());
0438 T::postScheduleSignal(actReg_.get(), streamContext);
0439 });
0440 } catch (cms::Exception& ex) {
0441 if (not excpt) {
0442 std::ostringstream ost;
0443 ex.addContext("Handling post signal, likely in a service function");
0444 exceptionContext(ost, *streamContext);
0445 ex.addContext(ost.str());
0446 excpt = std::current_exception();
0447 }
0448 }
0449 }
0450 }
0451
0452 #endif