Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:55

0001 #ifndef FWCore_Concurrency_SerialTaskQueueChain_h
0002 #define FWCore_Concurrency_SerialTaskQueueChain_h
0003 // -*- C++ -*-
0004 //
0005 // Package:     FWCore/Concurrency
0006 // Class  :     SerialTaskQueueChain
0007 //
0008 /**\class SerialTaskQueueChain SerialTaskQueueChain.h "SerialTaskQueueChain.h"
0009 
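 Description: Runs an action only after it has been handed, in order, through every SerialTaskQueue of a chain.

 Usage:
    Construct the chain from the ordered list of queues, then call push(group, action)
    for every action that has to be serialized by all of those queues.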
0014 
0015 */
0016 //
0017 // Original Author:  root
0018 //         Created:  Mon, 15 Aug 2016 18:04:02 GMT
0019 //
0020 
0021 // system include files
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>
0025 
0026 // user include files
0027 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0028 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0029 
0030 // forward declarations
0031 namespace edm {
0032   class SerialTaskQueueChain {
0033   public:
0034     SerialTaskQueueChain() {}
0035     explicit SerialTaskQueueChain(std::vector<std::shared_ptr<SerialTaskQueue>> iQueues)
0036         : m_queues(std::move(iQueues)) {}
0037 
0038     SerialTaskQueueChain(const SerialTaskQueueChain&) = delete;
0039     SerialTaskQueueChain& operator=(const SerialTaskQueueChain&) = delete;
0040     SerialTaskQueueChain(SerialTaskQueueChain&& iOld)
0041         : m_queues(std::move(iOld.m_queues)), m_outstandingTasks{iOld.m_outstandingTasks.load()} {}
0042 
0043     SerialTaskQueueChain& operator=(SerialTaskQueueChain&& iOld) {
0044       m_queues = std::move(iOld.m_queues);
0045       m_outstandingTasks.store(iOld.m_outstandingTasks.load());
0046       return *this;
0047     }
0048 
0049     /// asynchronously pushes functor iAction into queue
0050     /**
0051      * The function will return immediately and iAction will either
0052      * process concurrently with the calling thread or wait until the
0053      * protected resource becomes available or until a CPU becomes available.
0054      * \param[in] iAction Must be a functor that takes no arguments and return no values.
0055      */
0056     template <typename T>
0057     void push(oneapi::tbb::task_group& iGroup, T&& iAction);
0058 
0059     unsigned long outstandingTasks() const { return m_outstandingTasks; }
0060     std::size_t numberOfQueues() const { return m_queues.size(); }
0061 
0062   private:
0063     // ---------- member data --------------------------------
0064     std::vector<std::shared_ptr<SerialTaskQueue>> m_queues;
0065     std::atomic<unsigned long> m_outstandingTasks{0};
0066 
0067     template <typename T>
0068     void passDownChain(unsigned int iIndex, oneapi::tbb::task_group& iGroup, T&& iAction);
0069 
0070     template <typename T>
0071     void actionToRun(T&& iAction);
0072   };
0073 
0074   template <typename T>
0075   void SerialTaskQueueChain::push(oneapi::tbb::task_group& iGroup, T&& iAction) {
0076     ++m_outstandingTasks;
0077     if (m_queues.size() == 1) {
0078       m_queues[0]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
0079     } else {
0080       assert(!m_queues.empty());
0081       m_queues[0]->push(iGroup, [this, &iGroup, iAction]() mutable { this->passDownChain(1, iGroup, iAction); });
0082     }
0083   }
0084 
0085   template <typename T>
0086   void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, oneapi::tbb::task_group& iGroup, T&& iAction) {
0087     //Have to be sure the queue associated to this running task
0088     // does not attempt to start another task
0089     m_queues[iQueueIndex - 1]->pause();
0090     //is this the last queue?
0091     if (iQueueIndex + 1 == m_queues.size()) {
0092       m_queues[iQueueIndex]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
0093     } else {
0094       auto nextQueue = iQueueIndex + 1;
0095       m_queues[iQueueIndex]->push(
0096           iGroup, [this, nextQueue, &iGroup, iAction]() mutable { this->passDownChain(nextQueue, iGroup, iAction); });
0097     }
0098   }
0099 
0100   template <typename T>
0101   void SerialTaskQueueChain::actionToRun(T&& iAction) {
0102     //even if an exception happens we will resume the queues.
0103     using Queues = std::vector<std::shared_ptr<SerialTaskQueue>>;
0104     auto sentryAction = [](SerialTaskQueueChain* iChain) {
0105       auto& vec = iChain->m_queues;
0106       for (auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
0107         (*it)->resume();
0108       }
0109       --(iChain->m_outstandingTasks);
0110     };
0111 
0112     std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(this, sentryAction);
0113     iAction();
0114   }
0115 }  // namespace edm
0116 
0117 #endif