Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:56

0001 //
0002 //  SerialTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <unistd.h>
0012 #include <memory>
0013 #include <atomic>
0014 #include <thread>
0015 #include <iostream>
0016 #include "oneapi/tbb/task.h"
0017 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0018 
0019 class SerialTaskQueueChain_test : public CppUnit::TestFixture {
0020   CPPUNIT_TEST_SUITE(SerialTaskQueueChain_test);
0021   CPPUNIT_TEST(testPush);
0022   CPPUNIT_TEST(testPushOne);
0023   CPPUNIT_TEST(stressTest);
0024   CPPUNIT_TEST_SUITE_END();
0025 
0026 public:
0027   void testPush();
0028   void testPushOne();
0029   void stressTest();
0030   void setUp() {}
0031   void tearDown() {}
0032 };
0033 
0034 CPPUNIT_TEST_SUITE_REGISTRATION(SerialTaskQueueChain_test);
0035 
0036 void SerialTaskQueueChain_test::testPush() {
0037   std::atomic<unsigned int> count{0};
0038 
0039   std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0040                                                                std::make_shared<edm::SerialTaskQueue>()};
0041   edm::SerialTaskQueueChain chain(queues);
0042   {
0043     oneapi::tbb::task_group group;
0044     std::atomic<int> waiting{3};
0045     chain.push(group, [&count, &waiting] {
0046       CPPUNIT_ASSERT(count++ == 0);
0047       usleep(10);
0048       --waiting;
0049     });
0050 
0051     chain.push(group, [&count, &waiting] {
0052       CPPUNIT_ASSERT(count++ == 1);
0053       usleep(10);
0054       --waiting;
0055     });
0056 
0057     chain.push(group, [&count, &waiting] {
0058       CPPUNIT_ASSERT(count++ == 2);
0059       usleep(10);
0060       --waiting;
0061     });
0062 
0063     do {
0064       group.wait();
0065     } while (0 != waiting.load());
0066     CPPUNIT_ASSERT(count == 3);
0067     while (chain.outstandingTasks() != 0)
0068       ;
0069   }
0070 }
0071 
0072 void SerialTaskQueueChain_test::testPushOne() {
0073   std::atomic<unsigned int> count{0};
0074 
0075   std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>()};
0076   edm::SerialTaskQueueChain chain(queues);
0077   {
0078     oneapi::tbb::task_group group;
0079     std::atomic<int> waiting{3};
0080 
0081     chain.push(group, [&count, &waiting] {
0082       CPPUNIT_ASSERT(count++ == 0);
0083       usleep(10);
0084       --waiting;
0085     });
0086 
0087     chain.push(group, [&count, &waiting] {
0088       CPPUNIT_ASSERT(count++ == 1);
0089       usleep(10);
0090       --waiting;
0091     });
0092 
0093     chain.push(group, [&count, &waiting] {
0094       CPPUNIT_ASSERT(count++ == 2);
0095       usleep(10);
0096       --waiting;
0097     });
0098 
0099     do {
0100       group.wait();
0101     } while (0 != waiting.load());
0102     CPPUNIT_ASSERT(count == 3);
0103     while (chain.outstandingTasks() != 0)
0104       ;
0105   }
0106 }
0107 
0108 namespace {
0109   void join_thread(std::thread* iThread) {
0110     if (iThread->joinable()) {
0111       iThread->join();
0112     }
0113   }
0114 }  // namespace
0115 
0116 void SerialTaskQueueChain_test::stressTest() {
0117   std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0118                                                                std::make_shared<edm::SerialTaskQueue>()};
0119 
0120   edm::SerialTaskQueueChain chain(queues);
0121 
0122   unsigned int index = 100;
0123   const unsigned int nTasks = 1000;
0124   while (0 != --index) {
0125     oneapi::tbb::task_group group;
0126     std::atomic<int> waiting{2};
0127     std::atomic<unsigned int> count{0};
0128 
0129     std::atomic<bool> waitToStart{true};
0130     {
0131       std::thread pushThread([&chain, &waitToStart, &waiting, &group, &count] {
0132         while (waitToStart.load()) {
0133         };
0134         for (unsigned int i = 0; i < nTasks; ++i) {
0135           ++waiting;
0136           chain.push(group, [&count, &waiting] {
0137             ++count;
0138             --waiting;
0139           });
0140         }
0141         --waiting;
0142       });
0143 
0144       waitToStart = false;
0145       for (unsigned int i = 0; i < nTasks; ++i) {
0146         ++waiting;
0147         chain.push(group, [&count, &waiting] {
0148           ++count;
0149           --waiting;
0150         });
0151       }
0152       --waiting;
0153       std::shared_ptr<std::thread>(&pushThread, join_thread);
0154     }
0155     do {
0156       group.wait();
0157     } while (0 != waiting.load());
0158 
0159     CPPUNIT_ASSERT(2 * nTasks == count);
0160   }
0161   while (chain.outstandingTasks() != 0)
0162     ;
0163 }