File indexing completed on 2024-10-03 05:26:59
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <chrono>
0012 #include <memory>
0013 #include <atomic>
0014 #include <thread>
0015 #include <iostream>
0016 #include "oneapi/tbb/task.h"
0017 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0018
0019 class SerialTaskQueueChain_test : public CppUnit::TestFixture {
0020 CPPUNIT_TEST_SUITE(SerialTaskQueueChain_test);
0021 CPPUNIT_TEST(testPush);
0022 CPPUNIT_TEST(testPushOne);
0023 CPPUNIT_TEST(stressTest);
0024 CPPUNIT_TEST_SUITE_END();
0025
0026 public:
0027 void testPush();
0028 void testPushOne();
0029 void stressTest();
0030 void setUp() {}
0031 void tearDown() {}
0032 };
0033
0034 CPPUNIT_TEST_SUITE_REGISTRATION(SerialTaskQueueChain_test);
0035 using namespace std::chrono_literals;
0036
0037 void SerialTaskQueueChain_test::testPush() {
0038 std::atomic<unsigned int> count{0};
0039
0040 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0041 std::make_shared<edm::SerialTaskQueue>()};
0042 edm::SerialTaskQueueChain chain(queues);
0043 {
0044 oneapi::tbb::task_group group;
0045 std::atomic<int> waiting{3};
0046 chain.push(group, [&count, &waiting] {
0047 CPPUNIT_ASSERT(count++ == 0);
0048 std::this_thread::sleep_for(10us);
0049 --waiting;
0050 });
0051
0052 chain.push(group, [&count, &waiting] {
0053 CPPUNIT_ASSERT(count++ == 1);
0054 std::this_thread::sleep_for(10us);
0055 --waiting;
0056 });
0057
0058 chain.push(group, [&count, &waiting] {
0059 CPPUNIT_ASSERT(count++ == 2);
0060 std::this_thread::sleep_for(10us);
0061 --waiting;
0062 });
0063
0064 do {
0065 group.wait();
0066 } while (0 != waiting.load());
0067 CPPUNIT_ASSERT(count == 3);
0068 while (chain.outstandingTasks() != 0)
0069 ;
0070 }
0071 }
0072
0073 void SerialTaskQueueChain_test::testPushOne() {
0074 std::atomic<unsigned int> count{0};
0075
0076 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>()};
0077 edm::SerialTaskQueueChain chain(queues);
0078 {
0079 oneapi::tbb::task_group group;
0080 std::atomic<int> waiting{3};
0081
0082 chain.push(group, [&count, &waiting] {
0083 CPPUNIT_ASSERT(count++ == 0);
0084 std::this_thread::sleep_for(10us);
0085 --waiting;
0086 });
0087
0088 chain.push(group, [&count, &waiting] {
0089 CPPUNIT_ASSERT(count++ == 1);
0090 std::this_thread::sleep_for(10us);
0091 --waiting;
0092 });
0093
0094 chain.push(group, [&count, &waiting] {
0095 CPPUNIT_ASSERT(count++ == 2);
0096 std::this_thread::sleep_for(10us);
0097 --waiting;
0098 });
0099
0100 do {
0101 group.wait();
0102 } while (0 != waiting.load());
0103 CPPUNIT_ASSERT(count == 3);
0104 while (chain.outstandingTasks() != 0)
0105 ;
0106 }
0107 }
0108
namespace {
  /// Joins the given thread if there is anything to join.
  ///
  /// Has the `void(std::thread*)` shape required of a smart-pointer deleter,
  /// which is how this file uses it.
  ///
  /// @param iThread thread to join. A null pointer or a thread that is not
  ///                joinable (never started, already joined) is ignored.
  void join_thread(std::thread* iThread) {
    // Guard against null: a deleter can legitimately be handed a null pointer,
    // and dereferencing it would be undefined behaviour.
    if (iThread != nullptr && iThread->joinable()) {
      iThread->join();
    }
  }
}  // namespace
0116
0117 void SerialTaskQueueChain_test::stressTest() {
0118 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0119 std::make_shared<edm::SerialTaskQueue>()};
0120
0121 edm::SerialTaskQueueChain chain(queues);
0122
0123 unsigned int index = 100;
0124 const unsigned int nTasks = 1000;
0125 while (0 != --index) {
0126 oneapi::tbb::task_group group;
0127 std::atomic<int> waiting{2};
0128 std::atomic<unsigned int> count{0};
0129
0130 std::atomic<bool> waitToStart{true};
0131 {
0132 std::thread pushThread([&chain, &waitToStart, &waiting, &group, &count] {
0133 while (waitToStart.load()) {
0134 };
0135 for (unsigned int i = 0; i < nTasks; ++i) {
0136 ++waiting;
0137 chain.push(group, [&count, &waiting] {
0138 ++count;
0139 --waiting;
0140 });
0141 }
0142 --waiting;
0143 });
0144
0145 waitToStart = false;
0146 for (unsigned int i = 0; i < nTasks; ++i) {
0147 ++waiting;
0148 chain.push(group, [&count, &waiting] {
0149 ++count;
0150 --waiting;
0151 });
0152 }
0153 --waiting;
0154 std::shared_ptr<std::thread>(&pushThread, join_thread);
0155 }
0156 do {
0157 group.wait();
0158 } while (0 != waiting.load());
0159
0160 CPPUNIT_ASSERT(2 * nTasks == count);
0161 }
0162 while (chain.outstandingTasks() != 0)
0163 ;
0164 }