File indexing completed on 2024-04-06 12:11:55
0001 #ifndef FWCore_Concurrency_SerialTaskQueueChain_h
0002 #define FWCore_Concurrency_SerialTaskQueueChain_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022 #include <cassert>
0023 #include <memory>
0024 #include <vector>
0025
0026
0027 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0028 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0029
0030
0031 namespace edm {
0032 class SerialTaskQueueChain {
0033 public:
0034 SerialTaskQueueChain() {}
0035 explicit SerialTaskQueueChain(std::vector<std::shared_ptr<SerialTaskQueue>> iQueues)
0036 : m_queues(std::move(iQueues)) {}
0037
0038 SerialTaskQueueChain(const SerialTaskQueueChain&) = delete;
0039 SerialTaskQueueChain& operator=(const SerialTaskQueueChain&) = delete;
0040 SerialTaskQueueChain(SerialTaskQueueChain&& iOld)
0041 : m_queues(std::move(iOld.m_queues)), m_outstandingTasks{iOld.m_outstandingTasks.load()} {}
0042
0043 SerialTaskQueueChain& operator=(SerialTaskQueueChain&& iOld) {
0044 m_queues = std::move(iOld.m_queues);
0045 m_outstandingTasks.store(iOld.m_outstandingTasks.load());
0046 return *this;
0047 }
0048
0049
0050
0051
0052
0053
0054
0055
0056 template <typename T>
0057 void push(oneapi::tbb::task_group& iGroup, T&& iAction);
0058
0059 unsigned long outstandingTasks() const { return m_outstandingTasks; }
0060 std::size_t numberOfQueues() const { return m_queues.size(); }
0061
0062 private:
0063
0064 std::vector<std::shared_ptr<SerialTaskQueue>> m_queues;
0065 std::atomic<unsigned long> m_outstandingTasks{0};
0066
0067 template <typename T>
0068 void passDownChain(unsigned int iIndex, oneapi::tbb::task_group& iGroup, T&& iAction);
0069
0070 template <typename T>
0071 void actionToRun(T&& iAction);
0072 };
0073
0074 template <typename T>
0075 void SerialTaskQueueChain::push(oneapi::tbb::task_group& iGroup, T&& iAction) {
0076 ++m_outstandingTasks;
0077 if (m_queues.size() == 1) {
0078 m_queues[0]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
0079 } else {
0080 assert(!m_queues.empty());
0081 m_queues[0]->push(iGroup, [this, &iGroup, iAction]() mutable { this->passDownChain(1, iGroup, iAction); });
0082 }
0083 }
0084
0085 template <typename T>
0086 void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, oneapi::tbb::task_group& iGroup, T&& iAction) {
0087
0088
0089 m_queues[iQueueIndex - 1]->pause();
0090
0091 if (iQueueIndex + 1 == m_queues.size()) {
0092 m_queues[iQueueIndex]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
0093 } else {
0094 auto nextQueue = iQueueIndex + 1;
0095 m_queues[iQueueIndex]->push(
0096 iGroup, [this, nextQueue, &iGroup, iAction]() mutable { this->passDownChain(nextQueue, iGroup, iAction); });
0097 }
0098 }
0099
0100 template <typename T>
0101 void SerialTaskQueueChain::actionToRun(T&& iAction) {
0102
0103 using Queues = std::vector<std::shared_ptr<SerialTaskQueue>>;
0104 auto sentryAction = [](SerialTaskQueueChain* iChain) {
0105 auto& vec = iChain->m_queues;
0106 for (auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
0107 (*it)->resume();
0108 }
0109 --(iChain->m_outstandingTasks);
0110 };
0111
0112 std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(this, sentryAction);
0113 iAction();
0114 }
0115 }
0116
0117 #endif