Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:56

0001 //
0002 //  LimitedTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <unistd.h>
0012 #include <memory>
0013 #include <atomic>
0014 #include "oneapi/tbb/task_arena.h"
0015 #include "FWCore/Concurrency/interface/WaitingTask.h"
0016 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0017 #include "FWCore/Concurrency/interface/FunctorTask.h"
0018 
0019 class LimitedTaskQueue_test : public CppUnit::TestFixture {
0020   CPPUNIT_TEST_SUITE(LimitedTaskQueue_test);
0021   CPPUNIT_TEST(testPush);
0022   CPPUNIT_TEST(testPause);
0023   CPPUNIT_TEST(stressTest);
0024   CPPUNIT_TEST_SUITE_END();
0025 
0026 public:
0027   void testPush();
0028   void testPause();
0029   void stressTest();
0030   void setUp() {}
0031   void tearDown() {}
0032 };
0033 
0034 CPPUNIT_TEST_SUITE_REGISTRATION(LimitedTaskQueue_test);
0035 
0036 void LimitedTaskQueue_test::testPush() {
0037   {
0038     std::atomic<unsigned int> count{0};
0039 
0040     edm::LimitedTaskQueue queue{1};
0041     {
0042       std::atomic<int> waitingTasks{3};
0043       oneapi::tbb::task_group group;
0044 
0045       queue.push(group, [&count, &waitingTasks] {
0046         CPPUNIT_ASSERT(count++ == 0);
0047         usleep(10);
0048         --waitingTasks;
0049       });
0050 
0051       queue.push(group, [&count, &waitingTasks] {
0052         CPPUNIT_ASSERT(count++ == 1);
0053         usleep(10);
0054         --waitingTasks;
0055       });
0056 
0057       queue.push(group, [&count, &waitingTasks] {
0058         CPPUNIT_ASSERT(count++ == 2);
0059         usleep(10);
0060         --waitingTasks;
0061       });
0062 
0063       do {
0064         group.wait();
0065       } while (0 != waitingTasks.load());
0066       CPPUNIT_ASSERT(count == 3);
0067     }
0068   }
0069 
0070   {
0071     std::atomic<unsigned int> count{0};
0072 
0073     constexpr unsigned int kMax = 2;
0074     edm::LimitedTaskQueue queue{kMax};
0075     {
0076       std::atomic<int> waitingTasks{3};
0077       oneapi::tbb::task_group group;
0078 
0079       queue.push(group, [&count, &waitingTasks] {
0080         CPPUNIT_ASSERT(count++ < kMax);
0081         usleep(10);
0082         --count;
0083         --waitingTasks;
0084       });
0085 
0086       queue.push(group, [&count, &waitingTasks] {
0087         CPPUNIT_ASSERT(count++ < kMax);
0088         usleep(10);
0089         --count;
0090         --waitingTasks;
0091       });
0092 
0093       queue.push(group, [&count, &waitingTasks] {
0094         CPPUNIT_ASSERT(count++ < kMax);
0095         usleep(10);
0096         --count;
0097         --waitingTasks;
0098       });
0099 
0100       do {
0101         group.wait();
0102       } while (0 != waitingTasks);
0103       CPPUNIT_ASSERT(count == 0);
0104     }
0105   }
0106 }
0107 
0108 void LimitedTaskQueue_test::testPause() {
0109   std::atomic<unsigned int> count{0};
0110 
0111   edm::LimitedTaskQueue queue{1};
0112   {
0113     {
0114       std::atomic<int> waitingTasks{3};
0115       oneapi::tbb::task_group group;
0116 
0117       edm::LimitedTaskQueue::Resumer resumer;
0118       std::atomic<bool> resumerSet{false};
0119       std::exception_ptr e1;
0120       queue.pushAndPause(group,
0121                          [&resumer, &resumerSet, &count, &waitingTasks, &e1](edm::LimitedTaskQueue::Resumer iResumer) {
0122                            resumer = std::move(iResumer);
0123                            resumerSet = true;
0124                            try {
0125                              CPPUNIT_ASSERT(++count == 1);
0126                            } catch (...) {
0127                              e1 = std::current_exception();
0128                            }
0129                            --waitingTasks;
0130                          });
0131 
0132       std::exception_ptr e2;
0133       queue.push(group, [&count, &waitingTasks, &e2] {
0134         try {
0135           CPPUNIT_ASSERT(++count == 2);
0136         } catch (...) {
0137           e2 = std::current_exception();
0138         }
0139         --waitingTasks;
0140       });
0141 
0142       std::exception_ptr e3;
0143       queue.push(group, [&count, &waitingTasks, &e3] {
0144         try {
0145           CPPUNIT_ASSERT(++count == 3);
0146         } catch (...) {
0147           e3 = std::current_exception();
0148         }
0149         --waitingTasks;
0150       });
0151       usleep(100);
0152       //can't do == since the queue may not have processed the first task yet
0153       CPPUNIT_ASSERT(2 >= count);
0154       while (not resumerSet) {
0155       }
0156       CPPUNIT_ASSERT(resumer.resume());
0157       do {
0158         group.wait();
0159       } while (0 != waitingTasks.load());
0160       CPPUNIT_ASSERT(count == 3);
0161       if (e1) {
0162         std::rethrow_exception(e1);
0163       }
0164       if (e2) {
0165         std::rethrow_exception(e2);
0166       }
0167       if (e3) {
0168         std::rethrow_exception(e3);
0169       }
0170     }
0171   }
0172 }
0173 
0174 void LimitedTaskQueue_test::stressTest() {
0175   //NOTE: group needs to last longer than queue
0176   oneapi::tbb::task_group group;
0177 
0178   constexpr unsigned int kMax = 3;
0179   edm::LimitedTaskQueue queue{kMax};
0180 
0181   unsigned int index = 100;
0182   const unsigned int nTasks = 1000;
0183   while (0 != --index) {
0184     std::atomic<int> waiting{1};
0185     std::atomic<unsigned int> count{0};
0186     std::atomic<unsigned int> nRunningTasks{0};
0187 
0188     std::atomic<bool> waitToStart{true};
0189     {
0190       group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks] {
0191         while (waitToStart) {
0192         };
0193         for (unsigned int i = 0; i < nTasks; ++i) {
0194           ++waiting;
0195           queue.push(group, [&count, &waiting, &nRunningTasks] {
0196             std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0197             auto nrt = nRunningTasks++;
0198             if (nrt >= kMax) {
0199               std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0200             }
0201             CPPUNIT_ASSERT(nrt < kMax);
0202             ++count;
0203             --nRunningTasks;
0204           });
0205         }
0206       });
0207 
0208       group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks] {
0209         waitToStart = false;
0210         for (unsigned int i = 0; i < nTasks; ++i) {
0211           ++waiting;
0212           queue.push(group, [&count, &waiting, &nRunningTasks] {
0213             std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
0214             auto nrt = nRunningTasks++;
0215             if (nrt >= kMax) {
0216               std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
0217             }
0218             CPPUNIT_ASSERT(nrt < kMax);
0219             ++count;
0220             --nRunningTasks;
0221           });
0222         }
0223         --waiting;
0224       });
0225     }
0226     do {
0227       group.wait();
0228     } while (0 != waiting.load());
0229 
0230     CPPUNIT_ASSERT(0 == nRunningTasks);
0231     CPPUNIT_ASSERT(2 * nTasks == count);
0232   }
0233 }