File indexing completed on 2025-07-17 22:22:59
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009 #include <catch.hpp>
0010 #include <chrono>
0011 #include <memory>
0012 #include <atomic>
0013 #include <thread>
0014 #include "oneapi/tbb/task.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0016 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0017
0018 namespace {
0019 class TestCalledTask : public edm::WaitingTask {
0020 public:
0021 TestCalledTask(std::atomic<bool>& iCalled, std::exception_ptr& iPtr) : m_called(iCalled), m_ptr(iPtr) {}
0022 void execute() final {
0023 if (exceptionPtr()) {
0024 m_ptr = exceptionPtr();
0025 }
0026 m_called = true;
0027 return;
0028 }
0029
0030 private:
0031 std::atomic<bool>& m_called;
0032 std::exception_ptr& m_ptr;
0033 };
0034
0035 class TestValueSetTask : public edm::WaitingTask {
0036 public:
0037 TestValueSetTask(std::atomic<bool>& iValue) : m_value(iValue) {}
0038 void execute() final {
0039 REQUIRE(m_value);
0040 return;
0041 }
0042
0043 private:
0044 std::atomic<bool>& m_value;
0045 };
0046
/// Joins the thread pointed to by iThread if it is still joinable.
///
/// Intended for use as a std::shared_ptr deleter. Such a deleter can be
/// invoked with a null pointer, so nullptr is accepted and ignored. Calling
/// it again on an already joined thread is a no-op.
///
/// @param iThread thread to join; may be nullptr
void join_thread(std::thread* iThread) {
  if (iThread != nullptr && iThread->joinable()) {
    iThread->join();
  }
}
0052 }
0053 using namespace std::chrono_literals;
0054
0055 TEST_CASE("WaitingTaskList", "[WaitingTaskList]") {
0056 SECTION("add then done") {
0057 std::atomic<bool> called{false};
0058 edm::WaitingTaskList waitList;
0059 {
0060 std::exception_ptr excPtr;
0061 auto t = new TestCalledTask{called, excPtr};
0062 oneapi::tbb::task_group group;
0063 waitList.add(edm::WaitingTaskHolder(group, t));
0064 std::this_thread::sleep_for(10us);
0065 REQUIRE_FALSE(called);
0066 waitList.doneWaiting(std::exception_ptr{});
0067 group.wait();
0068 REQUIRE(called);
0069 REQUIRE_FALSE(bool(excPtr));
0070 }
0071 waitList.reset();
0072 called = false;
0073 {
0074 std::exception_ptr excPtr;
0075 auto t = new TestCalledTask{called, excPtr};
0076 oneapi::tbb::task_group group;
0077 waitList.add(edm::WaitingTaskHolder(group, t));
0078 std::this_thread::sleep_for(10us);
0079 REQUIRE_FALSE(called);
0080 waitList.doneWaiting(std::exception_ptr{});
0081 group.wait();
0082 REQUIRE(called);
0083 REQUIRE_FALSE(bool(excPtr));
0084 }
0085 }
0086
0087 SECTION("done then add") {
0088 std::atomic<bool> called{false};
0089 std::exception_ptr excPtr;
0090 edm::WaitingTaskList waitList;
0091 {
0092 oneapi::tbb::task_group group;
0093 auto t = new TestCalledTask{called, excPtr};
0094 waitList.doneWaiting(std::exception_ptr{});
0095 waitList.add(edm::WaitingTaskHolder(group, t));
0096 group.wait();
0097 REQUIRE(called);
0098 REQUIRE_FALSE(bool(excPtr));
0099 }
0100 }
0101
0102 SECTION("add then done failed") {
0103 std::atomic<bool> called{false};
0104 edm::WaitingTaskList waitList;
0105 {
0106 std::exception_ptr excPtr;
0107 auto t = new TestCalledTask{called, excPtr};
0108 oneapi::tbb::task_group group;
0109 waitList.add(edm::WaitingTaskHolder(group, t));
0110 std::this_thread::sleep_for(10us);
0111 REQUIRE_FALSE(called);
0112 waitList.doneWaiting(std::make_exception_ptr(std::string("failed")));
0113 group.wait();
0114 REQUIRE(called);
0115 REQUIRE(bool(excPtr));
0116 }
0117 }
0118
0119 SECTION("done then add failed") {
0120 std::atomic<bool> called{false};
0121 std::exception_ptr excPtr;
0122 edm::WaitingTaskList waitList;
0123 {
0124 auto t = new TestCalledTask{called, excPtr};
0125 waitList.doneWaiting(std::make_exception_ptr(std::string("failed")));
0126 oneapi::tbb::task_group group;
0127 waitList.add(edm::WaitingTaskHolder(group, t));
0128 group.wait();
0129 REQUIRE(called);
0130 REQUIRE(bool(excPtr));
0131 }
0132 }
0133
0134 SECTION("stress Test") {
0135 edm::WaitingTaskList waitList;
0136 oneapi::tbb::task_group group;
0137 unsigned int index = 1000;
0138 const unsigned int nTasks = 10000;
0139 while (0 != --index) {
0140 edm::FinalWaitingTask waitTask{group};
0141 auto* pWaitTask = &waitTask;
0142 {
0143 edm::WaitingTaskHolder waitTaskH(group, pWaitTask);
0144 std::thread makeTasksThread([&waitList, waitTaskH] {
0145 for (unsigned int i = 0; i < nTasks; ++i) {
0146 waitList.add(waitTaskH);
0147 }
0148 });
0149 std::shared_ptr<std::thread>(&makeTasksThread, join_thread);
0150 std::thread doneWaitThread([&waitList, waitTaskH] { waitList.doneWaiting(std::exception_ptr{}); });
0151 std::shared_ptr<std::thread>(&doneWaitThread, join_thread);
0152 }
0153 waitTask.wait();
0154 }
0155 }
0156 }