// SerialTaskQueueChain
#ifndef FWCore_Concurrency_SerialTaskQueueChain_h
#define FWCore_Concurrency_SerialTaskQueueChain_h
// -*- C++ -*-
//
// Package:     FWCore/Concurrency
// Class  :     SerialTaskQueueChain
//
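/**\class SerialTaskQueueChain SerialTaskQueueChain.h "SerialTaskQueueChain.h"

 Description: Runs a functor only after it has been passed, in order, through
              every SerialTaskQueue of a chain.

 Usage:
    Construct the chain from a non-empty
    std::vector<std::shared_ptr<SerialTaskQueue>> and call push(group, functor).
    A wrapper for the functor is queued on the first queue. When the wrapper
    runs on a queue that is not the last one, it pauses that queue and queues
    itself on the next one. From the last queue the functor itself is run;
    afterwards every paused queue is resumed and the outstanding-task count
    is decremented.

*/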
//
// Original Author:  root
//         Created:  Mon, 15 Aug 2016 18:04:02 GMT
//

// system include files
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// user include files
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

// forward declarations
namespace edm {
  /**
   * Ordered list of shared SerialTaskQueues together with a count of the tasks
   * that have been pushed but have not yet finished running.
   *
   * Not copyable. Movable, but the wrappers queued by push() capture `this`,
   * so a chain that still has outstanding tasks should be neither moved nor
   * destroyed.
   */
  class SerialTaskQueueChain {
  public:
    /// Creates a chain without queues. push() asserts that the chain is not empty.
    SerialTaskQueueChain() = default;

    /// Takes over the list of queues; tasks visit them in vector order.
    explicit SerialTaskQueueChain(std::vector<std::shared_ptr<SerialTaskQueue>> iQueues)
        : m_queues(std::move(iQueues)) {}

    SerialTaskQueueChain(const SerialTaskQueueChain&) = delete;
    SerialTaskQueueChain& operator=(const SerialTaskQueueChain&) = delete;

    /// Steals the queue list of iOld and copies its current outstanding-task
    /// count. The counter of iOld is left unchanged.
    SerialTaskQueueChain(SerialTaskQueueChain&& iOld) noexcept
        : m_queues(std::move(iOld.m_queues)), m_outstandingTasks{iOld.m_outstandingTasks.load()} {}

    /// Move assignment; same semantics as the move constructor.
    SerialTaskQueueChain& operator=(SerialTaskQueueChain&& iOld) noexcept {
      // Guard against self-move, which would move m_queues onto itself.
      if (this != &iOld) {
        m_queues = std::move(iOld.m_queues);
        m_outstandingTasks.store(iOld.m_outstandingTasks.load());
      }
      return *this;
    }

    /// asynchronously pushes functor iAction into queue
    /**
     * The function will return immediately and iAction will either
     * process concurrently with the calling thread or wait until the
     * protected resource becomes available or until a CPU becomes available.
     * \param[in] iGroup  Task group that is handed to every queue the task is pushed onto.
     * \param[in] iAction Must be a functor that takes no arguments and return no values.
     */
    template <typename T>
    void push(oneapi::tbb::task_group& iGroup, T&& iAction);

    /// Number of tasks pushed onto this chain whose action has not yet finished.
    unsigned long outstandingTasks() const { return m_outstandingTasks; }
    /// Number of queues a task has to pass through.
    std::size_t numberOfQueues() const { return m_queues.size(); }

  private:
    // ---------- member data --------------------------------
    std::vector<std::shared_ptr<SerialTaskQueue>> m_queues;
    // Incremented in push(), decremented once the action has run.
    std::atomic<unsigned long> m_outstandingTasks{0};

    /// Pauses queue iQueueIndex-1 and pushes the task onto queue iQueueIndex.
    template <typename T>
    void passDownChain(unsigned int iQueueIndex, oneapi::tbb::task_group& iGroup, T&& iAction);

    /// Runs iAction, then resumes the paused queues and decrements the counter.
    template <typename T>
    void actionToRun(T&& iAction);
  };

  /// Starts iAction on its way through the chain.
  /**
   * Increments the outstanding-task counter and queues a wrapper on the first
   * queue. With a single queue the wrapper runs the action via actionToRun();
   * otherwise it calls passDownChain() to move on to queue 1.
   *
   * iAction is perfect-forwarded into the wrapper, so an rvalue functor is
   * moved instead of copied. The wrapper holds `this` and a reference to
   * iGroup; both have to stay valid until the wrapper has run.
   *
   * \param[in] iGroup  Task group that is handed to the queue's push().
   * \param[in] iAction Functor taking no arguments and returning nothing.
   */
  template <typename T>
  void SerialTaskQueueChain::push(oneapi::tbb::task_group& iGroup, T&& iAction) {
    // A chain without queues has nowhere to run the task.
    assert(!m_queues.empty());
    ++m_outstandingTasks;
    if (m_queues.size() == 1) {
      m_queues[0]->push(iGroup,
                        [this, action = std::forward<T>(iAction)]() mutable { this->actionToRun(action); });
    } else {
      m_queues[0]->push(iGroup, [this, &iGroup, action = std::forward<T>(iAction)]() mutable {
        // This wrapper is finished with the functor once it has been handed on,
        // so pass it as an rvalue and let the next stage take it over.
        this->passDownChain(1, iGroup, std::move(action));
      });
    }
  }

  /// Moves a task from queue iQueueIndex-1 on to queue iQueueIndex.
  /**
   * Called from the wrapper task running on queue iQueueIndex-1. That queue is
   * paused first; it is resumed again by actionToRun() after the action has
   * run. If iQueueIndex is the last queue the pushed wrapper runs the action,
   * otherwise it calls passDownChain() again for the following queue.
   *
   * iAction is perfect-forwarded into the new wrapper, so an rvalue functor
   * is moved instead of copied.
   *
   * \param[in] iQueueIndex Index of the queue to push onto; 1 <= iQueueIndex < number of queues.
   * \param[in] iGroup      Task group that is handed to the queue's push().
   * \param[in] iAction     Functor taking no arguments and returning nothing.
   */
  template <typename T>
  void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, oneapi::tbb::task_group& iGroup, T&& iAction) {
    assert(iQueueIndex >= 1 && iQueueIndex < m_queues.size());
    //Have to be sure the queue associated to this running task
    // does not attempt to start another task
    m_queues[iQueueIndex - 1]->pause();
    //is this the last queue?
    if (iQueueIndex + 1 == m_queues.size()) {
      m_queues[iQueueIndex]->push(
          iGroup, [this, action = std::forward<T>(iAction)]() mutable { this->actionToRun(action); });
    } else {
      auto nextQueue = iQueueIndex + 1;
      m_queues[iQueueIndex]->push(iGroup, [this, nextQueue, &iGroup, action = std::forward<T>(iAction)]() mutable {
        // This wrapper is finished with the functor once it has been handed on,
        // so pass it as an rvalue and let the next stage take it over.
        this->passDownChain(nextQueue, iGroup, std::move(action));
      });
    }
  }

  /// Runs iAction and afterwards releases the chain.
  /**
   * After iAction returns or throws, every queue except the last one is
   * resumed, walking from the second-to-last queue back to the first, and the
   * outstanding-task counter is decremented. The last queue is skipped
   * because passDownChain() only ever pauses the queue before the one it
   * pushes to.
   *
   * \param[in] iAction Functor taking no arguments and returning nothing.
   */
  template <typename T>
  void SerialTaskQueueChain::actionToRun(T&& iAction) {
    //even if an exception happens we will resume the queues.
    auto sentryAction = [](SerialTaskQueueChain* iChain) {
      auto& vec = iChain->m_queues;
      for (auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
        (*it)->resume();
      }
      --(iChain->m_outstandingTasks);
    };

    // The unique_ptr serves only as a scope guard: its "deleter" does not
    // delete the chain, it performs the clean-up above when sentry goes out
    // of scope.
    std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(this, sentryAction);
    iAction();
  }
}  // namespace edm

#endif