Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:55

0001 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
0002 #define FWCore_Concurrency_LimitedTaskQueue_h
0003 // -*- C++ -*-
0004 //
0005 // Package:     Concurrency
0006 // Class  :     LimitedTaskQueue
0007 //
0008 /**\class LimitedTaskQueue LimitedTaskQueue.h "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0009 
0010  Description: Runs a set number of tasks from the queue at a time
0011 
0012  Usage:
0013     A LimitedTaskQueue is used to provide access to a limited thread-safe resource. You create a LimitedTaskQueue
0014  for the resource. Whenever you need to perform an operation on the resource, you push a 'task' that
0015  does that operation onto the queue. The queue then makes sure to run a limited number of tasks at a time.
0016  
0017     The 'tasks' managed by the LimitedTaskQueue are just functor objects which take no arguments and
0018  return no values. The simplest way to create a task is to use a C++11 lambda.
0019  
0020 */
0021 //
0022 // Original Author:  Chris Jones
0023 //         Created:  Thu Feb 21 11:14:39 CST 2013
0024 // $Id$
0025 //
0026 
0027 // system include files
0028 #include <atomic>
0029 #include <vector>
0030 #include <memory>
0031 
0032 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0033 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0034 
0035 // user include files
0036 
0037 // forward declarations
0038 namespace edm {
0039   class LimitedTaskQueue {
0040   public:
0041     LimitedTaskQueue(unsigned int iLimit) : m_queues{iLimit} {}
0042     LimitedTaskQueue(const LimitedTaskQueue&) = delete;
0043     const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
0044 
0045     // ---------- member functions ---------------------------
0046 
0047     /// asynchronously pushes functor iAction into queue
0048     /**
0049        * The function will return immediately and iAction will either
0050        * process concurrently with the calling thread or wait until the
0051        * protected resource becomes available or until a CPU becomes available.
0052        * \param[in] iAction Must be a functor that takes no arguments and return no values.
0053        */
0054     template <typename T>
0055     void push(oneapi::tbb::task_group& iGroup, T&& iAction);
0056 
0057     class Resumer {
0058     public:
0059       friend class LimitedTaskQueue;
0060 
0061       Resumer() = default;
0062       ~Resumer() { resume(); }
0063 
0064       Resumer(Resumer&& iOther) : m_queue(iOther.m_queue) { iOther.m_queue = nullptr; }
0065 
0066       Resumer(Resumer const& iOther) : m_queue(iOther.m_queue) {
0067         if (m_queue) {
0068           m_queue->pause();
0069         }
0070       }
0071 
0072       Resumer& operator=(Resumer const& iOther) {
0073         auto t = iOther;
0074         return (*this = std::move(t));
0075       }
0076       Resumer& operator=(Resumer&& iOther) {
0077         if (m_queue) {
0078           m_queue->resume();
0079         }
0080         m_queue = iOther.m_queue;
0081         iOther.m_queue = nullptr;
0082         return *this;
0083       }
0084 
0085       bool resume() {
0086         if (m_queue) {
0087           auto q = m_queue;
0088           m_queue = nullptr;
0089           return q->resume();
0090         }
0091         return false;
0092       }
0093 
0094     private:
0095       Resumer(SerialTaskQueue* iQueue) : m_queue{iQueue} {}
0096       SerialTaskQueue* m_queue = nullptr;
0097     };
0098 
0099     /// asynchronously pushes functor iAction into queue then pause the queue and run iAction
0100     /** iAction must take as argument a copy of a LimitedTaskQueue::Resumer. To resume
0101        the queue let the last copy of the Resumer go out of scope, or call Resumer::resume().
0102        Using this function will decrease the allowed concurrency limit by 1.
0103        */
0104     template <typename T>
0105     void pushAndPause(oneapi::tbb::task_group& iGroup, T&& iAction);
0106 
0107     unsigned int concurrencyLimit() const { return m_queues.size(); }
0108 
0109   private:
0110     // ---------- member data --------------------------------
0111     std::vector<SerialTaskQueue> m_queues;
0112   };
0113 
0114   template <typename T>
0115   void LimitedTaskQueue::push(oneapi::tbb::task_group& iGroup, T&& iAction) {
0116     auto set_to_run = std::make_shared<std::atomic<bool>>(false);
0117     for (auto& q : m_queues) {
0118       q.push(iGroup, [set_to_run, iAction]() mutable {
0119         bool expected = false;
0120         if (set_to_run->compare_exchange_strong(expected, true)) {
0121           iAction();
0122         }
0123       });
0124     }
0125   }
0126 
0127   template <typename T>
0128   void LimitedTaskQueue::pushAndPause(oneapi::tbb::task_group& iGroup, T&& iAction) {
0129     auto set_to_run = std::make_shared<std::atomic<bool>>(false);
0130     for (auto& q : m_queues) {
0131       q.push(iGroup, [&q, set_to_run, iAction]() mutable {
0132         bool expected = false;
0133         if (set_to_run->compare_exchange_strong(expected, true)) {
0134           q.pause();
0135           iAction(Resumer(&q));
0136         }
0137       });
0138     }
0139   }
0140 
0141 }  // namespace edm
0142 
0143 #endif