#ifndef FWCore_Concurrency_LimitedTaskQueue_h
#define FWCore_Concurrency_LimitedTaskQueue_h
// -*- C++ -*-
//
// Package:     Concurrency
// Class  :     LimitedTaskQueue
//
/**\class LimitedTaskQueue LimitedTaskQueue.h "FWCore/Concurrency/interface/LimitedTaskQueue.h"

 Description: Runs a set number of tasks from the queue at a time

 Usage:
    A LimitedTaskQueue is used to provide access to a limited thread-safe resource. You create a LimitedTaskQueue
 for the resource. Whenever you need to perform an operation on the resource, you push a 'task' that
 does that operation onto the queue. The queue then makes sure to run a limited number of tasks at a time.
 
    The 'tasks' managed by the LimitedTaskQueue are just functor objects which take no arguments and
 return no values. The simplest way to create a task is to use a C++11 lambda.
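
     A minimal usage sketch (assumes an object 'resource' whose doWork() method is the
 thread-limited operation, and an oneapi::tbb::task_group named 'group'; both names are
 illustrative):
 \code
    edm::LimitedTaskQueue queue{2};  // allow at most 2 tasks to run concurrently
    oneapi::tbb::task_group group;
    queue.push(group, [&resource]() { resource.doWork(); });
    group.wait();
 \endcode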
 
*/
//
// Original Author:  Chris Jones
//         Created:  Thu Feb 21 11:14:39 CST 2013
// $Id$
//

// system include files
#include <atomic>
#include <vector>
#include <memory>

#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

// user include files

// forward declarations
namespace edm {
  class LimitedTaskQueue {
  public:
    LimitedTaskQueue(unsigned int iLimit) : m_queues{iLimit} {}
    LimitedTaskQueue(const LimitedTaskQueue&) = delete;
    const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;

    // ---------- member functions ---------------------------

    /// asynchronously pushes functor iAction into queue
    /**
       * The function returns immediately; iAction will either run
       * concurrently with the calling thread or wait until the
       * protected resource or a CPU becomes available.
       * \param[in] iAction Must be a functor that takes no arguments and returns no value.
       */
    template <typename T>
    void push(oneapi::tbb::task_group& iGroup, T&& iAction);

    class Resumer {
    public:
      friend class LimitedTaskQueue;

      Resumer() = default;
      ~Resumer() { resume(); }

      Resumer(Resumer&& iOther) : m_queue(iOther.m_queue) { iOther.m_queue = nullptr; }

      Resumer(Resumer const& iOther) : m_queue(iOther.m_queue) {
        if (m_queue) {
          m_queue->pause();
        }
      }

      Resumer& operator=(Resumer const& iOther) {
        auto t = iOther;
        return (*this = std::move(t));
      }
      Resumer& operator=(Resumer&& iOther) {
        if (m_queue) {
          m_queue->resume();
        }
        m_queue = iOther.m_queue;
        iOther.m_queue = nullptr;
        return *this;
      }

      bool resume() {
        if (m_queue) {
          auto q = m_queue;
          m_queue = nullptr;
          return q->resume();
        }
        return false;
      }

    private:
      Resumer(SerialTaskQueue* iQueue) : m_queue{iQueue} {}
      SerialTaskQueue* m_queue = nullptr;
    };

    /// asynchronously pushes functor iAction into the queue, then pauses the queue and runs iAction
    /** iAction must take as argument a copy of a LimitedTaskQueue::Resumer. To resume
       the queue let the last copy of the Resumer go out of scope, or call Resumer::resume().
       Using this function will decrease the allowed concurrency limit by 1.
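
       A minimal sketch (assumes an existing oneapi::tbb::task_group named 'group' and a
       hypothetical asynchronous operation startAsyncWork() that invokes a callback when
       done; both names are illustrative):
       \code
       queue.pushAndPause(group, [](edm::LimitedTaskQueue::Resumer resumer) {
         // Keep a copy of the Resumer alive until the asynchronous work finishes;
         // the paused SerialTaskQueue resumes when the last copy goes out of scope
         // or resume() is called on it.
         startAsyncWork([resumer]() mutable { resumer.resume(); });
       });
       \endcode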
       */
    template <typename T>
    void pushAndPause(oneapi::tbb::task_group& iGroup, T&& iAction);

    unsigned int concurrencyLimit() const { return m_queues.size(); }

  private:
    // ---------- member data --------------------------------
    std::vector<SerialTaskQueue> m_queues;
  };

  template <typename T>
  void LimitedTaskQueue::push(oneapi::tbb::task_group& iGroup, T&& iAction) {
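    // A copy of the task is pushed onto every SerialTaskQueue, but the shared atomic
    // flag lets only the first copy that runs execute iAction; the remaining copies
    // become no-ops. Since each SerialTaskQueue runs one task at a time, at most
    // m_queues.size() actions execute concurrently.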
    auto set_to_run = std::make_shared<std::atomic<bool>>(false);
    for (auto& q : m_queues) {
      q.push(iGroup, [set_to_run, iAction]() mutable {
        bool expected = false;
        if (set_to_run->compare_exchange_strong(expected, true)) {
          iAction();
        }
      });
    }
  }

  template <typename T>
  void LimitedTaskQueue::pushAndPause(oneapi::tbb::task_group& iGroup, T&& iAction) {
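    // Same fan-out scheme as push(), except the SerialTaskQueue that wins pauses
    // itself before running iAction, so the effective concurrency limit drops by one
    // until the Resumer handed to iAction resumes that queue.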
    auto set_to_run = std::make_shared<std::atomic<bool>>(false);
    for (auto& q : m_queues) {
      q.push(iGroup, [&q, set_to_run, iAction]() mutable {
        bool expected = false;
        if (set_to_run->compare_exchange_strong(expected, true)) {
          q.pause();
          iAction(Resumer(&q));
        }
      });
    }
  }

}  // namespace edm

#endif