|
||||
File indexing completed on 2024-04-06 12:11:56
0001 #ifndef FWCore_Concurrency_WaitingTaskList_h 0002 #define FWCore_Concurrency_WaitingTaskList_h 0003 // -*- C++ -*- 0004 // 0005 // Package: Concurrency 0006 // Class : WaitingTaskList 0007 // 0008 /**\class WaitingTaskList WaitingTaskList.h FWCore/Concurrency/interface/WaitingTaskList.h 0009 0010 Description: Handles starting tasks once some resource becomes available. 0011 0012 Usage: 0013 This class can be used to have tasks wait to be spawned until a resource is available. 0014 Tasks that want to use the resource are added to the list by calling add(oneapi::tbb::task*). 0015 When the resource becomes available one calls doneWaiting() and then any waiting tasks will 0016 be spawned. If a call to add() is made after doneWaiting() the newly added task will 0017 immediately be spawned. 0018 The class can be reused by calling reset(). However, reset() is not thread-safe so one 0019 must be certain neither add(...) nor doneWaiting() is called while reset() is running. 0020 0021 An example usage would be if you had a task doing a long calculation (the resource) and 0022 then several other tasks have been created in a different thread and before running those 0023 new tasks you need the result of the long calculation. 0024 \code 0025 class CalcTask : public edm::WaitingTask { 0026 public: 0027 CalcTask(edm::WaitingTaskList* iWL, Value* v): 0028 m_waitList(iWL), m_output(v) {} 0029 0030 oneapi::tbb::task* execute() { 0031 std::exception_ptr ptr; 0032 try { 0033 *m_output = doCalculation(); 0034 } catch(...) { 0035 ptr = std::current_exception(); 0036 } 0037 m_waitList.doneWaiting(ptr); 0038 return nullptr; 0039 } 0040 private: 0041 edm::WaitingTaskList* m_waitList; 0042 Value* m_output; 0043 }; 0044 \endcode 0045 0046 In one part of the code we can setup the shared resource 0047 \code 0048 WaitingTaskList waitList; 0049 Value v; 0050 \endcode 0051 0052 In another part we can start the calculation 0053 \code 0054 oneapi::tbb::task* calc = new(oneapi::tbb::task::allocate_root()) CalcTask(&waitList,&v); 0055 oneapi::tbb::task::spawn(calc); 0056 \endcode 0057 0058 Finally in some unrelated part of the code we can create tasks that need the calculation 0059 \code 0060 oneapi::tbb::task* t1 = makeTask1(v); 0061 waitList.add(t1); 0062 oneapi::tbb::task* t2 = makeTask2(v); 0063 waitList.add(t2); 0064 \endcode 0065 0066 */ 0067 // 0068 // Original Author: Chris Jones 0069 // Created: Thu Feb 21 13:46:31 CST 2013 0070 // $Id$ 0071 // 0072 0073 // system include files 0074 #include <atomic> 0075 0076 // user include files 0077 #include "FWCore/Concurrency/interface/WaitingTask.h" 0078 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" 0079 #include "FWCore/Utilities/interface/thread_safety_macros.h" 0080 0081 // forward declarations 0082 0083 namespace edm { 0084 class WaitingTaskList { 0085 public: 0086 ///Constructor 0087 /**The WaitingTaskList is initial set to waiting. 0088 * \param[in] iInitialSize specifies the initial size of the cache used to hold waiting tasks. 0089 * The value is only useful for optimization as the object can resize itself. 0090 */ 0091 explicit WaitingTaskList(unsigned int iInitialSize = 2); 0092 WaitingTaskList(const WaitingTaskList&) = delete; // stop default 0093 const WaitingTaskList& operator=(const WaitingTaskList&) = delete; // stop default 0094 ~WaitingTaskList() = default; 0095 0096 // ---------- member functions --------------------------- 0097 0098 /** Use in the case where you need to inform the parent task of a 0099 failure before some other child task which may be run later reports 0100 a different, but related failure. You must later call doneWaiting 0101 with same exception later in the same thread. 0102 */ 0103 void presetTaskAsFailed(std::exception_ptr iExcept); 0104 0105 ///Adds task to the waiting list 0106 /**If doneWaiting() has already been called then the added task will immediately be run. 0107 * If that is not the case then the task will be held until doneWaiting() is called and will 0108 * then be run. 0109 * Calls to add() and doneWaiting() can safely be done concurrently. 0110 */ 0111 void add(oneapi::tbb::task_group*, WaitingTask*); 0112 0113 ///Adds task to the waiting list 0114 /**Calls to add() and doneWaiting() can safely be done concurrently. 0115 */ 0116 void add(WaitingTaskHolder); 0117 0118 ///Signals that the resource is now available and tasks should be spawned 0119 /**The owner of the resource calls this function to allow the waiting tasks to 0120 * start accessing it. 0121 * If the task fails, a non 'null' std::exception_ptr should be used. 0122 * To have tasks wait again one must call reset(). 0123 * Calls to add() and doneWaiting() can safely be done concurrently. 0124 */ 0125 void doneWaiting(std::exception_ptr iPtr); 0126 0127 ///Resets access to the resource so that added tasks will wait. 0128 /**The owner of the resouce calls reset() to make tasks wait. 0129 * Calling reset() is NOT thread safe. The system must guarantee that no tasks are 0130 * using the resource when reset() is called and neither add() nor doneWaiting() can 0131 * be called concurrently with reset(). 0132 */ 0133 void reset(); 0134 0135 private: 0136 /**Handles running the tasks, 0137 * safe to call from multiple threads 0138 */ 0139 void announce(); 0140 0141 struct WaitNode { 0142 WaitingTask* m_task; 0143 oneapi::tbb::task_group* m_group; 0144 std::atomic<WaitNode*> m_next; 0145 bool m_fromCache; 0146 0147 void setNextNode(WaitNode* iNext) { m_next = iNext; } 0148 0149 WaitNode* nextNode() const { return m_next; } 0150 }; 0151 0152 WaitNode* createNode(oneapi::tbb::task_group* iGroup, WaitingTask* iTask); 0153 0154 // ---------- member data -------------------------------- 0155 std::atomic<WaitNode*> m_head; 0156 std::unique_ptr<WaitNode[]> m_nodeCache; 0157 CMS_THREAD_GUARD(m_waiting) std::exception_ptr m_exceptionPtr; 0158 unsigned int m_nodeCacheSize; 0159 std::atomic<unsigned int> m_lastAssignedCacheIndex; 0160 std::atomic<bool> m_waiting; 0161 }; 0162 } // namespace edm 0163 0164 #endif
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.2.1 LXR engine. The LXR team |