Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:26:59

0001 //
0002 //  SerialTaskQueue_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <chrono>
0012 #include <memory>
0013 #include <atomic>
0014 #include <thread>
0015 #include <iostream>
0016 #include "oneapi/tbb/task.h"
0017 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0018 
0019 class SerialTaskQueueChain_test : public CppUnit::TestFixture {
0020   CPPUNIT_TEST_SUITE(SerialTaskQueueChain_test);
0021   CPPUNIT_TEST(testPush);
0022   CPPUNIT_TEST(testPushOne);
0023   CPPUNIT_TEST(stressTest);
0024   CPPUNIT_TEST_SUITE_END();
0025 
0026 public:
0027   void testPush();
0028   void testPushOne();
0029   void stressTest();
0030   void setUp() {}
0031   void tearDown() {}
0032 };
0033 
0034 CPPUNIT_TEST_SUITE_REGISTRATION(SerialTaskQueueChain_test);
0035 using namespace std::chrono_literals;
0036 
0037 void SerialTaskQueueChain_test::testPush() {
0038   std::atomic<unsigned int> count{0};
0039 
0040   std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0041                                                                std::make_shared<edm::SerialTaskQueue>()};
0042   edm::SerialTaskQueueChain chain(queues);
0043   {
0044     oneapi::tbb::task_group group;
0045     std::atomic<int> waiting{3};
0046     chain.push(group, [&count, &waiting] {
0047       CPPUNIT_ASSERT(count++ == 0);
0048       std::this_thread::sleep_for(10us);
0049       --waiting;
0050     });
0051 
0052     chain.push(group, [&count, &waiting] {
0053       CPPUNIT_ASSERT(count++ == 1);
0054       std::this_thread::sleep_for(10us);
0055       --waiting;
0056     });
0057 
0058     chain.push(group, [&count, &waiting] {
0059       CPPUNIT_ASSERT(count++ == 2);
0060       std::this_thread::sleep_for(10us);
0061       --waiting;
0062     });
0063 
0064     do {
0065       group.wait();
0066     } while (0 != waiting.load());
0067     CPPUNIT_ASSERT(count == 3);
0068     while (chain.outstandingTasks() != 0)
0069       ;
0070   }
0071 }
0072 
0073 void SerialTaskQueueChain_test::testPushOne() {
0074   std::atomic<unsigned int> count{0};
0075 
0076   std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>()};
0077   edm::SerialTaskQueueChain chain(queues);
0078   {
0079     oneapi::tbb::task_group group;
0080     std::atomic<int> waiting{3};
0081 
0082     chain.push(group, [&count, &waiting] {
0083       CPPUNIT_ASSERT(count++ == 0);
0084       std::this_thread::sleep_for(10us);
0085       --waiting;
0086     });
0087 
0088     chain.push(group, [&count, &waiting] {
0089       CPPUNIT_ASSERT(count++ == 1);
0090       std::this_thread::sleep_for(10us);
0091       --waiting;
0092     });
0093 
0094     chain.push(group, [&count, &waiting] {
0095       CPPUNIT_ASSERT(count++ == 2);
0096       std::this_thread::sleep_for(10us);
0097       --waiting;
0098     });
0099 
0100     do {
0101       group.wait();
0102     } while (0 != waiting.load());
0103     CPPUNIT_ASSERT(count == 3);
0104     while (chain.outstandingTasks() != 0)
0105       ;
0106   }
0107 }
0108 
0109 namespace {
0110   void join_thread(std::thread* iThread) {
0111     if (iThread->joinable()) {
0112       iThread->join();
0113     }
0114   }
0115 }  // namespace
0116 
0117 void SerialTaskQueueChain_test::stressTest() {
0118   std::vector<std::shared_ptr<edm::SerialTaskQueue>> queues = {std::make_shared<edm::SerialTaskQueue>(),
0119                                                                std::make_shared<edm::SerialTaskQueue>()};
0120 
0121   edm::SerialTaskQueueChain chain(queues);
0122 
0123   unsigned int index = 100;
0124   const unsigned int nTasks = 1000;
0125   while (0 != --index) {
0126     oneapi::tbb::task_group group;
0127     std::atomic<int> waiting{2};
0128     std::atomic<unsigned int> count{0};
0129 
0130     std::atomic<bool> waitToStart{true};
0131     {
0132       std::thread pushThread([&chain, &waitToStart, &waiting, &group, &count] {
0133         while (waitToStart.load()) {
0134         };
0135         for (unsigned int i = 0; i < nTasks; ++i) {
0136           ++waiting;
0137           chain.push(group, [&count, &waiting] {
0138             ++count;
0139             --waiting;
0140           });
0141         }
0142         --waiting;
0143       });
0144 
0145       waitToStart = false;
0146       for (unsigned int i = 0; i < nTasks; ++i) {
0147         ++waiting;
0148         chain.push(group, [&count, &waiting] {
0149           ++count;
0150           --waiting;
0151         });
0152       }
0153       --waiting;
0154       std::shared_ptr<std::thread>(&pushThread, join_thread);
0155     }
0156     do {
0157       group.wait();
0158     } while (0 != waiting.load());
0159 
0160     CPPUNIT_ASSERT(2 * nTasks == count);
0161   }
0162   while (chain.outstandingTasks() != 0)
0163     ;
0164 }