File indexing completed on 2024-04-06 12:11:55
0001 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
0002 #define FWCore_Concurrency_LimitedTaskQueue_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
#include <atomic>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"
0034
0035
0036
0037
0038 namespace edm {
0039 class LimitedTaskQueue {
0040 public:
0041 LimitedTaskQueue(unsigned int iLimit) : m_queues{iLimit} {}
0042 LimitedTaskQueue(const LimitedTaskQueue&) = delete;
0043 const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054 template <typename T>
0055 void push(oneapi::tbb::task_group& iGroup, T&& iAction);
0056
0057 class Resumer {
0058 public:
0059 friend class LimitedTaskQueue;
0060
0061 Resumer() = default;
0062 ~Resumer() { resume(); }
0063
0064 Resumer(Resumer&& iOther) : m_queue(iOther.m_queue) { iOther.m_queue = nullptr; }
0065
0066 Resumer(Resumer const& iOther) : m_queue(iOther.m_queue) {
0067 if (m_queue) {
0068 m_queue->pause();
0069 }
0070 }
0071
0072 Resumer& operator=(Resumer const& iOther) {
0073 auto t = iOther;
0074 return (*this = std::move(t));
0075 }
0076 Resumer& operator=(Resumer&& iOther) {
0077 if (m_queue) {
0078 m_queue->resume();
0079 }
0080 m_queue = iOther.m_queue;
0081 iOther.m_queue = nullptr;
0082 return *this;
0083 }
0084
0085 bool resume() {
0086 if (m_queue) {
0087 auto q = m_queue;
0088 m_queue = nullptr;
0089 return q->resume();
0090 }
0091 return false;
0092 }
0093
0094 private:
0095 Resumer(SerialTaskQueue* iQueue) : m_queue{iQueue} {}
0096 SerialTaskQueue* m_queue = nullptr;
0097 };
0098
0099
0100
0101
0102
0103
0104 template <typename T>
0105 void pushAndPause(oneapi::tbb::task_group& iGroup, T&& iAction);
0106
0107 unsigned int concurrencyLimit() const { return m_queues.size(); }
0108
0109 private:
0110
0111 std::vector<SerialTaskQueue> m_queues;
0112 };
0113
0114 template <typename T>
0115 void LimitedTaskQueue::push(oneapi::tbb::task_group& iGroup, T&& iAction) {
0116 auto set_to_run = std::make_shared<std::atomic<bool>>(false);
0117 for (auto& q : m_queues) {
0118 q.push(iGroup, [set_to_run, iAction]() mutable {
0119 bool expected = false;
0120 if (set_to_run->compare_exchange_strong(expected, true)) {
0121 iAction();
0122 }
0123 });
0124 }
0125 }
0126
0127 template <typename T>
0128 void LimitedTaskQueue::pushAndPause(oneapi::tbb::task_group& iGroup, T&& iAction) {
0129 auto set_to_run = std::make_shared<std::atomic<bool>>(false);
0130 for (auto& q : m_queues) {
0131 q.push(iGroup, [&q, set_to_run, iAction]() mutable {
0132 bool expected = false;
0133 if (set_to_run->compare_exchange_strong(expected, true)) {
0134 q.pause();
0135 iAction(Resumer(&q));
0136 }
0137 });
0138 }
0139 }
0140
0141 }
0142
0143 #endif