Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:26:59

0001 //
0002 //  WaitingTaskList_test.cpp
0003 //  DispatchProcessingDemo
0004 //
0005 //  Created by Chris Jones on 9/27/11.
0006 //
0007 
0008 #include <iostream>
0009 
0010 #include <cppunit/extensions/HelperMacros.h>
0011 #include <chrono>
0012 #include <memory>
0013 #include <atomic>
0014 #include <thread>
0015 #include "oneapi/tbb/task.h"
0016 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0017 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0018 
0019 class WaitingTaskList_test : public CppUnit::TestFixture {
0020   CPPUNIT_TEST_SUITE(WaitingTaskList_test);
0021   CPPUNIT_TEST(addThenDone);
0022   CPPUNIT_TEST(doneThenAdd);
0023   CPPUNIT_TEST(addThenDoneFailed);
0024   CPPUNIT_TEST(doneThenAddFailed);
0025   CPPUNIT_TEST(stressTest);
0026   CPPUNIT_TEST_SUITE_END();
0027 
0028 public:
0029   void addThenDone();
0030   void doneThenAdd();
0031   void addThenDoneFailed();
0032   void doneThenAddFailed();
0033   void stressTest();
0034   void setUp() {}
0035   void tearDown() {}
0036 };
0037 
0038 namespace {
0039   class TestCalledTask : public edm::WaitingTask {
0040   public:
0041     TestCalledTask(std::atomic<bool>& iCalled, std::exception_ptr& iPtr) : m_called(iCalled), m_ptr(iPtr) {}
0042 
0043     void execute() final {
0044       if (exceptionPtr()) {
0045         m_ptr = exceptionPtr();
0046       }
0047       m_called = true;
0048       return;
0049     }
0050 
0051   private:
0052     std::atomic<bool>& m_called;
0053     std::exception_ptr& m_ptr;
0054   };
0055 
0056   class TestValueSetTask : public edm::WaitingTask {
0057   public:
0058     TestValueSetTask(std::atomic<bool>& iValue) : m_value(iValue) {}
0059     void execute() final {
0060       CPPUNIT_ASSERT(m_value);
0061       return;
0062     }
0063 
0064   private:
0065     std::atomic<bool>& m_value;
0066   };
0067 
0068 }  // namespace
0069 using namespace std::chrono_literals;
0070 
0071 void WaitingTaskList_test::addThenDone() {
0072   std::atomic<bool> called{false};
0073 
0074   edm::WaitingTaskList waitList;
0075   {
0076     std::exception_ptr excPtr;
0077 
0078     auto t = new TestCalledTask{called, excPtr};
0079 
0080     oneapi::tbb::task_group group;
0081     waitList.add(edm::WaitingTaskHolder(group, t));
0082 
0083     std::this_thread::sleep_for(10us);
0084     CPPUNIT_ASSERT(false == called);
0085 
0086     waitList.doneWaiting(std::exception_ptr{});
0087     group.wait();
0088     CPPUNIT_ASSERT(true == called);
0089     CPPUNIT_ASSERT(bool(excPtr) == false);
0090   }
0091 
0092   waitList.reset();
0093   called = false;
0094 
0095   {
0096     std::exception_ptr excPtr;
0097 
0098     auto t = new TestCalledTask{called, excPtr};
0099 
0100     oneapi::tbb::task_group group;
0101 
0102     waitList.add(edm::WaitingTaskHolder(group, t));
0103 
0104     std::this_thread::sleep_for(10us);
0105     CPPUNIT_ASSERT(false == called);
0106 
0107     waitList.doneWaiting(std::exception_ptr{});
0108     group.wait();
0109     CPPUNIT_ASSERT(true == called);
0110     CPPUNIT_ASSERT(bool(excPtr) == false);
0111   }
0112 }
0113 
0114 void WaitingTaskList_test::doneThenAdd() {
0115   std::atomic<bool> called{false};
0116   std::exception_ptr excPtr;
0117 
0118   edm::WaitingTaskList waitList;
0119   {
0120     oneapi::tbb::task_group group;
0121 
0122     auto t = new TestCalledTask{called, excPtr};
0123 
0124     waitList.doneWaiting(std::exception_ptr{});
0125 
0126     waitList.add(edm::WaitingTaskHolder(group, t));
0127     group.wait();
0128     CPPUNIT_ASSERT(true == called);
0129     CPPUNIT_ASSERT(bool(excPtr) == false);
0130   }
0131 }
0132 
0133 void WaitingTaskList_test::addThenDoneFailed() {
0134   std::atomic<bool> called{false};
0135 
0136   edm::WaitingTaskList waitList;
0137   {
0138     std::exception_ptr excPtr;
0139 
0140     auto t = new TestCalledTask{called, excPtr};
0141 
0142     oneapi::tbb::task_group group;
0143 
0144     waitList.add(edm::WaitingTaskHolder(group, t));
0145 
0146     std::this_thread::sleep_for(10us);
0147     CPPUNIT_ASSERT(false == called);
0148 
0149     waitList.doneWaiting(std::make_exception_ptr(std::string("failed")));
0150     group.wait();
0151     CPPUNIT_ASSERT(true == called);
0152     CPPUNIT_ASSERT(bool(excPtr) == true);
0153   }
0154 }
0155 
0156 void WaitingTaskList_test::doneThenAddFailed() {
0157   std::atomic<bool> called{false};
0158   std::exception_ptr excPtr;
0159 
0160   edm::WaitingTaskList waitList;
0161   {
0162     auto t = new TestCalledTask{called, excPtr};
0163 
0164     waitList.doneWaiting(std::make_exception_ptr(std::string("failed")));
0165 
0166     oneapi::tbb::task_group group;
0167 
0168     waitList.add(edm::WaitingTaskHolder(group, t));
0169     group.wait();
0170     CPPUNIT_ASSERT(true == called);
0171     CPPUNIT_ASSERT(bool(excPtr) == true);
0172   }
0173 }
0174 
0175 namespace {
0176   void join_thread(std::thread* iThread) {
0177     if (iThread->joinable()) {
0178       iThread->join();
0179     }
0180   }
0181 }  // namespace
0182 
0183 void WaitingTaskList_test::stressTest() {
0184   edm::WaitingTaskList waitList;
0185   oneapi::tbb::task_group group;
0186 
0187   unsigned int index = 1000;
0188   const unsigned int nTasks = 10000;
0189   while (0 != --index) {
0190     edm::FinalWaitingTask waitTask{group};
0191     auto* pWaitTask = &waitTask;
0192     {
0193       edm::WaitingTaskHolder waitTaskH(group, pWaitTask);
0194       std::thread makeTasksThread([&waitList, waitTaskH] {
0195         for (unsigned int i = 0; i < nTasks; ++i) {
0196           waitList.add(waitTaskH);
0197         }
0198       });
0199       std::shared_ptr<std::thread>(&makeTasksThread, join_thread);
0200 
0201       std::thread doneWaitThread([&waitList, waitTaskH] { waitList.doneWaiting(std::exception_ptr{}); });
0202       std::shared_ptr<std::thread>(&doneWaitThread, join_thread);
0203     }
0204     waitTask.wait();
0205   }
0206 }
0207 
0208 CPPUNIT_TEST_SUITE_REGISTRATION(WaitingTaskList_test);