File indexing completed on 2023-03-17 11:01:51
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <unistd.h>
0012 #include <memory>
0013 #include <atomic>
0014 #include "oneapi/tbb/task_arena.h"
0015 #include "FWCore/Concurrency/interface/WaitingTask.h"
0016 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0017 #include "FWCore/Concurrency/interface/FunctorTask.h"
0018
0019 class LimitedTaskQueue_test : public CppUnit::TestFixture {
0020 CPPUNIT_TEST_SUITE(LimitedTaskQueue_test);
0021 CPPUNIT_TEST(testPush);
0022 CPPUNIT_TEST(testPause);
0023 CPPUNIT_TEST(stressTest);
0024 CPPUNIT_TEST_SUITE_END();
0025
0026 public:
0027 void testPush();
0028 void testPause();
0029 void stressTest();
0030 void setUp() {}
0031 void tearDown() {}
0032 };
0033
0034 CPPUNIT_TEST_SUITE_REGISTRATION(LimitedTaskQueue_test);
0035
0036 void LimitedTaskQueue_test::testPush() {
0037 {
0038 std::atomic<unsigned int> count{0};
0039
0040 edm::LimitedTaskQueue queue{1};
0041 {
0042 std::atomic<int> waitingTasks{3};
0043 oneapi::tbb::task_group group;
0044
0045 queue.push(group, [&count, &waitingTasks] {
0046 CPPUNIT_ASSERT(count++ == 0);
0047 usleep(10);
0048 --waitingTasks;
0049 });
0050
0051 queue.push(group, [&count, &waitingTasks] {
0052 CPPUNIT_ASSERT(count++ == 1);
0053 usleep(10);
0054 --waitingTasks;
0055 });
0056
0057 queue.push(group, [&count, &waitingTasks] {
0058 CPPUNIT_ASSERT(count++ == 2);
0059 usleep(10);
0060 --waitingTasks;
0061 });
0062
0063 do {
0064 group.wait();
0065 } while (0 != waitingTasks.load());
0066 CPPUNIT_ASSERT(count == 3);
0067 }
0068 }
0069
0070 {
0071 std::atomic<unsigned int> count{0};
0072
0073 constexpr unsigned int kMax = 2;
0074 edm::LimitedTaskQueue queue{kMax};
0075 {
0076 std::atomic<int> waitingTasks{3};
0077 oneapi::tbb::task_group group;
0078
0079 queue.push(group, [&count, &waitingTasks] {
0080 CPPUNIT_ASSERT(count++ < kMax);
0081 usleep(10);
0082 --count;
0083 --waitingTasks;
0084 });
0085
0086 queue.push(group, [&count, &waitingTasks] {
0087 CPPUNIT_ASSERT(count++ < kMax);
0088 usleep(10);
0089 --count;
0090 --waitingTasks;
0091 });
0092
0093 queue.push(group, [&count, &waitingTasks] {
0094 CPPUNIT_ASSERT(count++ < kMax);
0095 usleep(10);
0096 --count;
0097 --waitingTasks;
0098 });
0099
0100 do {
0101 group.wait();
0102 } while (0 != waitingTasks);
0103 CPPUNIT_ASSERT(count == 0);
0104 }
0105 }
0106 }
0107
0108 void LimitedTaskQueue_test::testPause() {
0109 std::atomic<unsigned int> count{0};
0110
0111 edm::LimitedTaskQueue queue{1};
0112 {
0113 {
0114 std::atomic<int> waitingTasks{3};
0115 oneapi::tbb::task_group group;
0116
0117 edm::LimitedTaskQueue::Resumer resumer;
0118 std::atomic<bool> resumerSet{false};
0119 std::exception_ptr e1;
0120 queue.pushAndPause(group,
0121 [&resumer, &resumerSet, &count, &waitingTasks, &e1](edm::LimitedTaskQueue::Resumer iResumer) {
0122 resumer = std::move(iResumer);
0123 resumerSet = true;
0124 try {
0125 CPPUNIT_ASSERT(++count == 1);
0126 } catch (...) {
0127 e1 = std::current_exception();
0128 }
0129 --waitingTasks;
0130 });
0131
0132 std::exception_ptr e2;
0133 queue.push(group, [&count, &waitingTasks, &e2] {
0134 try {
0135 CPPUNIT_ASSERT(++count == 2);
0136 } catch (...) {
0137 e2 = std::current_exception();
0138 }
0139 --waitingTasks;
0140 });
0141
0142 std::exception_ptr e3;
0143 queue.push(group, [&count, &waitingTasks, &e3] {
0144 try {
0145 CPPUNIT_ASSERT(++count == 3);
0146 } catch (...) {
0147 e3 = std::current_exception();
0148 }
0149 --waitingTasks;
0150 });
0151 usleep(100);
0152
0153 CPPUNIT_ASSERT(2 >= count);
0154 while (not resumerSet) {
0155 }
0156 CPPUNIT_ASSERT(resumer.resume());
0157 do {
0158 group.wait();
0159 } while (0 != waitingTasks.load());
0160 CPPUNIT_ASSERT(count == 3);
0161 if (e1) {
0162 std::rethrow_exception(e1);
0163 }
0164 if (e2) {
0165 std::rethrow_exception(e2);
0166 }
0167 if (e3) {
0168 std::rethrow_exception(e3);
0169 }
0170 }
0171 }
0172 }
0173
0174 void LimitedTaskQueue_test::stressTest() {
0175
0176 oneapi::tbb::task_group group;
0177
0178 constexpr unsigned int kMax = 3;
0179 edm::LimitedTaskQueue queue{kMax};
0180
0181 unsigned int index = 100;
0182 const unsigned int nTasks = 1000;
0183 while (0 != --index) {
0184 std::atomic<int> waiting{1};
0185 std::atomic<unsigned int> count{0};
0186 std::atomic<unsigned int> nRunningTasks{0};
0187
0188 std::atomic<bool> waitToStart{true};
0189 {
0190 group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks] {
0191 while (waitToStart) {
0192 };
0193 for (unsigned int i = 0; i < nTasks; ++i) {
0194 ++waiting;
0195 queue.push(group, [&count, &waiting, &nRunningTasks] {
0196 std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0197 auto nrt = nRunningTasks++;
0198 if (nrt >= kMax) {
0199 std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0200 }
0201 CPPUNIT_ASSERT(nrt < kMax);
0202 ++count;
0203 --nRunningTasks;
0204 });
0205 }
0206 });
0207
0208 group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks] {
0209 waitToStart = false;
0210 for (unsigned int i = 0; i < nTasks; ++i) {
0211 ++waiting;
0212 queue.push(group, [&count, &waiting, &nRunningTasks] {
0213 std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0214 auto nrt = nRunningTasks++;
0215 if (nrt >= kMax) {
0216 std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0217 }
0218 CPPUNIT_ASSERT(nrt < kMax);
0219 ++count;
0220 --nRunningTasks;
0221 });
0222 }
0223 --waiting;
0224 });
0225 }
0226 do {
0227 group.wait();
0228 } while (0 != waiting.load());
0229
0230 CPPUNIT_ASSERT(0 == nRunningTasks);
0231 CPPUNIT_ASSERT(2 * nTasks == count);
0232 }
0233 }