WaitNode

WaitingTaskList

Macros

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
#ifndef FWCore_Concurrency_WaitingTaskList_h
#define FWCore_Concurrency_WaitingTaskList_h
// -*- C++ -*-
//
// Package:     Concurrency
// Class  :     WaitingTaskList
//
/**\class WaitingTaskList WaitingTaskList.h FWCore/Concurrency/interface/WaitingTaskList.h

 Description: Handles starting tasks once some resource becomes available.

 Usage:
    This class can be used to have tasks wait to be spawned until a resource is available.
 Tasks that want to use the resource are added to the list by calling add(oneapi::tbb::task*).
 When the resource becomes available one calls doneWaiting() and then any waiting tasks will
 be spawned. If a call to add() is made after doneWaiting() the newly added task will
 immediately be spawned.
 The class can be reused by calling reset(). However, reset() is not thread-safe so one 
 must be certain neither add(...) nor doneWaiting() is called while reset() is running.
 
 An example usage would be if you had a task doing a long calculation (the resource) and
 then several other tasks have been created in a different thread and before running those
 new tasks you need the result of the long calculation.
 \code
 class CalcTask : public edm::WaitingTask {
    public:
    CalcTask(edm::WaitingTaskList* iWL, Value* v):
    m_waitList(iWL), m_output(v) {}
 
    oneapi::tbb::task* execute() {
     std::exception_ptr ptr;
     try {
       *m_output = doCalculation();
     } catch(...) {
       ptr = std::current_exception();
     }
     m_waitList.doneWaiting(ptr);
     return nullptr;
    }
    private:
     edm::WaitingTaskList* m_waitList;
     Value* m_output;
 };
 \endcode
 
 In one part of the code we can setup the shared resource
 \code
 WaitingTaskList waitList;
 Value v;
 \endcode

 In another part we can start the calculation
 \code
 oneapi::tbb::task* calc = new(oneapi::tbb::task::allocate_root()) CalcTask(&waitList,&v);
 oneapi::tbb::task::spawn(calc);
 \endcode
 
 Finally in some unrelated part of the code we can create tasks that need the calculation
 \code
 oneapi::tbb::task* t1 = makeTask1(v);
 waitList.add(t1);
 oneapi::tbb::task* t2 = makeTask2(v);
 waitList.add(t2);
 \endcode

*/
//
// Original Author:  Chris Jones
//         Created:  Thu Feb 21 13:46:31 CST 2013
// $Id$
//

// system include files
#include <atomic>

// user include files
#include "FWCore/Concurrency/interface/WaitingTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

// forward declarations

namespace edm {
  class WaitingTaskList {
  public:
    ///Constructor
    /**The WaitingTaskList is initial set to waiting.
       * \param[in] iInitialSize specifies the initial size of the cache used to hold waiting tasks.
       * The value is only useful for optimization as the object can resize itself.
       */
    explicit WaitingTaskList(unsigned int iInitialSize = 2);
    WaitingTaskList(const WaitingTaskList&) = delete;                   // stop default
    const WaitingTaskList& operator=(const WaitingTaskList&) = delete;  // stop default
    ~WaitingTaskList() = default;

    // ---------- member functions ---------------------------

    /** Use in the case where you need to inform the parent task of a
       failure before some other child task which may be run later reports
       a different, but related failure. You must later call doneWaiting
       with same exception later in the same thread.
       */
    void presetTaskAsFailed(std::exception_ptr iExcept);

    ///Adds task to the waiting list
    /**If doneWaiting() has already been called then the added task will immediately be run.
       * If that is not the case then the task will be held until doneWaiting() is called and will
       * then be run.
       * Calls to add() and doneWaiting() can safely be done concurrently.
       */
    void add(oneapi::tbb::task_group*, WaitingTask*);

    ///Adds task to the waiting list
    /**Calls to add() and doneWaiting() can safely be done concurrently.
      */
    void add(WaitingTaskHolder);

    ///Signals that the resource is now available and tasks should be spawned
    /**The owner of the resource calls this function to allow the waiting tasks to
       * start accessing it.
       * If the task fails, a non 'null' std::exception_ptr should be used.
       * To have tasks wait again one must call reset().
       * Calls to add() and doneWaiting() can safely be done concurrently.
       */
    void doneWaiting(std::exception_ptr iPtr);

    ///Resets access to the resource so that added tasks will wait.
    /**The owner of the resouce calls reset() to make tasks wait.
       * Calling reset() is NOT thread safe. The system must guarantee that no tasks are
       * using the resource when reset() is called and neither add() nor doneWaiting() can
       * be called concurrently with reset().
       */
    void reset();

  private:
    /**Handles running the tasks,
       * safe to call from multiple threads
       */
    void announce();

    struct WaitNode {
      WaitingTask* m_task;
      oneapi::tbb::task_group* m_group;
      std::atomic<WaitNode*> m_next;
      bool m_fromCache;

      void setNextNode(WaitNode* iNext) { m_next = iNext; }

      WaitNode* nextNode() const { return m_next; }
    };

    WaitNode* createNode(oneapi::tbb::task_group* iGroup, WaitingTask* iTask);

    // ---------- member data --------------------------------
    std::atomic<WaitNode*> m_head;
    std::unique_ptr<WaitNode[]> m_nodeCache;
    CMS_THREAD_GUARD(m_waiting) std::exception_ptr m_exceptionPtr;
    unsigned int m_nodeCacheSize;
    std::atomic<unsigned int> m_lastAssignedCacheIndex;
    std::atomic<bool> m_waiting;
  };
}  // namespace edm

#endif