File indexing completed on 2025-07-17 22:22:59
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009 #include <catch.hpp>
0010 #include <chrono>
0011 #include <memory>
0012 #include <atomic>
0013 #include <thread>
0014 #include "oneapi/tbb/task_arena.h"
0015 #include "FWCore/Concurrency/interface/WaitingTask.h"
0016 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0017 #include "FWCore/Concurrency/interface/FunctorTask.h"
0018
0019 using namespace std::chrono_literals;
0020
0021 TEST_CASE("LimitedTaskQueue", "[LimitedTaskQueue]") {
0022 SECTION("push") {
0023 {
0024 std::atomic<unsigned int> count{0};
0025 edm::LimitedTaskQueue queue{1};
0026 {
0027 std::atomic<int> waitingTasks{3};
0028 oneapi::tbb::task_group group;
0029 queue.push(group, [&count, &waitingTasks] {
0030 REQUIRE(count++ == 0u);
0031 std::this_thread::sleep_for(10us);
0032 --waitingTasks;
0033 });
0034 queue.push(group, [&count, &waitingTasks] {
0035 REQUIRE(count++ == 1u);
0036 std::this_thread::sleep_for(10us);
0037 --waitingTasks;
0038 });
0039 queue.push(group, [&count, &waitingTasks] {
0040 REQUIRE(count++ == 2u);
0041 std::this_thread::sleep_for(10us);
0042 --waitingTasks;
0043 });
0044 do {
0045 group.wait();
0046 } while (0 != waitingTasks.load());
0047 REQUIRE(count == 3u);
0048 }
0049 }
0050 {
0051 std::atomic<unsigned int> count{0};
0052 constexpr unsigned int kMax = 2;
0053 edm::LimitedTaskQueue queue{kMax};
0054 {
0055 std::atomic<int> waitingTasks{3};
0056 oneapi::tbb::task_group group;
0057 queue.push(group, [&count, &waitingTasks, kMax] {
0058 REQUIRE(count++ < kMax);
0059 std::this_thread::sleep_for(10us);
0060 --count;
0061 --waitingTasks;
0062 });
0063 queue.push(group, [&count, &waitingTasks, kMax] {
0064 REQUIRE(count++ < kMax);
0065 std::this_thread::sleep_for(10us);
0066 --count;
0067 --waitingTasks;
0068 });
0069 queue.push(group, [&count, &waitingTasks, kMax] {
0070 REQUIRE(count++ < kMax);
0071 std::this_thread::sleep_for(10us);
0072 --count;
0073 --waitingTasks;
0074 });
0075 do {
0076 group.wait();
0077 } while (0 != waitingTasks);
0078 REQUIRE(count == 0u);
0079 }
0080 }
0081 }
0082
0083 SECTION("pause") {
0084 std::atomic<unsigned int> count{0};
0085 edm::LimitedTaskQueue queue{1};
0086 {
0087 {
0088 std::atomic<int> waitingTasks{3};
0089 oneapi::tbb::task_group group;
0090 edm::LimitedTaskQueue::Resumer resumer;
0091 std::atomic<bool> resumerSet{false};
0092 std::exception_ptr e1;
0093 queue.pushAndPause(
0094 group, [&resumer, &resumerSet, &count, &waitingTasks, &e1](edm::LimitedTaskQueue::Resumer iResumer) {
0095 resumer = std::move(iResumer);
0096 resumerSet = true;
0097 try {
0098 REQUIRE(++count == 1u);
0099 } catch (...) {
0100 e1 = std::current_exception();
0101 }
0102 --waitingTasks;
0103 });
0104 std::exception_ptr e2;
0105 queue.push(group, [&count, &waitingTasks, &e2] {
0106 try {
0107 REQUIRE(++count == 2u);
0108 } catch (...) {
0109 e2 = std::current_exception();
0110 }
0111 --waitingTasks;
0112 });
0113 std::exception_ptr e3;
0114 queue.push(group, [&count, &waitingTasks, &e3] {
0115 try {
0116 REQUIRE(++count == 3u);
0117 } catch (...) {
0118 e3 = std::current_exception();
0119 }
0120 --waitingTasks;
0121 });
0122 std::this_thread::sleep_for(100us);
0123 REQUIRE(2u >= count);
0124 while (not resumerSet) {
0125 }
0126 REQUIRE(resumer.resume());
0127 do {
0128 group.wait();
0129 } while (0 != waitingTasks.load());
0130 REQUIRE(count == 3u);
0131 if (e1) {
0132 std::rethrow_exception(e1);
0133 }
0134 if (e2) {
0135 std::rethrow_exception(e2);
0136 }
0137 if (e3) {
0138 std::rethrow_exception(e3);
0139 }
0140 }
0141 }
0142 }
0143
0144 SECTION("stress test") {
0145 oneapi::tbb::task_group group;
0146 constexpr unsigned int kMax = 3;
0147 edm::LimitedTaskQueue queue{kMax};
0148 unsigned int index = 100;
0149 const unsigned int nTasks = 1000;
0150
0151 std::mutex mutex;
0152 while (0 != --index) {
0153 std::atomic<int> waiting{1};
0154 std::atomic<unsigned int> count{0};
0155 std::atomic<unsigned int> nRunningTasks{0};
0156 std::atomic<bool> waitToStart{true};
0157 {
0158 group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks, &mutex, kMax] {
0159 while (waitToStart) {
0160 }
0161 for (unsigned int i = 0; i < nTasks; ++i) {
0162 ++waiting;
0163 queue.push(group, [&count, &waiting, &nRunningTasks, &mutex, kMax] {
0164 std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0165 auto nrt = nRunningTasks++;
0166 if (nrt >= kMax) {
0167 std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0168 std::lock_guard lock{mutex};
0169 REQUIRE(nrt < kMax);
0170 }
0171 ++count;
0172 --nRunningTasks;
0173 });
0174 }
0175 });
0176 group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks, &mutex, kMax] {
0177 waitToStart = false;
0178 for (unsigned int i = 0; i < nTasks; ++i) {
0179 ++waiting;
0180 queue.push(group, [&count, &waiting, &nRunningTasks, &mutex, kMax] {
0181 std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0182 auto nrt = nRunningTasks++;
0183 if (nrt >= kMax) {
0184 std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0185 std::lock_guard lock{mutex};
0186 REQUIRE(nrt < kMax);
0187 }
0188 ++count;
0189 --nRunningTasks;
0190 });
0191 }
0192 --waiting;
0193 });
0194 }
0195 do {
0196 group.wait();
0197 } while (0 != waiting.load());
0198 REQUIRE(nRunningTasks == 0u);
0199 REQUIRE(2 * nTasks == count);
0200 }
0201 }
0202 }