Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-07-17 22:22:59

0001 //
0002 //  SerialTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 #define CATCH_CONFIG_MAIN
0010 
0011 #include <catch.hpp>
0012 #include <chrono>
0013 #include <memory>
0014 #include <atomic>
0015 #include <thread>
0016 #include "oneapi/tbb/task_arena.h"
0017 #include "FWCore/Concurrency/interface/WaitingTask.h"
0018 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0019 #include "FWCore/Concurrency/interface/FunctorTask.h"
0020 
0021 using namespace std::chrono_literals;
0022 
0023 TEST_CASE("SerialTaskQueue", "[SerialTaskQueue]") {
0024   SECTION("push") {
0025     std::atomic<unsigned int> count{0};
0026     edm::SerialTaskQueue queue;
0027     {
0028       std::atomic<unsigned int> waitingTasks{3};
0029       oneapi::tbb::task_group group;
0030       queue.push(group, [&count, &waitingTasks] {
0031         REQUIRE(count++ == 0u);
0032         std::this_thread::sleep_for(10us);
0033         --waitingTasks;
0034       });
0035       queue.push(group, [&count, &waitingTasks] {
0036         REQUIRE(count++ == 1u);
0037         std::this_thread::sleep_for(10us);
0038         --waitingTasks;
0039       });
0040       queue.push(group, [&count, &waitingTasks] {
0041         REQUIRE(count++ == 2u);
0042         std::this_thread::sleep_for(10us);
0043         --waitingTasks;
0044       });
0045       do {
0046         group.wait();
0047       } while (0 != waitingTasks.load());
0048       REQUIRE(count == 3u);
0049     }
0050   }
0051 
0052   SECTION("pause") {
0053     std::atomic<unsigned int> count{0};
0054     edm::SerialTaskQueue queue;
0055     {
0056       queue.pause();
0057       {
0058         std::atomic<unsigned int> waitingTasks{1};
0059         oneapi::tbb::task_group group;
0060         queue.push(group, [&count, &waitingTasks] {
0061           REQUIRE(count++ == 0u);
0062           --waitingTasks;
0063         });
0064         std::this_thread::sleep_for(1000us);
0065         REQUIRE(count == 0u);
0066         queue.resume();
0067         do {
0068           group.wait();
0069         } while (0 != waitingTasks.load());
0070         REQUIRE(count == 1u);
0071       }
0072       {
0073         std::atomic<unsigned int> waitingTasks{3};
0074         oneapi::tbb::task_group group;
0075         queue.push(group, [&count, &queue, &waitingTasks] {
0076           queue.pause();
0077           REQUIRE(count++ == 1u);
0078           --waitingTasks;
0079         });
0080         queue.push(group, [&count, &waitingTasks] {
0081           REQUIRE(count++ == 2u);
0082           --waitingTasks;
0083         });
0084         queue.push(group, [&count, &waitingTasks] {
0085           REQUIRE(count++ == 3u);
0086           --waitingTasks;
0087         });
0088         std::this_thread::sleep_for(100us);
0089         REQUIRE(2u >= count);
0090         queue.resume();
0091         do {
0092           group.wait();
0093         } while (0 != waitingTasks.load());
0094         REQUIRE(count == 4u);
0095       }
0096     }
0097   }
0098 
0099   SECTION("stress test") {
0100     oneapi::tbb::task_group group;
0101     edm::SerialTaskQueue queue;
0102     unsigned int index = 100;
0103     const unsigned int nTasks = 1000;
0104     while (0 != --index) {
0105       std::atomic<unsigned int> waitingTasks{2};
0106       std::atomic<unsigned int> count{0};
0107       std::atomic<bool> waitToStart{true};
0108       {
0109         group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0110           while (waitToStart.load()) {
0111           }
0112           for (unsigned int i = 0; i < nTasks; ++i) {
0113             ++waitingTasks;
0114             queue.push(group, [&count, &waitingTasks] {
0115               ++count;
0116               --waitingTasks;
0117             });
0118           }
0119           --waitingTasks;
0120         });
0121         group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0122           waitToStart = false;
0123           for (unsigned int i = 0; i < nTasks; ++i) {
0124             ++waitingTasks;
0125             queue.push(group, [&count, &waitingTasks] {
0126               ++count;
0127               --waitingTasks;
0128             });
0129           }
0130           --waitingTasks;
0131         });
0132       }
0133       do {
0134         group.wait();
0135       } while (0 != waitingTasks.load());
0136       REQUIRE(2 * nTasks == count);
0137     }
0138   }
0139 }