File indexing completed on 2024-10-03 05:26:59
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <chrono>
0012 #include <memory>
0013 #include <atomic>
0014 #include "oneapi/tbb/task_arena.h"
0015 #include "FWCore/Concurrency/interface/WaitingTask.h"
0016 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0017 #include "FWCore/Concurrency/interface/FunctorTask.h"
0018
0019 class SerialTaskQueue_test : public CppUnit::TestFixture {
0020 CPPUNIT_TEST_SUITE(SerialTaskQueue_test);
0021 CPPUNIT_TEST(testPush);
0022 CPPUNIT_TEST(testPause);
0023 CPPUNIT_TEST(stressTest);
0024 CPPUNIT_TEST_SUITE_END();
0025
0026 public:
0027 void testPush();
0028 void testPause();
0029 void stressTest();
0030 void setUp() {}
0031 void tearDown() {}
0032 };
0033
0034 CPPUNIT_TEST_SUITE_REGISTRATION(SerialTaskQueue_test);
0035 using namespace std::chrono_literals;
0036
0037 void SerialTaskQueue_test::testPush() {
0038 std::atomic<unsigned int> count{0};
0039
0040 edm::SerialTaskQueue queue;
0041 {
0042 std::atomic<unsigned int> waitingTasks{3};
0043 oneapi::tbb::task_group group;
0044
0045 queue.push(group, [&count, &waitingTasks] {
0046 CPPUNIT_ASSERT(count++ == 0);
0047 std::this_thread::sleep_for(10us);
0048 --waitingTasks;
0049 });
0050
0051 queue.push(group, [&count, &waitingTasks] {
0052 CPPUNIT_ASSERT(count++ == 1);
0053 std::this_thread::sleep_for(10us);
0054 --waitingTasks;
0055 });
0056
0057 queue.push(group, [&count, &waitingTasks] {
0058 CPPUNIT_ASSERT(count++ == 2);
0059 std::this_thread::sleep_for(10us);
0060 --waitingTasks;
0061 });
0062
0063 do {
0064 group.wait();
0065 } while (0 != waitingTasks.load());
0066 CPPUNIT_ASSERT(count == 3);
0067 }
0068 }
0069
0070 void SerialTaskQueue_test::testPause() {
0071 std::atomic<unsigned int> count{0};
0072
0073 edm::SerialTaskQueue queue;
0074 {
0075 queue.pause();
0076 {
0077 std::atomic<unsigned int> waitingTasks{1};
0078 oneapi::tbb::task_group group;
0079
0080 queue.push(group, [&count, &waitingTasks] {
0081 CPPUNIT_ASSERT(count++ == 0);
0082 --waitingTasks;
0083 });
0084 std::this_thread::sleep_for(1000us);
0085 CPPUNIT_ASSERT(0 == count);
0086 queue.resume();
0087 do {
0088 group.wait();
0089 } while (0 != waitingTasks.load());
0090 CPPUNIT_ASSERT(count == 1);
0091 }
0092
0093 {
0094 std::atomic<unsigned int> waitingTasks{3};
0095 oneapi::tbb::task_group group;
0096
0097 queue.push(group, [&count, &queue, &waitingTasks] {
0098 queue.pause();
0099 CPPUNIT_ASSERT(count++ == 1);
0100 --waitingTasks;
0101 });
0102 queue.push(group, [&count, &waitingTasks] {
0103 CPPUNIT_ASSERT(count++ == 2);
0104 --waitingTasks;
0105 });
0106 queue.push(group, [&count, &waitingTasks] {
0107 CPPUNIT_ASSERT(count++ == 3);
0108 --waitingTasks;
0109 });
0110 std::this_thread::sleep_for(100us);
0111
0112 CPPUNIT_ASSERT(2 >= count);
0113 queue.resume();
0114 do {
0115 group.wait();
0116 } while (0 != waitingTasks.load());
0117 CPPUNIT_ASSERT(count == 4);
0118 }
0119 }
0120 }
0121
0122 void SerialTaskQueue_test::stressTest() {
0123
0124 oneapi::tbb::task_group group;
0125 edm::SerialTaskQueue queue;
0126
0127 unsigned int index = 100;
0128 const unsigned int nTasks = 1000;
0129 while (0 != --index) {
0130 std::atomic<unsigned int> waitingTasks{2};
0131 std::atomic<unsigned int> count{0};
0132
0133 std::atomic<bool> waitToStart{true};
0134 {
0135 group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0136 while (waitToStart.load()) {
0137 }
0138 for (unsigned int i = 0; i < nTasks; ++i) {
0139 ++waitingTasks;
0140 queue.push(group, [&count, &waitingTasks] {
0141 ++count;
0142 --waitingTasks;
0143 });
0144 }
0145
0146 --waitingTasks;
0147 });
0148
0149 group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0150 waitToStart = false;
0151 for (unsigned int i = 0; i < nTasks; ++i) {
0152 ++waitingTasks;
0153 queue.push(group, [&count, &waitingTasks] {
0154 ++count;
0155 --waitingTasks;
0156 });
0157 }
0158 --waitingTasks;
0159 });
0160 }
0161 do {
0162 group.wait();
0163 } while (0 != waitingTasks.load());
0164
0165 CPPUNIT_ASSERT(2 * nTasks == count);
0166 }
0167 }
0168
0169 #include <Utilities/Testing/interface/CppUnit_testdriver.icpp>