Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:56

0001 //
0002 //  SerialTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <unistd.h>
0012 #include <memory>
0013 #include <atomic>
0014 #include "oneapi/tbb/task_arena.h"
0015 #include "FWCore/Concurrency/interface/WaitingTask.h"
0016 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0017 #include "FWCore/Concurrency/interface/FunctorTask.h"
0018 
0019 class SerialTaskQueue_test : public CppUnit::TestFixture {
0020   CPPUNIT_TEST_SUITE(SerialTaskQueue_test);
0021   CPPUNIT_TEST(testPush);
0022   CPPUNIT_TEST(testPause);
0023   CPPUNIT_TEST(stressTest);
0024   CPPUNIT_TEST_SUITE_END();
0025 
0026 public:
0027   void testPush();
0028   void testPause();
0029   void stressTest();
0030   void setUp() {}
0031   void tearDown() {}
0032 };
0033 
0034 CPPUNIT_TEST_SUITE_REGISTRATION(SerialTaskQueue_test);
0035 
0036 void SerialTaskQueue_test::testPush() {
0037   std::atomic<unsigned int> count{0};
0038 
0039   edm::SerialTaskQueue queue;
0040   {
0041     std::atomic<unsigned int> waitingTasks{3};
0042     oneapi::tbb::task_group group;
0043 
0044     queue.push(group, [&count, &waitingTasks] {
0045       CPPUNIT_ASSERT(count++ == 0);
0046       usleep(10);
0047       --waitingTasks;
0048     });
0049 
0050     queue.push(group, [&count, &waitingTasks] {
0051       CPPUNIT_ASSERT(count++ == 1);
0052       usleep(10);
0053       --waitingTasks;
0054     });
0055 
0056     queue.push(group, [&count, &waitingTasks] {
0057       CPPUNIT_ASSERT(count++ == 2);
0058       usleep(10);
0059       --waitingTasks;
0060     });
0061 
0062     do {
0063       group.wait();
0064     } while (0 != waitingTasks.load());
0065     CPPUNIT_ASSERT(count == 3);
0066   }
0067 }
0068 
0069 void SerialTaskQueue_test::testPause() {
0070   std::atomic<unsigned int> count{0};
0071 
0072   edm::SerialTaskQueue queue;
0073   {
0074     queue.pause();
0075     {
0076       std::atomic<unsigned int> waitingTasks{1};
0077       oneapi::tbb::task_group group;
0078 
0079       queue.push(group, [&count, &waitingTasks] {
0080         CPPUNIT_ASSERT(count++ == 0);
0081         --waitingTasks;
0082       });
0083       usleep(1000);
0084       CPPUNIT_ASSERT(0 == count);
0085       queue.resume();
0086       do {
0087         group.wait();
0088       } while (0 != waitingTasks.load());
0089       CPPUNIT_ASSERT(count == 1);
0090     }
0091 
0092     {
0093       std::atomic<unsigned int> waitingTasks{3};
0094       oneapi::tbb::task_group group;
0095 
0096       queue.push(group, [&count, &queue, &waitingTasks] {
0097         queue.pause();
0098         CPPUNIT_ASSERT(count++ == 1);
0099         --waitingTasks;
0100       });
0101       queue.push(group, [&count, &waitingTasks] {
0102         CPPUNIT_ASSERT(count++ == 2);
0103         --waitingTasks;
0104       });
0105       queue.push(group, [&count, &waitingTasks] {
0106         CPPUNIT_ASSERT(count++ == 3);
0107         --waitingTasks;
0108       });
0109       usleep(100);
0110       //can't do == since the queue may not have processed the first task yet
0111       CPPUNIT_ASSERT(2 >= count);
0112       queue.resume();
0113       do {
0114         group.wait();
0115       } while (0 != waitingTasks.load());
0116       CPPUNIT_ASSERT(count == 4);
0117     }
0118   }
0119 }
0120 
0121 void SerialTaskQueue_test::stressTest() {
0122   //note group needs to live longer than queue
0123   oneapi::tbb::task_group group;
0124   edm::SerialTaskQueue queue;
0125 
0126   unsigned int index = 100;
0127   const unsigned int nTasks = 1000;
0128   while (0 != --index) {
0129     std::atomic<unsigned int> waitingTasks{2};
0130     std::atomic<unsigned int> count{0};
0131 
0132     std::atomic<bool> waitToStart{true};
0133     {
0134       group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0135         while (waitToStart.load()) {
0136         }
0137         for (unsigned int i = 0; i < nTasks; ++i) {
0138           ++waitingTasks;
0139           queue.push(group, [&count, &waitingTasks] {
0140             ++count;
0141             --waitingTasks;
0142           });
0143         }
0144 
0145         --waitingTasks;
0146       });
0147 
0148       group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
0149         waitToStart = false;
0150         for (unsigned int i = 0; i < nTasks; ++i) {
0151           ++waitingTasks;
0152           queue.push(group, [&count, &waitingTasks] {
0153             ++count;
0154             --waitingTasks;
0155           });
0156         }
0157         --waitingTasks;
0158       });
0159     }
0160     do {
0161       group.wait();
0162     } while (0 != waitingTasks.load());
0163 
0164     CPPUNIT_ASSERT(2 * nTasks == count);
0165   }
0166 }
0167 
0168 #include <Utilities/Testing/interface/CppUnit_testdriver.icpp>