File indexing completed on 2024-10-03 05:26:59
0001
0002
0003
0004
0005
0006
0007
#include <atomic>
#include <chrono>
#include <exception>
#include <iostream>
#include <memory>
#include <thread>

#include <cppunit/extensions/HelperMacros.h>

#include "oneapi/tbb/task_arena.h"
#include "FWCore/Concurrency/interface/WaitingTask.h"
#include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
0018
0019 class LimitedTaskQueue_test : public CppUnit::TestFixture {
0020 CPPUNIT_TEST_SUITE(LimitedTaskQueue_test);
0021 CPPUNIT_TEST(testPush);
0022 CPPUNIT_TEST(testPause);
0023 CPPUNIT_TEST(stressTest);
0024 CPPUNIT_TEST_SUITE_END();
0025
0026 public:
0027 void testPush();
0028 void testPause();
0029 void stressTest();
0030 void setUp() {}
0031 void tearDown() {}
0032 };
0033
0034 CPPUNIT_TEST_SUITE_REGISTRATION(LimitedTaskQueue_test);
0035 using namespace std::chrono_literals;
0036
0037 void LimitedTaskQueue_test::testPush() {
0038 {
0039 std::atomic<unsigned int> count{0};
0040
0041 edm::LimitedTaskQueue queue{1};
0042 {
0043 std::atomic<int> waitingTasks{3};
0044 oneapi::tbb::task_group group;
0045
0046 queue.push(group, [&count, &waitingTasks] {
0047 CPPUNIT_ASSERT(count++ == 0);
0048 std::this_thread::sleep_for(10us);
0049 --waitingTasks;
0050 });
0051
0052 queue.push(group, [&count, &waitingTasks] {
0053 CPPUNIT_ASSERT(count++ == 1);
0054 std::this_thread::sleep_for(10us);
0055 --waitingTasks;
0056 });
0057
0058 queue.push(group, [&count, &waitingTasks] {
0059 CPPUNIT_ASSERT(count++ == 2);
0060 std::this_thread::sleep_for(10us);
0061 --waitingTasks;
0062 });
0063
0064 do {
0065 group.wait();
0066 } while (0 != waitingTasks.load());
0067 CPPUNIT_ASSERT(count == 3);
0068 }
0069 }
0070
0071 {
0072 std::atomic<unsigned int> count{0};
0073
0074 constexpr unsigned int kMax = 2;
0075 edm::LimitedTaskQueue queue{kMax};
0076 {
0077 std::atomic<int> waitingTasks{3};
0078 oneapi::tbb::task_group group;
0079
0080 queue.push(group, [&count, &waitingTasks] {
0081 CPPUNIT_ASSERT(count++ < kMax);
0082 std::this_thread::sleep_for(10us);
0083 --count;
0084 --waitingTasks;
0085 });
0086
0087 queue.push(group, [&count, &waitingTasks] {
0088 CPPUNIT_ASSERT(count++ < kMax);
0089 std::this_thread::sleep_for(10us);
0090 --count;
0091 --waitingTasks;
0092 });
0093
0094 queue.push(group, [&count, &waitingTasks] {
0095 CPPUNIT_ASSERT(count++ < kMax);
0096 std::this_thread::sleep_for(10us);
0097 --count;
0098 --waitingTasks;
0099 });
0100
0101 do {
0102 group.wait();
0103 } while (0 != waitingTasks);
0104 CPPUNIT_ASSERT(count == 0);
0105 }
0106 }
0107 }
0108
0109 void LimitedTaskQueue_test::testPause() {
0110 std::atomic<unsigned int> count{0};
0111
0112 edm::LimitedTaskQueue queue{1};
0113 {
0114 {
0115 std::atomic<int> waitingTasks{3};
0116 oneapi::tbb::task_group group;
0117
0118 edm::LimitedTaskQueue::Resumer resumer;
0119 std::atomic<bool> resumerSet{false};
0120 std::exception_ptr e1;
0121 queue.pushAndPause(group,
0122 [&resumer, &resumerSet, &count, &waitingTasks, &e1](edm::LimitedTaskQueue::Resumer iResumer) {
0123 resumer = std::move(iResumer);
0124 resumerSet = true;
0125 try {
0126 CPPUNIT_ASSERT(++count == 1);
0127 } catch (...) {
0128 e1 = std::current_exception();
0129 }
0130 --waitingTasks;
0131 });
0132
0133 std::exception_ptr e2;
0134 queue.push(group, [&count, &waitingTasks, &e2] {
0135 try {
0136 CPPUNIT_ASSERT(++count == 2);
0137 } catch (...) {
0138 e2 = std::current_exception();
0139 }
0140 --waitingTasks;
0141 });
0142
0143 std::exception_ptr e3;
0144 queue.push(group, [&count, &waitingTasks, &e3] {
0145 try {
0146 CPPUNIT_ASSERT(++count == 3);
0147 } catch (...) {
0148 e3 = std::current_exception();
0149 }
0150 --waitingTasks;
0151 });
0152 std::this_thread::sleep_for(100us);
0153
0154 CPPUNIT_ASSERT(2 >= count);
0155 while (not resumerSet) {
0156 }
0157 CPPUNIT_ASSERT(resumer.resume());
0158 do {
0159 group.wait();
0160 } while (0 != waitingTasks.load());
0161 CPPUNIT_ASSERT(count == 3);
0162 if (e1) {
0163 std::rethrow_exception(e1);
0164 }
0165 if (e2) {
0166 std::rethrow_exception(e2);
0167 }
0168 if (e3) {
0169 std::rethrow_exception(e3);
0170 }
0171 }
0172 }
0173 }
0174
0175 void LimitedTaskQueue_test::stressTest() {
0176
0177 oneapi::tbb::task_group group;
0178
0179 constexpr unsigned int kMax = 3;
0180 edm::LimitedTaskQueue queue{kMax};
0181
0182 unsigned int index = 100;
0183 const unsigned int nTasks = 1000;
0184 while (0 != --index) {
0185 std::atomic<int> waiting{1};
0186 std::atomic<unsigned int> count{0};
0187 std::atomic<unsigned int> nRunningTasks{0};
0188
0189 std::atomic<bool> waitToStart{true};
0190 {
0191 group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks] {
0192 while (waitToStart) {
0193 };
0194 for (unsigned int i = 0; i < nTasks; ++i) {
0195 ++waiting;
0196 queue.push(group, [&count, &waiting, &nRunningTasks] {
0197 std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0198 auto nrt = nRunningTasks++;
0199 if (nrt >= kMax) {
0200 std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0201 }
0202 CPPUNIT_ASSERT(nrt < kMax);
0203 ++count;
0204 --nRunningTasks;
0205 });
0206 }
0207 });
0208
0209 group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks] {
0210 waitToStart = false;
0211 for (unsigned int i = 0; i < nTasks; ++i) {
0212 ++waiting;
0213 queue.push(group, [&count, &waiting, &nRunningTasks] {
0214 std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0215 auto nrt = nRunningTasks++;
0216 if (nrt >= kMax) {
0217 std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0218 }
0219 CPPUNIT_ASSERT(nrt < kMax);
0220 ++count;
0221 --nRunningTasks;
0222 });
0223 }
0224 --waiting;
0225 });
0226 }
0227 do {
0228 group.wait();
0229 } while (0 != waiting.load());
0230
0231 CPPUNIT_ASSERT(0 == nRunningTasks);
0232 CPPUNIT_ASSERT(2 * nTasks == count);
0233 }
0234 }