![]() |
|
|||
File indexing completed on 2024-04-06 12:11:55
0001 #ifndef FWCore_Concurrency_SerialTaskQueue_h 0002 #define FWCore_Concurrency_SerialTaskQueue_h 0003 // -*- C++ -*- 0004 // 0005 // Package: Concurrency 0006 // Class : SerialTaskQueue 0007 // 0008 /**\class SerialTaskQueue SerialTaskQueue.h "FWCore/Concurrency/interface/SerialTaskQueue.h" 0009 0010 Description: Runs only one task from the queue at a time 0011 0012 Usage: 0013 A SerialTaskQueue is used to provide thread-safe access to a resource. You create a SerialTaskQueue 0014 for the resource. When every you need to perform an operation on the resource, you push a 'task' that 0015 does that operation onto the queue. The queue then makes sure to run one and only one task at a time. 0016 This guarantees serial access to the resource and therefore thread-safety. 0017 0018 The 'tasks' managed by the SerialTaskQueue are just functor objects who which take no arguments and 0019 return no values. The simplest way to create a task is to use a C++11 lambda. 0020 0021 Example: Imagine we have the following data structures. 0022 \code 0023 std::vector<int> values; 0024 edm::SerialTaskQueue queue; 0025 \endcode 0026 0027 On thread 1 we can fill the vector 0028 \code 0029 for(int i=0; i<1000;++i) { 0030 queue.pushAndWait( [&values,i]{ values.push_back(i);} ); 0031 } 0032 \endcode 0033 0034 While on thread 2 we periodically print and stop when the vector is filled 0035 \code 0036 bool stop = false; 0037 while(not stop) { 0038 queue.pushAndWait([&false,&values] { 0039 if( 0 == (values.size() % 100) ) { 0040 std::cout <<values.size()<<std::endl; 0041 } 0042 if(values.size()>999) { 0043 stop = true; 0044 } 0045 }); 0046 } 0047 \endcode 0048 */ 0049 // 0050 // Original Author: Chris Jones 0051 // Created: Thu Feb 21 11:14:39 CST 2013 0052 // $Id$ 0053 // 0054 0055 // system include files 0056 #include <atomic> 0057 #include <cassert> 0058 0059 #include "oneapi/tbb/task_group.h" 0060 #include "oneapi/tbb/concurrent_queue.h" 0061 #include "FWCore/Utilities/interface/thread_safety_macros.h" 0062 0063 // user include files 0064 0065 // forward declarations 0066 namespace edm { 0067 class SerialTaskQueue { 0068 public: 0069 SerialTaskQueue() : m_taskChosen(false), m_pauseCount{0} {} 0070 0071 SerialTaskQueue(SerialTaskQueue&& iOther) 0072 : m_tasks(std::move(iOther.m_tasks)), 0073 m_taskChosen(iOther.m_taskChosen.exchange(false)), 0074 m_pauseCount(iOther.m_pauseCount.exchange(0)) { 0075 assert(m_tasks.empty() and m_taskChosen == false); 0076 } 0077 SerialTaskQueue(const SerialTaskQueue&) = delete; 0078 const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete; 0079 0080 ~SerialTaskQueue(); 0081 0082 // ---------- const member functions --------------------- 0083 /// Checks to see if the queue has been paused. 0084 /**\return true if the queue is paused 0085 * \sa pause(), resume() 0086 */ 0087 bool isPaused() const { return m_pauseCount.load() != 0; } 0088 0089 // ---------- member functions --------------------------- 0090 /// Pauses processing of additional tasks from the queue. 0091 /** 0092 * Any task already running will not be paused however once that 0093 * running task finishes no further tasks will be started. 0094 * Multiple calls to pause() are allowed, however each call to 0095 * pause() must be balanced by a call to resume(). 0096 * \return false if queue was already paused. 0097 * \sa resume(), isPaused() 0098 */ 0099 bool pause() { return 1 == ++m_pauseCount; } 0100 0101 /// Resumes processing if the queue was paused. 0102 /** 0103 * Multiple calls to resume() are allowed if there 0104 * were multiple calls to pause(). Only when we reach as 0105 * many resume() calls as pause() calls will the queue restart. 0106 * \return true if the call really restarts the queue 0107 * \sa pause(), isPaused() 0108 */ 0109 bool resume(); 0110 0111 /// asynchronously pushes functor iAction into queue 0112 /** 0113 * The function will return immediately and iAction will either 0114 * process concurrently with the calling thread or wait until the 0115 * protected resource becomes available or until a CPU becomes available. 0116 * \param[in] iAction Must be a functor that takes no arguments and return no values. 0117 */ 0118 template <typename T> 0119 void push(oneapi::tbb::task_group&, const T& iAction); 0120 0121 private: 0122 /** Base class for all tasks held by the SerialTaskQueue */ 0123 class TaskBase { 0124 friend class SerialTaskQueue; 0125 0126 oneapi::tbb::task_group* group() { return m_group; } 0127 virtual void execute() = 0; 0128 0129 public: 0130 virtual ~TaskBase() = default; 0131 0132 protected: 0133 explicit TaskBase(oneapi::tbb::task_group* iGroup) : m_group(iGroup) {} 0134 0135 private: 0136 oneapi::tbb::task_group* m_group; 0137 }; 0138 0139 template <typename T> 0140 class QueuedTask : public TaskBase { 0141 public: 0142 QueuedTask(oneapi::tbb::task_group& iGroup, const T& iAction) : TaskBase(&iGroup), m_action(iAction) {} 0143 0144 private: 0145 void execute() final; 0146 0147 T m_action; 0148 }; 0149 0150 friend class TaskBase; 0151 0152 void pushTask(TaskBase*); 0153 TaskBase* pushAndGetNextTask(TaskBase*); 0154 TaskBase* finishedTask(); 0155 //returns nullptr if a task is already being processed 0156 TaskBase* pickNextTask(); 0157 0158 void spawn(TaskBase&); 0159 0160 // ---------- member data -------------------------------- 0161 oneapi::tbb::concurrent_queue<TaskBase*> m_tasks; 0162 std::atomic<bool> m_taskChosen; 0163 std::atomic<unsigned long> m_pauseCount; 0164 }; 0165 0166 template <typename T> 0167 void SerialTaskQueue::push(oneapi::tbb::task_group& iGroup, const T& iAction) { 0168 QueuedTask<T>* pTask{new QueuedTask<T>{iGroup, iAction}}; 0169 pushTask(pTask); 0170 } 0171 0172 template <typename T> 0173 void SerialTaskQueue::QueuedTask<T>::execute() { 0174 // Exception has to swallowed in order to avoid throwing from execute(). The user of SerialTaskQueue should handle exceptions within m_action(). 0175 CMS_SA_ALLOW try { this->m_action(); } catch (...) { 0176 } 0177 } 0178 0179 } // namespace edm 0180 0181 #endif
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.2.1 LXR engine. The LXR team |
![]() ![]() |