Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-07-17 22:22:59

0001 //
0002 //  SerialTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 #include <catch.hpp>
0010 #include <chrono>
0011 #include <memory>
0012 #include <atomic>
0013 #include <thread>
0014 #include <iostream>
0015 #include "oneapi/tbb/task.h"
0016 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0017 
0018 using namespace std::chrono_literals;
0019 
namespace {
  // Deleter-style helper: blocks until *iThread has finished, unless the thread is not
  // joinable (never started, already joined, or detached), in which case it does nothing.
  // The pointer itself is dereferenced unconditionally, so it must not be null.
  void join_thread(std::thread* iThread) {
    if (not iThread->joinable()) {
      return;
    }
    iThread->join();
  }
}  // namespace
0027 
0028 TEST_CASE("SerialTaskQueueChain", "[SerialTaskQueueChain]") {
0029   SECTION("push") {
0030     std::atomic<unsigned int> count{0};
0031     std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0032                                                                  std::make_shared<edm::SerialTaskQueue>()};
0033     edm::SerialTaskQueueChain chain(queues);
0034     {
0035       oneapi::tbb::task_group group;
0036       std::atomic<int> waiting{3};
0037       chain.push(group, [&count, &waiting] {
0038         REQUIRE(count++ == 0u);
0039         std::this_thread::sleep_for(10us);
0040         --waiting;
0041       });
0042       chain.push(group, [&count, &waiting] {
0043         REQUIRE(count++ == 1u);
0044         std::this_thread::sleep_for(10us);
0045         --waiting;
0046       });
0047       chain.push(group, [&count, &waiting] {
0048         REQUIRE(count++ == 2u);
0049         std::this_thread::sleep_for(10us);
0050         --waiting;
0051       });
0052       do {
0053         group.wait();
0054       } while (0 != waiting.load());
0055       REQUIRE(count == 3u);
0056       while (chain.outstandingTasks() != 0)
0057         ;
0058     }
0059   }
0060 
0061   SECTION("push one") {
0062     std::atomic<unsigned int> count{0};
0063     std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>()};
0064     edm::SerialTaskQueueChain chain(queues);
0065     {
0066       oneapi::tbb::task_group group;
0067       std::atomic<int> waiting{3};
0068       chain.push(group, [&count, &waiting] {
0069         REQUIRE(count++ == 0u);
0070         std::this_thread::sleep_for(10us);
0071         --waiting;
0072       });
0073       chain.push(group, [&count, &waiting] {
0074         REQUIRE(count++ == 1u);
0075         std::this_thread::sleep_for(10us);
0076         --waiting;
0077       });
0078       chain.push(group, [&count, &waiting] {
0079         REQUIRE(count++ == 2u);
0080         std::this_thread::sleep_for(10us);
0081         --waiting;
0082       });
0083       do {
0084         group.wait();
0085       } while (0 != waiting.load());
0086       REQUIRE(count == 3u);
0087       while (chain.outstandingTasks() != 0)
0088         ;
0089     }
0090   }
0091 
0092   SECTION("stress test") {
0093     std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0094                                                                  std::make_shared<edm::SerialTaskQueue>()};
0095     edm::SerialTaskQueueChain chain(queues);
0096     unsigned int index = 100;
0097     const unsigned int nTasks = 1000;
0098     while (0 != --index) {
0099       oneapi::tbb::task_group group;
0100       std::atomic<int> waiting{2};
0101       std::atomic<unsigned int> count{0};
0102       std::atomic<bool> waitToStart{true};
0103       {
0104         std::thread pushThread([&chain, &waitToStart, &waiting, &group, &count] {
0105           while (waitToStart.load()) {
0106           };
0107           for (unsigned int i = 0; i < nTasks; ++i) {
0108             ++waiting;
0109             chain.push(group, [&count, &waiting] {
0110               ++count;
0111               --waiting;
0112             });
0113           }
0114           --waiting;
0115         });
0116         waitToStart = false;
0117         for (unsigned int i = 0; i < nTasks; ++i) {
0118           ++waiting;
0119           chain.push(group, [&count, &waiting] {
0120             ++count;
0121             --waiting;
0122           });
0123         }
0124         --waiting;
0125         std::shared_ptr<std::thread>(&pushThread, join_thread);
0126       }
0127       do {
0128         group.wait();
0129       } while (0 != waiting.load());
0130       REQUIRE(2 * nTasks == count);
0131     }
0132     while (chain.outstandingTasks() != 0)
0133       ;
0134   }
0135 }