File indexing completed on 2022-10-19 23:19:14
0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
0061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/EventPrincipal.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0068 #include "FWCore/Framework/interface/WorkerManager.h"
0069 #include "FWCore/Framework/interface/Path.h"
0070 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0071 #include "FWCore/Framework/interface/maker/Worker.h"
0072 #include "FWCore/Framework/interface/WorkerRegistry.h"
0073 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0074 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0075 #include "FWCore/MessageLogger/interface/JobReport.h"
0076 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0077 #include "FWCore/ServiceRegistry/interface/Service.h"
0078 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0079 #include "FWCore/Concurrency/interface/FunctorTask.h"
0080 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0081 #include "FWCore/Utilities/interface/Algorithms.h"
0082 #include "FWCore/Utilities/interface/BranchType.h"
0083 #include "FWCore/Utilities/interface/ConvertException.h"
0084 #include "FWCore/Utilities/interface/Exception.h"
0085 #include "FWCore/Utilities/interface/StreamID.h"
0086 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0087 #include "FWCore/Utilities/interface/propagate_const.h"
0088 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0089
0090 #include <map>
0091 #include <memory>
0092 #include <set>
0093 #include <string>
0094 #include <vector>
0095 #include <sstream>
0096 #include <atomic>
0097 #include <unordered_set>
0098
0099 namespace edm {
0100
0101 class ActivityRegistry;
0102 class BranchIDListHelper;
0103 class ExceptionCollector;
0104 class ExceptionToActionTable;
0105 class OutputModuleCommunicator;
0106 class ProcessContext;
0107 class UnscheduledCallProducer;
0108 class WorkerInPath;
0109 class ModuleRegistry;
0110 class TriggerResultInserter;
0111 class PathStatusInserter;
0112 class EndPathStatusInserter;
0113 class PreallocationConfiguration;
0114 class WaitingTaskHolder;
0115
0116 class ConditionalTaskHelper;
0117
0118 namespace service {
0119 class TriggerNamesService;
0120 }
0121
0122 namespace {
0123 template <typename T>
0124 class StreamScheduleSignalSentry {
0125 public:
0126 StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
0127 : a_(a), context_(context), allowThrow_(false) {
0128 if (a_)
0129 T::preScheduleSignal(a_, context_);
0130 }
0131 ~StreamScheduleSignalSentry() noexcept(false) {
0132
0133 CMS_SA_ALLOW try {
0134 if (a_) {
0135 T::postScheduleSignal(a_, context_);
0136 }
0137 } catch (...) {
0138 if (allowThrow_) {
0139 throw;
0140 }
0141 }
0142 }
0143
0144 void allowThrow() { allowThrow_ = true; }
0145
0146 private:
0147
0148 ActivityRegistry* a_;
0149 typename T::Context const* context_;
0150 bool allowThrow_;
0151 };
0152 }
0153
0154 class StreamSchedule {
0155 public:
0156 typedef std::vector<std::string> vstring;
0157 typedef std::vector<Path> TrigPaths;
0158 typedef std::vector<Path> NonTrigPaths;
0159 typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0160 typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0161 typedef std::shared_ptr<Worker> WorkerPtr;
0162 typedef std::vector<Worker*> AllWorkers;
0163
0164 typedef std::vector<Worker*> Workers;
0165
0166 typedef std::vector<WorkerInPath> PathWorkers;
0167
0168 StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0169 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0170 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0171 std::shared_ptr<ModuleRegistry>,
0172 ParameterSet& proc_pset,
0173 service::TriggerNamesService const& tns,
0174 PreallocationConfiguration const& prealloc,
0175 ProductRegistry& pregistry,
0176 ExceptionToActionTable const& actions,
0177 std::shared_ptr<ActivityRegistry> areg,
0178 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0179 StreamID streamID,
0180 ProcessContext const* processContext);
0181
0182 StreamSchedule(StreamSchedule const&) = delete;
0183
0184 void processOneEventAsync(
0185 WaitingTaskHolder iTask,
0186 EventTransitionInfo&,
0187 ServiceToken const& token,
0188 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0189
0190 template <typename T>
0191 void processOneStreamAsync(WaitingTaskHolder iTask,
0192 typename T::TransitionInfoType& transitionInfo,
0193 ServiceToken const& token,
0194 bool cleaningUpAfterException = false);
0195
0196 void beginStream();
0197 void endStream();
0198
0199 StreamID streamID() const { return streamID_; }
0200
0201
0202
0203
0204
0205
0206
0207 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0208
0209
0210 void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0211
0212
0213 void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0214
0215 void moduleDescriptionsInPath(std::string const& iPathLabel,
0216 std::vector<ModuleDescription const*>& descriptions,
0217 unsigned int hint) const;
0218
0219 void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0220 std::vector<ModuleDescription const*>& descriptions,
0221 unsigned int hint) const;
0222
0223
0224
0225
0226 int totalEvents() const { return total_events_; }
0227
0228
0229
0230 int totalEventsPassed() const { return total_passed_; }
0231
0232
0233
0234 int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0235
0236
0237
0238 void getTriggerReport(TriggerReport& rep) const;
0239
0240
0241 void clearCounters();
0242
0243
0244 void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0245
0246
0247 void deleteModule(std::string const& iLabel);
0248
0249 void initializeEarlyDelete(ModuleRegistry& modReg,
0250 std::vector<std::string> const& branchesToDeleteEarly,
0251 std::multimap<std::string, std::string> const& referencesToBranches,
0252 std::vector<std::string> const& modulesToSkip,
0253 edm::ProductRegistry const& preg);
0254
0255
0256 AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
0257
0258 AllWorkers const& unscheduledWorkers() const { return workerManager_.unscheduledWorkers(); }
0259 unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0260
0261 StreamContext const& context() const { return streamContext_; }
0262
0263 struct AliasInfo {
0264 std::string friendlyClassName;
0265 std::string instanceLabel;
0266 std::string originalInstanceLabel;
0267 std::string originalModuleLabel;
0268 };
0269
0270 private:
0271
0272
0273
0274
0275 class SendTerminationSignalIfException {
0276 public:
0277 SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::StreamContext const* iContext)
0278 : reg_(iReg), context_(iContext) {}
0279 ~SendTerminationSignalIfException() {
0280 if (reg_) {
0281 reg_->preStreamEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
0282 }
0283 }
0284 void completedSuccessfully() { reg_ = nullptr; }
0285
0286 private:
0287 edm::ActivityRegistry* reg_;
0288 StreamContext const* context_;
0289 };
0290
0291
0292 ExceptionToActionTable const& actionTable() const { return workerManager_.actionTable(); }
0293
0294 void resetAll();
0295
0296 void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0297
0298 std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0299
0300 void reportSkipped(EventPrincipal const& ep) const;
0301
0302 std::vector<Worker*> tryToPlaceConditionalModules(
0303 Worker*,
0304 std::unordered_set<std::string>& conditionalModules,
0305 std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0306 std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0307 ParameterSet& proc_pset,
0308 ProductRegistry& preg,
0309 PreallocationConfiguration const* prealloc,
0310 std::shared_ptr<ProcessConfiguration const> processConfiguration);
0311 void fillWorkers(ParameterSet& proc_pset,
0312 ProductRegistry& preg,
0313 PreallocationConfiguration const* prealloc,
0314 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0315 std::string const& name,
0316 bool ignoreFilters,
0317 PathWorkers& out,
0318 std::vector<std::string> const& endPathNames,
0319 ConditionalTaskHelper const& conditionalTaskHelper);
0320 void fillTrigPath(ParameterSet& proc_pset,
0321 ProductRegistry& preg,
0322 PreallocationConfiguration const* prealloc,
0323 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0324 int bitpos,
0325 std::string const& name,
0326 TrigResPtr,
0327 std::vector<std::string> const& endPathNames,
0328 ConditionalTaskHelper const& conditionalTaskHelper);
0329 void fillEndPath(ParameterSet& proc_pset,
0330 ProductRegistry& preg,
0331 PreallocationConfiguration const* prealloc,
0332 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0333 int bitpos,
0334 std::string const& name,
0335 std::vector<std::string> const& endPathNames,
0336 ConditionalTaskHelper const& conditionalTaskHelper);
0337
0338 void addToAllWorkers(Worker* w);
0339
0340 void resetEarlyDelete();
0341
0342 TrigResConstPtr results() const { return get_underlying_safe(results_); }
0343 TrigResPtr& results() { return get_underlying_safe(results_); }
0344
0345 void makePathStatusInserters(
0346 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0347 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0348 ExceptionToActionTable const& actions);
0349
0350 WorkerManager workerManager_;
0351 std::shared_ptr<ActivityRegistry> actReg_;
0352
0353 edm::propagate_const<TrigResPtr> results_;
0354
0355 edm::propagate_const<WorkerPtr> results_inserter_;
0356 std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
0357 std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
0358
0359 TrigPaths trig_paths_;
0360 TrigPaths end_paths_;
0361 std::vector<int> empty_trig_paths_;
0362 std::vector<int> empty_end_paths_;
0363
0364
0365
0366
0367 std::vector<BranchToCount> earlyDeleteBranchToCount_;
0368
0369
0370
0371
0372
0373
0374 std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0375
0376
0377 std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0378
0379 int total_events_;
0380 int total_passed_;
0381 unsigned int number_of_unscheduled_modules_;
0382
0383 StreamID streamID_;
0384 StreamContext streamContext_;
0385 std::atomic<bool> skippingEvent_;
0386 };
0387
0388 void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0389 Service<JobReport> reportSvc;
0390 reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0391 }
0392
0393 template <typename T>
0394 void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
0395 typename T::TransitionInfoType& transitionInfo,
0396 ServiceToken const& token,
0397 bool cleaningUpAfterException) {
0398 auto const& principal = transitionInfo.principal();
0399 T::setStreamContext(streamContext_, principal);
0400
0401 auto id = principal.id();
0402 ServiceWeakToken weakToken = token;
0403 auto doneTask = make_waiting_task(
0404 [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
0405 std::exception_ptr excpt;
0406 if (iPtr) {
0407 excpt = *iPtr;
0408
0409 try {
0410 convertException::wrap([&]() { std::rethrow_exception(excpt); });
0411 } catch (cms::Exception& ex) {
0412
0413 std::ostringstream ost;
0414 if (ex.context().empty()) {
0415 ost << "Processing " << T::transitionName() << " " << id;
0416 }
0417 ServiceRegistry::Operate op(weakToken.lock());
0418 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0419 excpt = std::current_exception();
0420 }
0421
0422 ServiceRegistry::Operate op(weakToken.lock());
0423 actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
0424 }
0425
0426 CMS_SA_ALLOW try {
0427 ServiceRegistry::Operate op(weakToken.lock());
0428 T::postScheduleSignal(actReg_.get(), &streamContext_);
0429 } catch (...) {
0430 if (not excpt) {
0431 excpt = std::current_exception();
0432 }
0433 }
0434 iHolder.doneWaiting(excpt);
0435 });
0436
0437 auto task = make_functor_task(
0438 [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
0439 auto token = weakToken.lock();
0440 ServiceRegistry::Operate op(token);
0441
0442 CMS_SA_ALLOW try {
0443 T::preScheduleSignal(actReg_.get(), &streamContext_);
0444
0445 workerManager_.resetAll();
0446 } catch (...) {
0447 h.doneWaiting(std::current_exception());
0448 return;
0449 }
0450
0451 for (auto& p : end_paths_) {
0452 p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
0453 }
0454
0455 for (auto& p : trig_paths_) {
0456 p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
0457 }
0458
0459 workerManager_.processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
0460 });
0461
0462 if (streamID_.value() == 0) {
0463
0464
0465
0466 iHolder.group()->run([task]() {
0467 TaskSentry s{task};
0468 task->execute();
0469 });
0470 } else {
0471 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
0472 arena.enqueue([task]() {
0473 TaskSentry s{task};
0474 task->execute();
0475 });
0476 }
0477 }
0478 }
0479
0480 #endif