File indexing completed on 2025-07-17 22:22:59
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009 #include <catch.hpp>
0010 #include <chrono>
0011 #include <memory>
0012 #include <atomic>
0013 #include <thread>
0014 #include <iostream>
0015 #include "oneapi/tbb/task.h"
0016 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0017
0018 using namespace std::chrono_literals;
0019
0020 namespace {
0021 void join_thread(std::thread* iThread) {
0022 if (iThread->joinable()) {
0023 iThread->join();
0024 }
0025 }
0026 }
0027
0028 TEST_CASE("SerialTaskQueueChain", "[SerialTaskQueueChain]") {
0029 SECTION("push") {
0030 std::atomic<unsigned int> count{0};
0031 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0032 std::make_shared<edm::SerialTaskQueue>()};
0033 edm::SerialTaskQueueChain chain(queues);
0034 {
0035 oneapi::tbb::task_group group;
0036 std::atomic<int> waiting{3};
0037 chain.push(group, [&count, &waiting] {
0038 REQUIRE(count++ == 0u);
0039 std::this_thread::sleep_for(10us);
0040 --waiting;
0041 });
0042 chain.push(group, [&count, &waiting] {
0043 REQUIRE(count++ == 1u);
0044 std::this_thread::sleep_for(10us);
0045 --waiting;
0046 });
0047 chain.push(group, [&count, &waiting] {
0048 REQUIRE(count++ == 2u);
0049 std::this_thread::sleep_for(10us);
0050 --waiting;
0051 });
0052 do {
0053 group.wait();
0054 } while (0 != waiting.load());
0055 REQUIRE(count == 3u);
0056 while (chain.outstandingTasks() != 0)
0057 ;
0058 }
0059 }
0060
0061 SECTION("push one") {
0062 std::atomic<unsigned int> count{0};
0063 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>()};
0064 edm::SerialTaskQueueChain chain(queues);
0065 {
0066 oneapi::tbb::task_group group;
0067 std::atomic<int> waiting{3};
0068 chain.push(group, [&count, &waiting] {
0069 REQUIRE(count++ == 0u);
0070 std::this_thread::sleep_for(10us);
0071 --waiting;
0072 });
0073 chain.push(group, [&count, &waiting] {
0074 REQUIRE(count++ == 1u);
0075 std::this_thread::sleep_for(10us);
0076 --waiting;
0077 });
0078 chain.push(group, [&count, &waiting] {
0079 REQUIRE(count++ == 2u);
0080 std::this_thread::sleep_for(10us);
0081 --waiting;
0082 });
0083 do {
0084 group.wait();
0085 } while (0 != waiting.load());
0086 REQUIRE(count == 3u);
0087 while (chain.outstandingTasks() != 0)
0088 ;
0089 }
0090 }
0091
0092 SECTION("stress test") {
0093 std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0094 std::make_shared<edm::SerialTaskQueue>()};
0095 edm::SerialTaskQueueChain chain(queues);
0096 unsigned int index = 100;
0097 const unsigned int nTasks = 1000;
0098 while (0 != --index) {
0099 oneapi::tbb::task_group group;
0100 std::atomic<int> waiting{2};
0101 std::atomic<unsigned int> count{0};
0102 std::atomic<bool> waitToStart{true};
0103 {
0104 std::thread pushThread([&chain, &waitToStart, &waiting, &group, &count] {
0105 while (waitToStart.load()) {
0106 };
0107 for (unsigned int i = 0; i < nTasks; ++i) {
0108 ++waiting;
0109 chain.push(group, [&count, &waiting] {
0110 ++count;
0111 --waiting;
0112 });
0113 }
0114 --waiting;
0115 });
0116 waitToStart = false;
0117 for (unsigned int i = 0; i < nTasks; ++i) {
0118 ++waiting;
0119 chain.push(group, [&count, &waiting] {
0120 ++count;
0121 --waiting;
0122 });
0123 }
0124 --waiting;
0125 std::shared_ptr<std::thread>(&pushThread, join_thread);
0126 }
0127 do {
0128 group.wait();
0129 } while (0 != waiting.load());
0130 REQUIRE(2 * nTasks == count);
0131 }
0132 while (chain.outstandingTasks() != 0)
0133 ;
0134 }
0135 }