Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:26:59

0001 //
0002 //  SerialTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <chrono>
0012 #include <memory>
0013 #include <atomic>
0014 #include "oneapi/tbb/task_arena.h"
0015 #include "FWCore/Concurrency/interface/WaitingTask.h"
0016 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0017 #include "FWCore/Concurrency/interface/FunctorTask.h"
0018 
0019 class SerialTaskQueue_test : public CppUnit::TestFixture {
0020   CPPUNIT_TEST_SUITE(SerialTaskQueue_test);
0021   CPPUNIT_TEST(testPush);
0022   CPPUNIT_TEST(testPause);
0023   CPPUNIT_TEST(stressTest);
0024   CPPUNIT_TEST_SUITE_END();
0025 
0026 public:
0027   void testPush();
0028   void testPause();
0029   void stressTest();
0030   void setUp() {}
0031   void tearDown() {}
0032 };
0033 
0034 CPPUNIT_TEST_SUITE_REGISTRATION(SerialTaskQueue_test);
0035 using namespace std::chrono_literals;
0036 
0037 void SerialTaskQueue_test::testPush() {
0038   std::atomic<unsigned int> count{0};
0039 
0040   edm::SerialTaskQueue queue;
0041   {
0042     std::atomic<unsigned int> waitingTasks{3};
0043     oneapi::tbb::task_group group;
0044 
0045     queue.push(group, [&count, &waitingTasks] {
0046       CPPUNIT_ASSERT(count++ == 0);
0047       std::this_thread::sleep_for(10us);
0048       --waitingTasks;
0049     });
0050 
0051     queue.push(group, [&count, &waitingTasks] {
0052       CPPUNIT_ASSERT(count++ == 1);
0053       std::this_thread::sleep_for(10us);
0054       --waitingTasks;
0055     });
0056 
0057     queue.push(group, [&count, &waitingTasks] {
0058       CPPUNIT_ASSERT(count++ == 2);
0059       std::this_thread::sleep_for(10us);
0060       --waitingTasks;
0061     });
0062 
0063     do {
0064       group.wait();
0065     } while (0 != waitingTasks.load());
0066     CPPUNIT_ASSERT(count == 3);
0067   }
0068 }
0069 
0070 void SerialTaskQueue_test::testPause() {
0071   std::atomic<unsigned int> count{0};
0072 
0073   edm::SerialTaskQueue queue;
0074   {
0075     queue.pause();
0076     {
0077       std::atomic<unsigned int> waitingTasks{1};
0078       oneapi::tbb::task_group group;
0079 
0080       queue.push(group, [&count, &waitingTasks] {
0081         CPPUNIT_ASSERT(count++ == 0);
0082         --waitingTasks;
0083       });
0084       std::this_thread::sleep_for(1000us);
0085       CPPUNIT_ASSERT(0 == count);
0086       queue.resume();
0087       do {
0088         group.wait();
0089       } while (0 != waitingTasks.load());
0090       CPPUNIT_ASSERT(count == 1);
0091     }
0092 
0093     {
0094       std::atomic<unsigned int> waitingTasks{3};
0095       oneapi::tbb::task_group group;
0096 
0097       queue.push(group, [&count, &queue, &waitingTasks] {
0098         queue.pause();
0099         CPPUNIT_ASSERT(count++ == 1);
0100         --waitingTasks;
0101       });
0102       queue.push(group, [&count, &waitingTasks] {
0103         CPPUNIT_ASSERT(count++ == 2);
0104         --waitingTasks;
0105       });
0106       queue.push(group, [&count, &waitingTasks] {
0107         CPPUNIT_ASSERT(count++ == 3);
0108         --waitingTasks;
0109       });
0110       std::this_thread::sleep_for(100us);
0111       //can't do == since the queue may not have processed the first task yet
0112       CPPUNIT_ASSERT(2 >= count);
0113       queue.resume();
0114       do {
0115         group.wait();
0116       } while (0 != waitingTasks.load());
0117       CPPUNIT_ASSERT(count == 4);
0118     }
0119   }
0120 }
0121 
0122 void SerialTaskQueue_test::stressTest() {
0123   //note group needs to live longer than queue
0124   oneapi::tbb::task_group group;
0125   edm::SerialTaskQueue queue;
0126 
0127   unsigned int index = 100;
0128   const unsigned int nTasks = 1000;
0129   while (0 != --index) {
0130     std::atomic<unsigned int> waitingTasks{2};
0131     std::atomic<unsigned int> count{0};
0132 
0133     std::atomic<bool> waitToStart{true};
0134     {
0135       group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0136         while (waitToStart.load()) {
0137         }
0138         for (unsigned int i = 0; i < nTasks; ++i) {
0139           ++waitingTasks;
0140           queue.push(group, [&count, &waitingTasks] {
0141             ++count;
0142             --waitingTasks;
0143           });
0144         }
0145 
0146         --waitingTasks;
0147       });
0148 
0149       group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0150         waitToStart = false;
0151         for (unsigned int i = 0; i < nTasks; ++i) {
0152           ++waitingTasks;
0153           queue.push(group, [&count, &waitingTasks] {
0154             ++count;
0155             --waitingTasks;
0156           });
0157         }
0158         --waitingTasks;
0159       });
0160     }
0161     do {
0162       group.wait();
0163     } while (0 != waitingTasks.load());
0164 
0165     CPPUNIT_ASSERT(2 * nTasks == count);
0166   }
0167 }
0168 
0169 #include <Utilities/Testing/interface/CppUnit_testdriver.icpp>