Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:56

0001 //
0002 //  WaitingTaskList_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <unistd.h>
0012 #include <memory>
0013 #include <atomic>
0014 #include <thread>
0015 #include "oneapi/tbb/task.h"
0016 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0017 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0018 
0019 class WaitingTaskList_test : public CppUnit::TestFixture {
0020   CPPUNIT_TEST_SUITE(WaitingTaskList_test);
0021   CPPUNIT_TEST(addThenDone);
0022   CPPUNIT_TEST(doneThenAdd);
0023   CPPUNIT_TEST(addThenDoneFailed);
0024   CPPUNIT_TEST(doneThenAddFailed);
0025   CPPUNIT_TEST(stressTest);
0026   CPPUNIT_TEST_SUITE_END();
0027 
0028 public:
0029   void addThenDone();
0030   void doneThenAdd();
0031   void addThenDoneFailed();
0032   void doneThenAddFailed();
0033   void stressTest();
0034   void setUp() {}
0035   void tearDown() {}
0036 };
0037 
0038 namespace {
0039   class TestCalledTask : public edm::WaitingTask {
0040   public:
0041     TestCalledTask(std::atomic<bool>& iCalled, std::exception_ptr& iPtr) : m_called(iCalled), m_ptr(iPtr) {}
0042 
0043     void execute() final {
0044       if (exceptionPtr()) {
0045         m_ptr = exceptionPtr();
0046       }
0047       m_called = true;
0048       return;
0049     }
0050 
0051   private:
0052     std::atomic<bool>& m_called;
0053     std::exception_ptr& m_ptr;
0054   };
0055 
0056   class TestValueSetTask : public edm::WaitingTask {
0057   public:
0058     TestValueSetTask(std::atomic<bool>& iValue) : m_value(iValue) {}
0059     void execute() final {
0060       CPPUNIT_ASSERT(m_value);
0061       return;
0062     }
0063 
0064   private:
0065     std::atomic<bool>& m_value;
0066   };
0067 
0068 }  // namespace
0069 
0070 void WaitingTaskList_test::addThenDone() {
0071   std::atomic<bool> called{false};
0072 
0073   edm::WaitingTaskList waitList;
0074   {
0075     std::exception_ptr excPtr;
0076 
0077     auto t = new TestCalledTask{called, excPtr};
0078 
0079     oneapi::tbb::task_group group;
0080     waitList.add(edm::WaitingTaskHolder(group, t));
0081 
0082     usleep(10);
0083     CPPUNIT_ASSERT(false == called);
0084 
0085     waitList.doneWaiting(std::exception_ptr{});
0086     group.wait();
0087     CPPUNIT_ASSERT(true == called);
0088     CPPUNIT_ASSERT(bool(excPtr) == false);
0089   }
0090 
0091   waitList.reset();
0092   called = false;
0093 
0094   {
0095     std::exception_ptr excPtr;
0096 
0097     auto t = new TestCalledTask{called, excPtr};
0098 
0099     oneapi::tbb::task_group group;
0100 
0101     waitList.add(edm::WaitingTaskHolder(group, t));
0102 
0103     usleep(10);
0104     CPPUNIT_ASSERT(false == called);
0105 
0106     waitList.doneWaiting(std::exception_ptr{});
0107     group.wait();
0108     CPPUNIT_ASSERT(true == called);
0109     CPPUNIT_ASSERT(bool(excPtr) == false);
0110   }
0111 }
0112 
0113 void WaitingTaskList_test::doneThenAdd() {
0114   std::atomic<bool> called{false};
0115   std::exception_ptr excPtr;
0116 
0117   edm::WaitingTaskList waitList;
0118   {
0119     oneapi::tbb::task_group group;
0120 
0121     auto t = new TestCalledTask{called, excPtr};
0122 
0123     waitList.doneWaiting(std::exception_ptr{});
0124 
0125     waitList.add(edm::WaitingTaskHolder(group, t));
0126     group.wait();
0127     CPPUNIT_ASSERT(true == called);
0128     CPPUNIT_ASSERT(bool(excPtr) == false);
0129   }
0130 }
0131 
0132 void WaitingTaskList_test::addThenDoneFailed() {
0133   std::atomic<bool> called{false};
0134 
0135   edm::WaitingTaskList waitList;
0136   {
0137     std::exception_ptr excPtr;
0138 
0139     auto t = new TestCalledTask{called, excPtr};
0140 
0141     oneapi::tbb::task_group group;
0142 
0143     waitList.add(edm::WaitingTaskHolder(group, t));
0144 
0145     usleep(10);
0146     CPPUNIT_ASSERT(false == called);
0147 
0148     waitList.doneWaiting(std::make_exception_ptr(std::string("failed")));
0149     group.wait();
0150     CPPUNIT_ASSERT(true == called);
0151     CPPUNIT_ASSERT(bool(excPtr) == true);
0152   }
0153 }
0154 
0155 void WaitingTaskList_test::doneThenAddFailed() {
0156   std::atomic<bool> called{false};
0157   std::exception_ptr excPtr;
0158 
0159   edm::WaitingTaskList waitList;
0160   {
0161     auto t = new TestCalledTask{called, excPtr};
0162 
0163     waitList.doneWaiting(std::make_exception_ptr(std::string("failed")));
0164 
0165     oneapi::tbb::task_group group;
0166 
0167     waitList.add(edm::WaitingTaskHolder(group, t));
0168     group.wait();
0169     CPPUNIT_ASSERT(true == called);
0170     CPPUNIT_ASSERT(bool(excPtr) == true);
0171   }
0172 }
0173 
namespace {
  /// Joins the thread pointed to by iThread if it is still joinable; otherwise
  /// does nothing. Has the signature of a smart-pointer deleter so it can be
  /// used as a scope-exit join.
  void join_thread(std::thread* iThread) {
    if (!iThread->joinable()) {
      return;  // already joined, detached, or never started
    }
    iThread->join();
  }
}  // namespace
0181 
0182 void WaitingTaskList_test::stressTest() {
0183   edm::WaitingTaskList waitList;
0184   oneapi::tbb::task_group group;
0185 
0186   unsigned int index = 1000;
0187   const unsigned int nTasks = 10000;
0188   while (0 != --index) {
0189     edm::FinalWaitingTask waitTask{group};
0190     auto* pWaitTask = &waitTask;
0191     {
0192       edm::WaitingTaskHolder waitTaskH(group, pWaitTask);
0193       std::thread makeTasksThread([&waitList, waitTaskH] {
0194         for (unsigned int i = 0; i < nTasks; ++i) {
0195           waitList.add(waitTaskH);
0196         }
0197       });
0198       std::shared_ptr<std::thread>(&makeTasksThread, join_thread);
0199 
0200       std::thread doneWaitThread([&waitList, waitTaskH] { waitList.doneWaiting(std::exception_ptr{}); });
0201       std::shared_ptr<std::thread>(&doneWaitThread, join_thread);
0202     }
0203     waitTask.wait();
0204   }
0205 }
0206 
// Register the fixture's suite with CppUnit's test factory registry.
CPPUNIT_TEST_SUITE_REGISTRATION(WaitingTaskList_test);