Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:56

0001 // -*- C++ -*-
0002 //
0003 // Package:     Concurrency
0004 // Class  :     SerialTaskQueue
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Thu Feb 21 11:31:52 CST 2013
0011 // $Id$
0012 //
0013 
0014 // system include files
0015 #include "oneapi/tbb/task_group.h"
0016 
0017 // user include files
0018 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0019 
0020 #include "FWCore/Utilities/interface/Likely.h"
0021 
0022 using namespace edm;
0023 
0024 //
0025 // member functions
0026 //
0027 SerialTaskQueue::~SerialTaskQueue() {
0028   //be certain all tasks have completed
0029   bool isEmpty = m_tasks.empty();
0030   bool isTaskChosen = m_taskChosen;
0031   if ((not isEmpty and not isPaused()) or isTaskChosen) {
0032     oneapi::tbb::task_group g;
0033     tbb::task_handle last{g.defer([]() {})};
0034     push(g, [&g, &last]() { g.run(std::move(last)); });
0035     g.wait();
0036   }
0037 }
0038 
0039 void SerialTaskQueue::spawn(TaskBase& iTask) {
0040   auto pTask = &iTask;
0041   iTask.group()->run([pTask, this]() {
0042     TaskBase* t = pTask;
0043     auto g = pTask->group();
0044     do {
0045       t->execute();
0046       delete t;
0047       t = finishedTask();
0048       if (t and t->group() != g) {
0049         spawn(*t);
0050         t = nullptr;
0051       }
0052     } while (t != nullptr);
0053   });
0054 }
0055 
0056 bool SerialTaskQueue::resume() {
0057   if (0 == --m_pauseCount) {
0058     auto t = pickNextTask();
0059     if (nullptr != t) {
0060       spawn(*t);
0061     }
0062     return true;
0063   }
0064   return false;
0065 }
0066 
0067 void SerialTaskQueue::pushTask(TaskBase* iTask) {
0068   auto t = pushAndGetNextTask(iTask);
0069   if (nullptr != t) {
0070     spawn(*t);
0071   }
0072 }
0073 
0074 SerialTaskQueue::TaskBase* SerialTaskQueue::pushAndGetNextTask(TaskBase* iTask) {
0075   TaskBase* returnValue{nullptr};
0076   if LIKELY (nullptr != iTask) {
0077     m_tasks.push(iTask);
0078     returnValue = pickNextTask();
0079   }
0080   return returnValue;
0081 }
0082 
0083 SerialTaskQueue::TaskBase* SerialTaskQueue::finishedTask() {
0084   m_taskChosen.store(false);
0085   return pickNextTask();
0086 }
0087 
0088 SerialTaskQueue::TaskBase* SerialTaskQueue::pickNextTask() {
0089   bool expect = false;
0090   if LIKELY (0 == m_pauseCount and m_taskChosen.compare_exchange_strong(expect, true)) {
0091     TaskBase* t = nullptr;
0092     if LIKELY (m_tasks.try_pop(t)) {
0093       return t;
0094     }
0095     //no task was actually pulled
0096     m_taskChosen.store(false);
0097 
0098     //was a new entry added after we called 'try_pop' but before we did the clear?
0099     expect = false;
0100     if (not m_tasks.empty() and m_taskChosen.compare_exchange_strong(expect, true)) {
0101       t = nullptr;
0102       if (m_tasks.try_pop(t)) {
0103         return t;
0104       }
0105       //no task was still pulled since a different thread beat us to it
0106       m_taskChosen.store(false);
0107     }
0108   }
0109   return nullptr;
0110 }
0111 
0112 //
0113 // const member functions
0114 //
0115 
0116 //
0117 // static member functions
0118 //