File indexing completed on 2024-10-03 05:26:59
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <chrono>
0012 #include <memory>
0013 #include <atomic>
0014 #include <thread>
0015 #include "oneapi/tbb/task.h"
0016 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0017 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0018
0019 class WaitingTaskList_test : public CppUnit::TestFixture {
0020 CPPUNIT_TEST_SUITE(WaitingTaskList_test);
0021 CPPUNIT_TEST(addThenDone);
0022 CPPUNIT_TEST(doneThenAdd);
0023 CPPUNIT_TEST(addThenDoneFailed);
0024 CPPUNIT_TEST(doneThenAddFailed);
0025 CPPUNIT_TEST(stressTest);
0026 CPPUNIT_TEST_SUITE_END();
0027
0028 public:
0029 void addThenDone();
0030 void doneThenAdd();
0031 void addThenDoneFailed();
0032 void doneThenAddFailed();
0033 void stressTest();
0034 void setUp() {}
0035 void tearDown() {}
0036 };
0037
0038 namespace {
0039 class TestCalledTask : public edm::WaitingTask {
0040 public:
0041 TestCalledTask(std::atomic<bool>& iCalled, std::exception_ptr& iPtr) : m_called(iCalled), m_ptr(iPtr) {}
0042
0043 void execute() final {
0044 if (exceptionPtr()) {
0045 m_ptr = exceptionPtr();
0046 }
0047 m_called = true;
0048 return;
0049 }
0050
0051 private:
0052 std::atomic<bool>& m_called;
0053 std::exception_ptr& m_ptr;
0054 };
0055
0056 class TestValueSetTask : public edm::WaitingTask {
0057 public:
0058 TestValueSetTask(std::atomic<bool>& iValue) : m_value(iValue) {}
0059 void execute() final {
0060 CPPUNIT_ASSERT(m_value);
0061 return;
0062 }
0063
0064 private:
0065 std::atomic<bool>& m_value;
0066 };
0067
0068 }
0069 using namespace std::chrono_literals;
0070
0071 void WaitingTaskList_test::addThenDone() {
0072 std::atomic<bool> called{false};
0073
0074 edm::WaitingTaskList waitList;
0075 {
0076 std::exception_ptr excPtr;
0077
0078 auto t = new TestCalledTask{called, excPtr};
0079
0080 oneapi::tbb::task_group group;
0081 waitList.add(edm::WaitingTaskHolder(group, t));
0082
0083 std::this_thread::sleep_for(10us);
0084 CPPUNIT_ASSERT(false == called);
0085
0086 waitList.doneWaiting(std::exception_ptr{});
0087 group.wait();
0088 CPPUNIT_ASSERT(true == called);
0089 CPPUNIT_ASSERT(bool(excPtr) == false);
0090 }
0091
0092 waitList.reset();
0093 called = false;
0094
0095 {
0096 std::exception_ptr excPtr;
0097
0098 auto t = new TestCalledTask{called, excPtr};
0099
0100 oneapi::tbb::task_group group;
0101
0102 waitList.add(edm::WaitingTaskHolder(group, t));
0103
0104 std::this_thread::sleep_for(10us);
0105 CPPUNIT_ASSERT(false == called);
0106
0107 waitList.doneWaiting(std::exception_ptr{});
0108 group.wait();
0109 CPPUNIT_ASSERT(true == called);
0110 CPPUNIT_ASSERT(bool(excPtr) == false);
0111 }
0112 }
0113
0114 void WaitingTaskList_test::doneThenAdd() {
0115 std::atomic<bool> called{false};
0116 std::exception_ptr excPtr;
0117
0118 edm::WaitingTaskList waitList;
0119 {
0120 oneapi::tbb::task_group group;
0121
0122 auto t = new TestCalledTask{called, excPtr};
0123
0124 waitList.doneWaiting(std::exception_ptr{});
0125
0126 waitList.add(edm::WaitingTaskHolder(group, t));
0127 group.wait();
0128 CPPUNIT_ASSERT(true == called);
0129 CPPUNIT_ASSERT(bool(excPtr) == false);
0130 }
0131 }
0132
0133 void WaitingTaskList_test::addThenDoneFailed() {
0134 std::atomic<bool> called{false};
0135
0136 edm::WaitingTaskList waitList;
0137 {
0138 std::exception_ptr excPtr;
0139
0140 auto t = new TestCalledTask{called, excPtr};
0141
0142 oneapi::tbb::task_group group;
0143
0144 waitList.add(edm::WaitingTaskHolder(group, t));
0145
0146 std::this_thread::sleep_for(10us);
0147 CPPUNIT_ASSERT(false == called);
0148
0149 waitList.doneWaiting(std::make_exception_ptr(std::string("failed")));
0150 group.wait();
0151 CPPUNIT_ASSERT(true == called);
0152 CPPUNIT_ASSERT(bool(excPtr) == true);
0153 }
0154 }
0155
0156 void WaitingTaskList_test::doneThenAddFailed() {
0157 std::atomic<bool> called{false};
0158 std::exception_ptr excPtr;
0159
0160 edm::WaitingTaskList waitList;
0161 {
0162 auto t = new TestCalledTask{called, excPtr};
0163
0164 waitList.doneWaiting(std::make_exception_ptr(std::string("failed")));
0165
0166 oneapi::tbb::task_group group;
0167
0168 waitList.add(edm::WaitingTaskHolder(group, t));
0169 group.wait();
0170 CPPUNIT_ASSERT(true == called);
0171 CPPUNIT_ASSERT(bool(excPtr) == true);
0172 }
0173 }
0174
0175 namespace {
0176 void join_thread(std::thread* iThread) {
0177 if (iThread->joinable()) {
0178 iThread->join();
0179 }
0180 }
0181 }
0182
0183 void WaitingTaskList_test::stressTest() {
0184 edm::WaitingTaskList waitList;
0185 oneapi::tbb::task_group group;
0186
0187 unsigned int index = 1000;
0188 const unsigned int nTasks = 10000;
0189 while (0 != --index) {
0190 edm::FinalWaitingTask waitTask{group};
0191 auto* pWaitTask = &waitTask;
0192 {
0193 edm::WaitingTaskHolder waitTaskH(group, pWaitTask);
0194 std::thread makeTasksThread([&waitList, waitTaskH] {
0195 for (unsigned int i = 0; i < nTasks; ++i) {
0196 waitList.add(waitTaskH);
0197 }
0198 });
0199 std::shared_ptr<std::thread>(&makeTasksThread, join_thread);
0200
0201 std::thread doneWaitThread([&waitList, waitTaskH] { waitList.doneWaiting(std::exception_ptr{}); });
0202 std::shared_ptr<std::thread>(&doneWaitThread, join_thread);
0203 }
0204 waitTask.wait();
0205 }
0206 }
0207
0208 CPPUNIT_TEST_SUITE_REGISTRATION(WaitingTaskList_test);