Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:55

0001 #ifndef FWCore_Concurrency_SerialTaskQueue_h
0002 #define FWCore_Concurrency_SerialTaskQueue_h
0003 // -*- C++ -*-
0004 //
0005 // Package:     Concurrency
0006 // Class  :     SerialTaskQueue
0007 //
0008 /**\class SerialTaskQueue SerialTaskQueue.h "FWCore/Concurrency/interface/SerialTaskQueue.h"
0009 
0010  Description: Runs only one task from the queue at a time
0011 
0012  Usage:
0013     A SerialTaskQueue is used to provide thread-safe access to a resource. You create a SerialTaskQueue
0014  for the resource. When every you need to perform an operation on the resource, you push a 'task' that
0015  does that operation onto the queue. The queue then makes sure to run one and only one task at a time.
0016  This guarantees serial access to the resource and therefore thread-safety.
0017  
0018     The 'tasks' managed by the SerialTaskQueue are just functor objects who which take no arguments and
0019  return no values. The simplest way to create a task is to use a C++11 lambda.
0020  
0021  Example: Imagine we have the following data structures.
0022  \code
0023  std::vector<int> values;
0024  edm::SerialTaskQueue queue;
0025  \endcode
0026 
0027  On thread 1 we can fill the vector
0028  \code
0029  for(int i=0; i<1000;++i) {
0030    queue.pushAndWait( [&values,i]{ values.push_back(i);} );
0031  }
0032  \endcode
0033  
0034  While on thread 2 we periodically print and stop when the vector is filled
0035  \code
0036  bool stop = false;
0037  while(not stop) {
0038    queue.pushAndWait([&false,&values] {
0039      if( 0 == (values.size() % 100) ) {
0040         std::cout <<values.size()<<std::endl;
0041      }
0042      if(values.size()>999) {
0043        stop = true;
0044      }
0045    });
0046  }
0047 \endcode
0048 */
0049 //
0050 // Original Author:  Chris Jones
0051 //         Created:  Thu Feb 21 11:14:39 CST 2013
0052 // $Id$
0053 //
0054 
0055 // system include files
0056 #include <atomic>
0057 #include <cassert>
0058 
0059 #include "oneapi/tbb/task_group.h"
0060 #include "oneapi/tbb/concurrent_queue.h"
0061 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0062 
0063 // user include files
0064 
0065 // forward declarations
0066 namespace edm {
0067   class SerialTaskQueue {
0068   public:
0069     SerialTaskQueue() : m_taskChosen(false), m_pauseCount{0} {}
0070 
0071     SerialTaskQueue(SerialTaskQueue&& iOther)
0072         : m_tasks(std::move(iOther.m_tasks)),
0073           m_taskChosen(iOther.m_taskChosen.exchange(false)),
0074           m_pauseCount(iOther.m_pauseCount.exchange(0)) {
0075       assert(m_tasks.empty() and m_taskChosen == false);
0076     }
0077     SerialTaskQueue(const SerialTaskQueue&) = delete;
0078     const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
0079 
0080     ~SerialTaskQueue();
0081 
0082     // ---------- const member functions ---------------------
0083     /// Checks to see if the queue has been paused.
0084     /**\return true if the queue is paused
0085        * \sa pause(), resume()
0086        */
0087     bool isPaused() const { return m_pauseCount.load() != 0; }
0088 
0089     // ---------- member functions ---------------------------
0090     /// Pauses processing of additional tasks from the queue.
0091     /**
0092        * Any task already running will not be paused however once that
0093        * running task finishes no further tasks will be started.
0094        * Multiple calls to pause() are allowed, however each call to 
0095        * pause() must be balanced by a call to resume().
0096        * \return false if queue was already paused.
0097        * \sa resume(), isPaused()
0098        */
0099     bool pause() { return 1 == ++m_pauseCount; }
0100 
0101     /// Resumes processing if the queue was paused.
0102     /**
0103        * Multiple calls to resume() are allowed if there
0104        * were multiple calls to pause(). Only when we reach as
0105        * many resume() calls as pause() calls will the queue restart.
0106        * \return true if the call really restarts the queue
0107        * \sa pause(), isPaused()
0108        */
0109     bool resume();
0110 
0111     /// asynchronously pushes functor iAction into queue
0112     /**
0113        * The function will return immediately and iAction will either
0114        * process concurrently with the calling thread or wait until the
0115        * protected resource becomes available or until a CPU becomes available.
0116        * \param[in] iAction Must be a functor that takes no arguments and return no values.
0117        */
0118     template <typename T>
0119     void push(oneapi::tbb::task_group&, const T& iAction);
0120 
0121   private:
0122     /** Base class for all tasks held by the SerialTaskQueue */
0123     class TaskBase {
0124       friend class SerialTaskQueue;
0125 
0126       oneapi::tbb::task_group* group() { return m_group; }
0127       virtual void execute() = 0;
0128 
0129     public:
0130       virtual ~TaskBase() = default;
0131 
0132     protected:
0133       explicit TaskBase(oneapi::tbb::task_group* iGroup) : m_group(iGroup) {}
0134 
0135     private:
0136       oneapi::tbb::task_group* m_group;
0137     };
0138 
0139     template <typename T>
0140     class QueuedTask : public TaskBase {
0141     public:
0142       QueuedTask(oneapi::tbb::task_group& iGroup, const T& iAction) : TaskBase(&iGroup), m_action(iAction) {}
0143 
0144     private:
0145       void execute() final;
0146 
0147       T m_action;
0148     };
0149 
0150     friend class TaskBase;
0151 
0152     void pushTask(TaskBase*);
0153     TaskBase* pushAndGetNextTask(TaskBase*);
0154     TaskBase* finishedTask();
0155     //returns nullptr if a task is already being processed
0156     TaskBase* pickNextTask();
0157 
0158     void spawn(TaskBase&);
0159 
0160     // ---------- member data --------------------------------
0161     oneapi::tbb::concurrent_queue<TaskBase*> m_tasks;
0162     std::atomic<bool> m_taskChosen;
0163     std::atomic<unsigned long> m_pauseCount;
0164   };
0165 
0166   template <typename T>
0167   void SerialTaskQueue::push(oneapi::tbb::task_group& iGroup, const T& iAction) {
0168     QueuedTask<T>* pTask{new QueuedTask<T>{iGroup, iAction}};
0169     pushTask(pTask);
0170   }
0171 
0172   template <typename T>
0173   void SerialTaskQueue::QueuedTask<T>::execute() {
0174     // Exception has to swallowed in order to avoid throwing from execute(). The user of SerialTaskQueue should handle exceptions within m_action().
0175     CMS_SA_ALLOW try { this->m_action(); } catch (...) {
0176     }
0177   }
0178 
0179 }  // namespace edm
0180 
0181 #endif