Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-07-17 22:22:59

0001 //
0002 //  LimitedTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 #include <catch.hpp>
0010 #include <chrono>
0011 #include <memory>
0012 #include <atomic>
0013 #include <thread>
0014 #include "oneapi/tbb/task_arena.h"
0015 #include "FWCore/Concurrency/interface/WaitingTask.h"
0016 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0017 #include "FWCore/Concurrency/interface/FunctorTask.h"
0018 
0019 using namespace std::chrono_literals;
0020 
0021 TEST_CASE("LimitedTaskQueue", "[LimitedTaskQueue]") {
0022   SECTION("push") {
0023     {
0024       std::atomic<unsigned int> count{0};
0025       edm::LimitedTaskQueue queue{1};
0026       {
0027         std::atomic<int> waitingTasks{3};
0028         oneapi::tbb::task_group group;
0029         queue.push(group, [&count, &waitingTasks] {
0030           REQUIRE(count++ == 0u);
0031           std::this_thread::sleep_for(10us);
0032           --waitingTasks;
0033         });
0034         queue.push(group, [&count, &waitingTasks] {
0035           REQUIRE(count++ == 1u);
0036           std::this_thread::sleep_for(10us);
0037           --waitingTasks;
0038         });
0039         queue.push(group, [&count, &waitingTasks] {
0040           REQUIRE(count++ == 2u);
0041           std::this_thread::sleep_for(10us);
0042           --waitingTasks;
0043         });
0044         do {
0045           group.wait();
0046         } while (0 != waitingTasks.load());
0047         REQUIRE(count == 3u);
0048       }
0049     }
0050     {
0051       std::atomic<unsigned int> count{0};
0052       constexpr unsigned int kMax = 2;
0053       edm::LimitedTaskQueue queue{kMax};
0054       {
0055         std::atomic<int> waitingTasks{3};
0056         oneapi::tbb::task_group group;
0057         queue.push(group, [&count, &waitingTasks, kMax] {
0058           REQUIRE(count++ < kMax);
0059           std::this_thread::sleep_for(10us);
0060           --count;
0061           --waitingTasks;
0062         });
0063         queue.push(group, [&count, &waitingTasks, kMax] {
0064           REQUIRE(count++ < kMax);
0065           std::this_thread::sleep_for(10us);
0066           --count;
0067           --waitingTasks;
0068         });
0069         queue.push(group, [&count, &waitingTasks, kMax] {
0070           REQUIRE(count++ < kMax);
0071           std::this_thread::sleep_for(10us);
0072           --count;
0073           --waitingTasks;
0074         });
0075         do {
0076           group.wait();
0077         } while (0 != waitingTasks);
0078         REQUIRE(count == 0u);
0079       }
0080     }
0081   }
0082 
0083   SECTION("pause") {
0084     std::atomic<unsigned int> count{0};
0085     edm::LimitedTaskQueue queue{1};
0086     {
0087       {
0088         std::atomic<int> waitingTasks{3};
0089         oneapi::tbb::task_group group;
0090         edm::LimitedTaskQueue::Resumer resumer;
0091         std::atomic<bool> resumerSet{false};
0092         std::exception_ptr e1;
0093         queue.pushAndPause(
0094             group, [&resumer, &resumerSet, &count, &waitingTasks, &e1](edm::LimitedTaskQueue::Resumer iResumer) {
0095               resumer = std::move(iResumer);
0096               resumerSet = true;
0097               try {
0098                 REQUIRE(++count == 1u);
0099               } catch (...) {
0100                 e1 = std::current_exception();
0101               }
0102               --waitingTasks;
0103             });
0104         std::exception_ptr e2;
0105         queue.push(group, [&count, &waitingTasks, &e2] {
0106           try {
0107             REQUIRE(++count == 2u);
0108           } catch (...) {
0109             e2 = std::current_exception();
0110           }
0111           --waitingTasks;
0112         });
0113         std::exception_ptr e3;
0114         queue.push(group, [&count, &waitingTasks, &e3] {
0115           try {
0116             REQUIRE(++count == 3u);
0117           } catch (...) {
0118             e3 = std::current_exception();
0119           }
0120           --waitingTasks;
0121         });
0122         std::this_thread::sleep_for(100us);
0123         REQUIRE(2u >= count);
0124         while (not resumerSet) {
0125         }
0126         REQUIRE(resumer.resume());
0127         do {
0128           group.wait();
0129         } while (0 != waitingTasks.load());
0130         REQUIRE(count == 3u);
0131         if (e1) {
0132           std::rethrow_exception(e1);
0133         }
0134         if (e2) {
0135           std::rethrow_exception(e2);
0136         }
0137         if (e3) {
0138           std::rethrow_exception(e3);
0139         }
0140       }
0141     }
0142   }
0143 
0144   SECTION("stress test") {
0145     oneapi::tbb::task_group group;
0146     constexpr unsigned int kMax = 3;
0147     edm::LimitedTaskQueue queue{kMax};
0148     unsigned int index = 100;
0149     const unsigned int nTasks = 1000;
0150     //catch2 REQUIRE is not thread safe
0151     std::mutex mutex;
0152     while (0 != --index) {
0153       std::atomic<int> waiting{1};
0154       std::atomic<unsigned int> count{0};
0155       std::atomic<unsigned int> nRunningTasks{0};
0156       std::atomic<bool> waitToStart{true};
0157       {
0158         group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks, &mutex, kMax] {
0159           while (waitToStart) {
0160           }
0161           for (unsigned int i = 0; i < nTasks; ++i) {
0162             ++waiting;
0163             queue.push(group, [&count, &waiting, &nRunningTasks, &mutex, kMax] {
0164               std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0165               auto nrt = nRunningTasks++;
0166               if (nrt >= kMax) {
0167                 std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0168                 std::lock_guard lock{mutex};
0169                 REQUIRE(nrt < kMax);
0170               }
0171               ++count;
0172               --nRunningTasks;
0173             });
0174           }
0175         });
0176         group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks, &mutex, kMax] {
0177           waitToStart = false;
0178           for (unsigned int i = 0; i < nTasks; ++i) {
0179             ++waiting;
0180             queue.push(group, [&count, &waiting, &nRunningTasks, &mutex, kMax] {
0181               std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0182               auto nrt = nRunningTasks++;
0183               if (nrt >= kMax) {
0184                 std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0185                 std::lock_guard lock{mutex};
0186                 REQUIRE(nrt < kMax);
0187               }
0188               ++count;
0189               --nRunningTasks;
0190             });
0191           }
0192           --waiting;
0193         });
0194       }
0195       do {
0196         group.wait();
0197       } while (0 != waiting.load());
0198       REQUIRE(nRunningTasks == 0u);
0199       REQUIRE(2 * nTasks == count);
0200     }
0201   }
0202 }