File indexing completed on 2023-03-17 11:01:51
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <unistd.h>
0012 #include <memory>
0013 #include <atomic>
0014 #include <thread>
0015 #include <iostream>
0016 #include "oneapi/tbb/task.h"
0017 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0018
0019 class SerialTaskQueueChain_test : public CppUnit::TestFixture {
0020 CPPUNIT_TEST_SUITE(SerialTaskQueueChain_test);
0021 CPPUNIT_TEST(testPush);
0022 CPPUNIT_TEST(testPushOne);
0023 CPPUNIT_TEST(stressTest);
0024 CPPUNIT_TEST_SUITE_END();
0025
0026 public:
0027 void testPush();
0028 void testPushOne();
0029 void stressTest();
0030 void setUp() {}
0031 void tearDown() {}
0032 };
0033
0034 CPPUNIT_TEST_SUITE_REGISTRATION(SerialTaskQueueChain_test);
0035
0036 void SerialTaskQueueChain_test::testPush() {
0037 std::atomic<unsigned int> count{0};
0038
0039 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0040 std::make_shared<edm::SerialTaskQueue>()};
0041 edm::SerialTaskQueueChain chain(queues);
0042 {
0043 oneapi::tbb::task_group group;
0044 std::atomic<int> waiting{3};
0045 chain.push(group, [&count, &waiting] {
0046 CPPUNIT_ASSERT(count++ == 0);
0047 usleep(10);
0048 --waiting;
0049 });
0050
0051 chain.push(group, [&count, &waiting] {
0052 CPPUNIT_ASSERT(count++ == 1);
0053 usleep(10);
0054 --waiting;
0055 });
0056
0057 chain.push(group, [&count, &waiting] {
0058 CPPUNIT_ASSERT(count++ == 2);
0059 usleep(10);
0060 --waiting;
0061 });
0062
0063 do {
0064 group.wait();
0065 } while (0 != waiting.load());
0066 CPPUNIT_ASSERT(count == 3);
0067 while (chain.outstandingTasks() != 0)
0068 ;
0069 }
0070 }
0071
0072 void SerialTaskQueueChain_test::testPushOne() {
0073 std::atomic<unsigned int> count{0};
0074
0075 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>()};
0076 edm::SerialTaskQueueChain chain(queues);
0077 {
0078 oneapi::tbb::task_group group;
0079 std::atomic<int> waiting{3};
0080
0081 chain.push(group, [&count, &waiting] {
0082 CPPUNIT_ASSERT(count++ == 0);
0083 usleep(10);
0084 --waiting;
0085 });
0086
0087 chain.push(group, [&count, &waiting] {
0088 CPPUNIT_ASSERT(count++ == 1);
0089 usleep(10);
0090 --waiting;
0091 });
0092
0093 chain.push(group, [&count, &waiting] {
0094 CPPUNIT_ASSERT(count++ == 2);
0095 usleep(10);
0096 --waiting;
0097 });
0098
0099 do {
0100 group.wait();
0101 } while (0 != waiting.load());
0102 CPPUNIT_ASSERT(count == 3);
0103 while (chain.outstandingTasks() != 0)
0104 ;
0105 }
0106 }
0107
0108 namespace {
0109 void join_thread(std::thread* iThread) {
0110 if (iThread->joinable()) {
0111 iThread->join();
0112 }
0113 }
0114 }
0115
0116 void SerialTaskQueueChain_test::stressTest() {
0117 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0118 std::make_shared<edm::SerialTaskQueue>()};
0119
0120 edm::SerialTaskQueueChain chain(queues);
0121
0122 unsigned int index = 100;
0123 const unsigned int nTasks = 1000;
0124 while (0 != --index) {
0125 oneapi::tbb::task_group group;
0126 std::atomic<int> waiting{2};
0127 std::atomic<unsigned int> count{0};
0128
0129 std::atomic<bool> waitToStart{true};
0130 {
0131 std::thread pushThread([&chain, &waitToStart, &waiting, &group, &count] {
0132 while (waitToStart.load()) {
0133 };
0134 for (unsigned int i = 0; i < nTasks; ++i) {
0135 ++waiting;
0136 chain.push(group, [&count, &waiting] {
0137 ++count;
0138 --waiting;
0139 });
0140 }
0141 --waiting;
0142 });
0143
0144 waitToStart = false;
0145 for (unsigned int i = 0; i < nTasks; ++i) {
0146 ++waiting;
0147 chain.push(group, [&count, &waiting] {
0148 ++count;
0149 --waiting;
0150 });
0151 }
0152 --waiting;
0153 std::shared_ptr<std::thread>(&pushThread, join_thread);
0154 }
0155 do {
0156 group.wait();
0157 } while (0 != waiting.load());
0158
0159 CPPUNIT_ASSERT(2 * nTasks == count);
0160 }
0161 while (chain.outstandingTasks() != 0)
0162 ;
0163 }