Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:56

0001 #ifndef FWCore_Concurrency_WaitingTaskList_h
0002 #define FWCore_Concurrency_WaitingTaskList_h
0003 // -*- C++ -*-
0004 //
0005 // Package:     Concurrency
0006 // Class  :     WaitingTaskList
0007 //
0008 /**\class WaitingTaskList WaitingTaskList.h FWCore/Concurrency/interface/WaitingTaskList.h
0009 
0010  Description: Handles starting tasks once some resource becomes available.
0011 
0012  Usage:
0013     This class can be used to have tasks wait to be spawned until a resource is available.
0014  Tasks that want to use the resource are added to the list by calling add(oneapi::tbb::task*).
0015  When the resource becomes available one calls doneWaiting() and then any waiting tasks will
0016  be spawned. If a call to add() is made after doneWaiting() the newly added task will
0017  immediately be spawned.
0018  The class can be reused by calling reset(). However, reset() is not thread-safe so one 
0019  must be certain neither add(...) nor doneWaiting() is called while reset() is running.
0020  
0021  An example usage would be if you had a task doing a long calculation (the resource) and
0022  then several other tasks have been created in a different thread and before running those
0023  new tasks you need the result of the long calculation.
0024  \code
0025  class CalcTask : public edm::WaitingTask {
0026     public:
0027     CalcTask(edm::WaitingTaskList* iWL, Value* v):
0028     m_waitList(iWL), m_output(v) {}
0029  
0030     oneapi::tbb::task* execute() {
0031      std::exception_ptr ptr;
0032      try {
0033        *m_output = doCalculation();
0034      } catch(...) {
0035        ptr = std::current_exception();
0036      }
0037      m_waitList.doneWaiting(ptr);
0038      return nullptr;
0039     }
0040     private:
0041      edm::WaitingTaskList* m_waitList;
0042      Value* m_output;
0043  };
0044  \endcode
0045  
0046  In one part of the code we can setup the shared resource
0047  \code
0048  WaitingTaskList waitList;
0049  Value v;
0050  \endcode
0051 
0052  In another part we can start the calculation
0053  \code
0054  oneapi::tbb::task* calc = new(oneapi::tbb::task::allocate_root()) CalcTask(&waitList,&v);
0055  oneapi::tbb::task::spawn(calc);
0056  \endcode
0057  
0058  Finally in some unrelated part of the code we can create tasks that need the calculation
0059  \code
0060  oneapi::tbb::task* t1 = makeTask1(v);
0061  waitList.add(t1);
0062  oneapi::tbb::task* t2 = makeTask2(v);
0063  waitList.add(t2);
0064  \endcode
0065 
0066 */
0067 //
0068 // Original Author:  Chris Jones
0069 //         Created:  Thu Feb 21 13:46:31 CST 2013
0070 // $Id$
0071 //
0072 
0073 // system include files
0074 #include <atomic>
0075 
0076 // user include files
0077 #include "FWCore/Concurrency/interface/WaitingTask.h"
0078 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0079 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0080 
0081 // forward declarations
0082 
0083 namespace edm {
0084   class WaitingTaskList {
0085   public:
0086     ///Constructor
0087     /**The WaitingTaskList is initial set to waiting.
0088        * \param[in] iInitialSize specifies the initial size of the cache used to hold waiting tasks.
0089        * The value is only useful for optimization as the object can resize itself.
0090        */
0091     explicit WaitingTaskList(unsigned int iInitialSize = 2);
0092     WaitingTaskList(const WaitingTaskList&) = delete;                   // stop default
0093     const WaitingTaskList& operator=(const WaitingTaskList&) = delete;  // stop default
0094     ~WaitingTaskList() = default;
0095 
0096     // ---------- member functions ---------------------------
0097 
0098     /** Use in the case where you need to inform the parent task of a
0099        failure before some other child task which may be run later reports
0100        a different, but related failure. You must later call doneWaiting
0101        with same exception later in the same thread.
0102        */
0103     void presetTaskAsFailed(std::exception_ptr iExcept);
0104 
0105     ///Adds task to the waiting list
0106     /**If doneWaiting() has already been called then the added task will immediately be run.
0107        * If that is not the case then the task will be held until doneWaiting() is called and will
0108        * then be run.
0109        * Calls to add() and doneWaiting() can safely be done concurrently.
0110        */
0111     void add(oneapi::tbb::task_group*, WaitingTask*);
0112 
0113     ///Adds task to the waiting list
0114     /**Calls to add() and doneWaiting() can safely be done concurrently.
0115       */
0116     void add(WaitingTaskHolder);
0117 
0118     ///Signals that the resource is now available and tasks should be spawned
0119     /**The owner of the resource calls this function to allow the waiting tasks to
0120        * start accessing it.
0121        * If the task fails, a non 'null' std::exception_ptr should be used.
0122        * To have tasks wait again one must call reset().
0123        * Calls to add() and doneWaiting() can safely be done concurrently.
0124        */
0125     void doneWaiting(std::exception_ptr iPtr);
0126 
0127     ///Resets access to the resource so that added tasks will wait.
0128     /**The owner of the resouce calls reset() to make tasks wait.
0129        * Calling reset() is NOT thread safe. The system must guarantee that no tasks are
0130        * using the resource when reset() is called and neither add() nor doneWaiting() can
0131        * be called concurrently with reset().
0132        */
0133     void reset();
0134 
0135   private:
0136     /**Handles running the tasks,
0137        * safe to call from multiple threads
0138        */
0139     void announce();
0140 
0141     struct WaitNode {
0142       WaitingTask* m_task;
0143       oneapi::tbb::task_group* m_group;
0144       std::atomic<WaitNode*> m_next;
0145       bool m_fromCache;
0146 
0147       void setNextNode(WaitNode* iNext) { m_next = iNext; }
0148 
0149       WaitNode* nextNode() const { return m_next; }
0150     };
0151 
0152     WaitNode* createNode(oneapi::tbb::task_group* iGroup, WaitingTask* iTask);
0153 
0154     // ---------- member data --------------------------------
0155     std::atomic<WaitNode*> m_head;
0156     std::unique_ptr<WaitNode[]> m_nodeCache;
0157     CMS_THREAD_GUARD(m_waiting) std::exception_ptr m_exceptionPtr;
0158     unsigned int m_nodeCacheSize;
0159     std::atomic<unsigned int> m_lastAssignedCacheIndex;
0160     std::atomic<bool> m_waiting;
0161   };
0162 }  // namespace edm
0163 
0164 #endif