Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-24 02:18:39

0001 // -*- C++ -*-
0002 //
0003 // Package:     Concurrency
0004 // Class  :     SerialTaskQueue
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Thu Feb 21 11:31:52 CST 2013
0011 // $Id$
0012 //
0013 
0014 // system include files
0015 #include "oneapi/tbb/task.h"
0016 
0017 // user include files
0018 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0019 
0020 #include "FWCore/Utilities/interface/Likely.h"
0021 
0022 using namespace edm;
0023 
0024 //
0025 // member functions
0026 //
0027 SerialTaskQueue::~SerialTaskQueue() {
0028   //be certain all tasks have completed
0029   bool isEmpty = m_tasks.empty();
0030   bool isTaskChosen = m_taskChosen;
0031   if ((not isEmpty and not isPaused()) or isTaskChosen) {
0032     oneapi::tbb::task_group g;
0033     g.run([&g, this]() {
0034       oneapi::tbb::task::suspend([&g, this](oneapi::tbb::task::suspend_point tag) {
0035         push(g, [tag]() { oneapi::tbb::task::resume(tag); });
0036       });  //suspend
0037     });    //group run
0038     g.wait();
0039   }
0040 }
0041 
0042 void SerialTaskQueue::spawn(TaskBase& iTask) {
0043   auto pTask = &iTask;
0044   iTask.group()->run([pTask, this]() {
0045     TaskBase* t = pTask;
0046     auto g = pTask->group();
0047     do {
0048       t->execute();
0049       delete t;
0050       t = finishedTask();
0051       if (t and t->group() != g) {
0052         spawn(*t);
0053         t = nullptr;
0054       }
0055     } while (t != nullptr);
0056   });
0057 }
0058 
0059 bool SerialTaskQueue::resume() {
0060   if (0 == --m_pauseCount) {
0061     auto t = pickNextTask();
0062     if (nullptr != t) {
0063       spawn(*t);
0064     }
0065     return true;
0066   }
0067   return false;
0068 }
0069 
0070 void SerialTaskQueue::pushTask(TaskBase* iTask) {
0071   auto t = pushAndGetNextTask(iTask);
0072   if (nullptr != t) {
0073     spawn(*t);
0074   }
0075 }
0076 
0077 SerialTaskQueue::TaskBase* SerialTaskQueue::pushAndGetNextTask(TaskBase* iTask) {
0078   TaskBase* returnValue{nullptr};
0079   if LIKELY (nullptr != iTask) {
0080     m_tasks.push(iTask);
0081     returnValue = pickNextTask();
0082   }
0083   return returnValue;
0084 }
0085 
0086 SerialTaskQueue::TaskBase* SerialTaskQueue::finishedTask() {
0087   m_taskChosen.store(false);
0088   return pickNextTask();
0089 }
0090 
0091 SerialTaskQueue::TaskBase* SerialTaskQueue::pickNextTask() {
0092   bool expect = false;
0093   if LIKELY (0 == m_pauseCount and m_taskChosen.compare_exchange_strong(expect, true)) {
0094     TaskBase* t = nullptr;
0095     if LIKELY (m_tasks.try_pop(t)) {
0096       return t;
0097     }
0098     //no task was actually pulled
0099     m_taskChosen.store(false);
0100 
0101     //was a new entry added after we called 'try_pop' but before we did the clear?
0102     expect = false;
0103     if (not m_tasks.empty() and m_taskChosen.compare_exchange_strong(expect, true)) {
0104       t = nullptr;
0105       if (m_tasks.try_pop(t)) {
0106         return t;
0107       }
0108       //no task was still pulled since a different thread beat us to it
0109       m_taskChosen.store(false);
0110     }
0111   }
0112   return nullptr;
0113 }
0114 
0115 //
0116 // const member functions
0117 //
0118 
0119 //
0120 // static member functions
0121 //