Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:26:59

0001 //
0002 //  LimitedTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
#include <atomic>
#include <chrono>
#include <exception>
#include <iostream>
#include <memory>
#include <thread>

#include <cppunit/extensions/HelperMacros.h>
#include "oneapi/tbb/task_arena.h"
#include "FWCore/Concurrency/interface/WaitingTask.h"
#include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
0018 
0019 class LimitedTaskQueue_test : public CppUnit::TestFixture {
0020   CPPUNIT_TEST_SUITE(LimitedTaskQueue_test);
0021   CPPUNIT_TEST(testPush);
0022   CPPUNIT_TEST(testPause);
0023   CPPUNIT_TEST(stressTest);
0024   CPPUNIT_TEST_SUITE_END();
0025 
0026 public:
0027   void testPush();
0028   void testPause();
0029   void stressTest();
0030   void setUp() {}
0031   void tearDown() {}
0032 };
0033 
0034 CPPUNIT_TEST_SUITE_REGISTRATION(LimitedTaskQueue_test);
0035 using namespace std::chrono_literals;
0036 
0037 void LimitedTaskQueue_test::testPush() {
0038   {
0039     std::atomic<unsigned int> count{0};
0040 
0041     edm::LimitedTaskQueue queue{1};
0042     {
0043       std::atomic<int> waitingTasks{3};
0044       oneapi::tbb::task_group group;
0045 
0046       queue.push(group, [&count, &waitingTasks] {
0047         CPPUNIT_ASSERT(count++ == 0);
0048         std::this_thread::sleep_for(10us);
0049         --waitingTasks;
0050       });
0051 
0052       queue.push(group, [&count, &waitingTasks] {
0053         CPPUNIT_ASSERT(count++ == 1);
0054         std::this_thread::sleep_for(10us);
0055         --waitingTasks;
0056       });
0057 
0058       queue.push(group, [&count, &waitingTasks] {
0059         CPPUNIT_ASSERT(count++ == 2);
0060         std::this_thread::sleep_for(10us);
0061         --waitingTasks;
0062       });
0063 
0064       do {
0065         group.wait();
0066       } while (0 != waitingTasks.load());
0067       CPPUNIT_ASSERT(count == 3);
0068     }
0069   }
0070 
0071   {
0072     std::atomic<unsigned int> count{0};
0073 
0074     constexpr unsigned int kMax = 2;
0075     edm::LimitedTaskQueue queue{kMax};
0076     {
0077       std::atomic<int> waitingTasks{3};
0078       oneapi::tbb::task_group group;
0079 
0080       queue.push(group, [&count, &waitingTasks] {
0081         CPPUNIT_ASSERT(count++ < kMax);
0082         std::this_thread::sleep_for(10us);
0083         --count;
0084         --waitingTasks;
0085       });
0086 
0087       queue.push(group, [&count, &waitingTasks] {
0088         CPPUNIT_ASSERT(count++ < kMax);
0089         std::this_thread::sleep_for(10us);
0090         --count;
0091         --waitingTasks;
0092       });
0093 
0094       queue.push(group, [&count, &waitingTasks] {
0095         CPPUNIT_ASSERT(count++ < kMax);
0096         std::this_thread::sleep_for(10us);
0097         --count;
0098         --waitingTasks;
0099       });
0100 
0101       do {
0102         group.wait();
0103       } while (0 != waitingTasks);
0104       CPPUNIT_ASSERT(count == 0);
0105     }
0106   }
0107 }
0108 
0109 void LimitedTaskQueue_test::testPause() {
0110   std::atomic<unsigned int> count{0};
0111 
0112   edm::LimitedTaskQueue queue{1};
0113   {
0114     {
0115       std::atomic<int> waitingTasks{3};
0116       oneapi::tbb::task_group group;
0117 
0118       edm::LimitedTaskQueue::Resumer resumer;
0119       std::atomic<bool> resumerSet{false};
0120       std::exception_ptr e1;
0121       queue.pushAndPause(group,
0122                          [&resumer, &resumerSet, &count, &waitingTasks, &e1](edm::LimitedTaskQueue::Resumer iResumer) {
0123                            resumer = std::move(iResumer);
0124                            resumerSet = true;
0125                            try {
0126                              CPPUNIT_ASSERT(++count == 1);
0127                            } catch (...) {
0128                              e1 = std::current_exception();
0129                            }
0130                            --waitingTasks;
0131                          });
0132 
0133       std::exception_ptr e2;
0134       queue.push(group, [&count, &waitingTasks, &e2] {
0135         try {
0136           CPPUNIT_ASSERT(++count == 2);
0137         } catch (...) {
0138           e2 = std::current_exception();
0139         }
0140         --waitingTasks;
0141       });
0142 
0143       std::exception_ptr e3;
0144       queue.push(group, [&count, &waitingTasks, &e3] {
0145         try {
0146           CPPUNIT_ASSERT(++count == 3);
0147         } catch (...) {
0148           e3 = std::current_exception();
0149         }
0150         --waitingTasks;
0151       });
0152       std::this_thread::sleep_for(100us);
0153       //can't do == since the queue may not have processed the first task yet
0154       CPPUNIT_ASSERT(2 >= count);
0155       while (not resumerSet) {
0156       }
0157       CPPUNIT_ASSERT(resumer.resume());
0158       do {
0159         group.wait();
0160       } while (0 != waitingTasks.load());
0161       CPPUNIT_ASSERT(count == 3);
0162       if (e1) {
0163         std::rethrow_exception(e1);
0164       }
0165       if (e2) {
0166         std::rethrow_exception(e2);
0167       }
0168       if (e3) {
0169         std::rethrow_exception(e3);
0170       }
0171     }
0172   }
0173 }
0174 
0175 void LimitedTaskQueue_test::stressTest() {
0176   //NOTE: group needs to last longer than queue
0177   oneapi::tbb::task_group group;
0178 
0179   constexpr unsigned int kMax = 3;
0180   edm::LimitedTaskQueue queue{kMax};
0181 
0182   unsigned int index = 100;
0183   const unsigned int nTasks = 1000;
0184   while (0 != --index) {
0185     std::atomic<int> waiting{1};
0186     std::atomic<unsigned int> count{0};
0187     std::atomic<unsigned int> nRunningTasks{0};
0188 
0189     std::atomic<bool> waitToStart{true};
0190     {
0191       group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks] {
0192         while (waitToStart) {
0193         };
0194         for (unsigned int i = 0; i < nTasks; ++i) {
0195           ++waiting;
0196           queue.push(group, [&count, &waiting, &nRunningTasks] {
0197             std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0198             auto nrt = nRunningTasks++;
0199             if (nrt >= kMax) {
0200               std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0201             }
0202             CPPUNIT_ASSERT(nrt < kMax);
0203             ++count;
0204             --nRunningTasks;
0205           });
0206         }
0207       });
0208 
0209       group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks] {
0210         waitToStart = false;
0211         for (unsigned int i = 0; i < nTasks; ++i) {
0212           ++waiting;
0213           queue.push(group, [&count, &waiting, &nRunningTasks] {
0214             std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0215             auto nrt = nRunningTasks++;
0216             if (nrt >= kMax) {
0217               std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0218             }
0219             CPPUNIT_ASSERT(nrt < kMax);
0220             ++count;
0221             --nRunningTasks;
0222           });
0223         }
0224         --waiting;
0225       });
0226     }
0227     do {
0228       group.wait();
0229     } while (0 != waiting.load());
0230 
0231     CPPUNIT_ASSERT(0 == nRunningTasks);
0232     CPPUNIT_ASSERT(2 * nTasks == count);
0233   }
0234 }