File indexing completed on 2024-04-06 12:11:56
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <unistd.h>
0012 #include <memory>
0013 #include <atomic>
0014 #include "oneapi/tbb/task_arena.h"
0015 #include "FWCore/Concurrency/interface/WaitingTask.h"
0016 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0017 #include "FWCore/Concurrency/interface/FunctorTask.h"
0018
0019 class SerialTaskQueue_test : public CppUnit::TestFixture {
0020 CPPUNIT_TEST_SUITE(SerialTaskQueue_test);
0021 CPPUNIT_TEST(testPush);
0022 CPPUNIT_TEST(testPause);
0023 CPPUNIT_TEST(stressTest);
0024 CPPUNIT_TEST_SUITE_END();
0025
0026 public:
0027 void testPush();
0028 void testPause();
0029 void stressTest();
0030 void setUp() {}
0031 void tearDown() {}
0032 };
0033
0034 CPPUNIT_TEST_SUITE_REGISTRATION(SerialTaskQueue_test);
0035
0036 void SerialTaskQueue_test::testPush() {
0037 std::atomic<unsigned int> count{0};
0038
0039 edm::SerialTaskQueue queue;
0040 {
0041 std::atomic<unsigned int> waitingTasks{3};
0042 oneapi::tbb::task_group group;
0043
0044 queue.push(group, [&count, &waitingTasks] {
0045 CPPUNIT_ASSERT(count++ == 0);
0046 usleep(10);
0047 --waitingTasks;
0048 });
0049
0050 queue.push(group, [&count, &waitingTasks] {
0051 CPPUNIT_ASSERT(count++ == 1);
0052 usleep(10);
0053 --waitingTasks;
0054 });
0055
0056 queue.push(group, [&count, &waitingTasks] {
0057 CPPUNIT_ASSERT(count++ == 2);
0058 usleep(10);
0059 --waitingTasks;
0060 });
0061
0062 do {
0063 group.wait();
0064 } while (0 != waitingTasks.load());
0065 CPPUNIT_ASSERT(count == 3);
0066 }
0067 }
0068
0069 void SerialTaskQueue_test::testPause() {
0070 std::atomic<unsigned int> count{0};
0071
0072 edm::SerialTaskQueue queue;
0073 {
0074 queue.pause();
0075 {
0076 std::atomic<unsigned int> waitingTasks{1};
0077 oneapi::tbb::task_group group;
0078
0079 queue.push(group, [&count, &waitingTasks] {
0080 CPPUNIT_ASSERT(count++ == 0);
0081 --waitingTasks;
0082 });
0083 usleep(1000);
0084 CPPUNIT_ASSERT(0 == count);
0085 queue.resume();
0086 do {
0087 group.wait();
0088 } while (0 != waitingTasks.load());
0089 CPPUNIT_ASSERT(count == 1);
0090 }
0091
0092 {
0093 std::atomic<unsigned int> waitingTasks{3};
0094 oneapi::tbb::task_group group;
0095
0096 queue.push(group, [&count, &queue, &waitingTasks] {
0097 queue.pause();
0098 CPPUNIT_ASSERT(count++ == 1);
0099 --waitingTasks;
0100 });
0101 queue.push(group, [&count, &waitingTasks] {
0102 CPPUNIT_ASSERT(count++ == 2);
0103 --waitingTasks;
0104 });
0105 queue.push(group, [&count, &waitingTasks] {
0106 CPPUNIT_ASSERT(count++ == 3);
0107 --waitingTasks;
0108 });
0109 usleep(100);
0110
0111 CPPUNIT_ASSERT(2 >= count);
0112 queue.resume();
0113 do {
0114 group.wait();
0115 } while (0 != waitingTasks.load());
0116 CPPUNIT_ASSERT(count == 4);
0117 }
0118 }
0119 }
0120
0121 void SerialTaskQueue_test::stressTest() {
0122
0123 oneapi::tbb::task_group group;
0124 edm::SerialTaskQueue queue;
0125
0126 unsigned int index = 100;
0127 const unsigned int nTasks = 1000;
0128 while (0 != --index) {
0129 std::atomic<unsigned int> waitingTasks{2};
0130 std::atomic<unsigned int> count{0};
0131
0132 std::atomic<bool> waitToStart{true};
0133 {
0134 group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0135 while (waitToStart.load()) {
0136 }
0137 for (unsigned int i = 0; i < nTasks; ++i) {
0138 ++waitingTasks;
0139 queue.push(group, [&count, &waitingTasks] {
0140 ++count;
0141 --waitingTasks;
0142 });
0143 }
0144
0145 --waitingTasks;
0146 });
0147
0148 group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0149 waitToStart = false;
0150 for (unsigned int i = 0; i < nTasks; ++i) {
0151 ++waitingTasks;
0152 queue.push(group, [&count, &waitingTasks] {
0153 ++count;
0154 --waitingTasks;
0155 });
0156 }
0157 --waitingTasks;
0158 });
0159 }
0160 do {
0161 group.wait();
0162 } while (0 != waitingTasks.load());
0163
0164 CPPUNIT_ASSERT(2 * nTasks == count);
0165 }
0166 }
0167
0168 #include <Utilities/Testing/interface/CppUnit_testdriver.icpp>