File indexing completed on 2025-07-17 22:22:59
0001
0002
0003
0004
0005
0006
0007
0008 #include <iostream>
0009 #define CATCH_CONFIG_MAIN
0010
0011 #include <catch.hpp>
0012 #include <chrono>
0013 #include <memory>
0014 #include <atomic>
0015 #include <thread>
0016 #include "oneapi/tbb/task_arena.h"
0017 #include "FWCore/Concurrency/interface/WaitingTask.h"
0018 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0019 #include "FWCore/Concurrency/interface/FunctorTask.h"
0020
0021 using namespace std::chrono_literals;
0022
0023 TEST_CASE("SerialTaskQueue", "[SerialTaskQueue]") {
     // Catch2 test case for edm::SerialTaskQueue, split into three sections:
     //   "push"        - tasks pushed from one thread are observed in push order,
     //   "pause"       - pause()/resume() hold back and then release queued tasks,
     //   "stress test" - two producers push concurrently and no task is lost.
     // Every section tracks outstanding work in an atomic `waitingTasks` counter
     // and re-calls group.wait() until that counter reaches zero.
     // NOTE(review): presumably a single group.wait() can return before the queue
     // has handed every pushed task to the group - confirm against SerialTaskQueue.
0024 SECTION("push") {
0025 std::atomic<unsigned int> count{0};
0026 edm::SerialTaskQueue queue;
0027 {
0028 std::atomic<unsigned int> waitingTasks{3};
0029 oneapi::tbb::task_group group;
     // Each task asserts the value of `count` it expects to find (0, 1, 2), so
     // the three REQUIREs can only all pass if the tasks run in push order.
     // Each task then sleeps for 10us and marks itself finished.
0030 queue.push(group, [&count, &waitingTasks] {
0031 REQUIRE(count++ == 0u);
0032 std::this_thread::sleep_for(10us);
0033 --waitingTasks;
0034 });
0035 queue.push(group, [&count, &waitingTasks] {
0036 REQUIRE(count++ == 1u);
0037 std::this_thread::sleep_for(10us);
0038 --waitingTasks;
0039 });
0040 queue.push(group, [&count, &waitingTasks] {
0041 REQUIRE(count++ == 2u);
0042 std::this_thread::sleep_for(10us);
0043 --waitingTasks;
0044 });
     // Keep waiting until all three tasks have decremented waitingTasks.
0045 do {
0046 group.wait();
0047 } while (0 != waitingTasks.load());
0048 REQUIRE(count == 3u);
0049 }
0050 }
0051
0052 SECTION("pause") {
0053 std::atomic<unsigned int> count{0};
0054 edm::SerialTaskQueue queue;
0055 {
     // Pause before anything is pushed; the task pushed below is expected to
     // stay pending until resume() is called.
0056 queue.pause();
0057 {
0058 std::atomic<unsigned int> waitingTasks{1};
0059 oneapi::tbb::task_group group;
0060 queue.push(group, [&count, &waitingTasks] {
0061 REQUIRE(count++ == 0u);
0062 --waitingTasks;
0063 });
     // Give the task time to run if it (wrongly) could; count must still be 0.
0064 std::this_thread::sleep_for(1000us);
0065 REQUIRE(count == 0u);
0066 queue.resume();
0067 do {
0068 group.wait();
0069 } while (0 != waitingTasks.load());
0070 REQUIRE(count == 1u);
0071 }
0072 {
     // Second scenario: the first task pauses the queue from inside its own
     // body, so the two tasks pushed after it are expected to stay queued
     // until the resume() call further down.
0073 std::atomic<unsigned int> waitingTasks{3};
0074 oneapi::tbb::task_group group;
0075 queue.push(group, [&count, &queue, &waitingTasks] {
0076 queue.pause();
0077 REQUIRE(count++ == 1u);
0078 --waitingTasks;
0079 });
0080 queue.push(group, [&count, &waitingTasks] {
0081 REQUIRE(count++ == 2u);
0082 --waitingTasks;
0083 });
0084 queue.push(group, [&count, &waitingTasks] {
0085 REQUIRE(count++ == 3u);
0086 --waitingTasks;
0087 });
     // count was 1 on entry to this scope; at most the first task may have
     // raised it to 2 while the queue is paused.
0088 std::this_thread::sleep_for(100us);
0089 REQUIRE(2u >= count);
0090 queue.resume();
0091 do {
0092 group.wait();
0093 } while (0 != waitingTasks.load());
     // One task from the first scope plus three from this one.
0094 REQUIRE(count == 4u);
0095 }
0096 }
0097 }
0098
0099 SECTION("stress test") {
0100 oneapi::tbb::task_group group;
0101 edm::SerialTaskQueue queue;
     // index is pre-decremented from 100 in the loop condition, so the loop
     // body runs 99 times.
0102 unsigned int index = 100;
     // nTasks is a const integer with a constant initializer, which is why the
     // lambdas below can read it without listing it in their captures.
0103 const unsigned int nTasks = 1000;
0104 while (0 != --index) {
     // waitingTasks starts at 2: one unit per producer task, released by that
     // producer once it has finished pushing.
0105 std::atomic<unsigned int> waitingTasks{2};
0106 std::atomic<unsigned int> count{0};
0107 std::atomic<bool> waitToStart{true};
0108 {
     // Producer 1: spins until producer 2 clears waitToStart, then pushes.
     // NOTE(review): presumably this is meant to make both producers push at
     // about the same time - confirm intent.
0109 group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0110 while (waitToStart.load()) {
0111 }
0112 for (unsigned int i = 0; i < nTasks; ++i) {
     // Register the task as outstanding before pushing it; the task itself
     // decrements waitingTasks when it runs.
0113 ++waitingTasks;
0114 queue.push(group, [&count, &waitingTasks] {
0115 ++count;
0116 --waitingTasks;
0117 });
0118 }
     // Release this producer's own unit.
0119 --waitingTasks;
0120 });
     // Producer 2: releases producer 1, then pushes the same number of tasks.
0121 group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0122 waitToStart = false;
0123 for (unsigned int i = 0; i < nTasks; ++i) {
0124 ++waitingTasks;
0125 queue.push(group, [&count, &waitingTasks] {
0126 ++count;
0127 --waitingTasks;
0128 });
0129 }
0130 --waitingTasks;
0131 });
0132 }
     // Wait until both producers and every task they pushed have finished.
0133 do {
0134 group.wait();
0135 } while (0 != waitingTasks.load());
     // Each of the two producers pushed nTasks tasks, each adding 1 to count.
0136 REQUIRE(2 * nTasks == count);
0137 }
0138 }
0139 }