Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
// -*- C++ -*-
//
// Package:     Concurrency
// Class  :     SerialTaskQueue
//
// Implementation:
//     [Notes on implementation]
//
// Original Author:  Chris Jones
//         Created:  Thu Feb 21 11:31:52 CST 2013
// $Id$
//

// system include files
#include "oneapi/tbb/task_group.h"

// user include files
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"

#include "FWCore/Utilities/interface/Likely.h"

using namespace edm;

//
// member functions
//
SerialTaskQueue::~SerialTaskQueue() {
  //be certain all tasks have completed
  bool isEmpty = m_tasks.empty();
  bool isTaskChosen = m_taskChosen;
  if ((not isEmpty and not isPaused()) or isTaskChosen) {
    oneapi::tbb::task_group g;
    tbb::task_handle last{g.defer([]() {})};
    push(g, [&g, &last]() { g.run(std::move(last)); });
    g.wait();
  }
}

void SerialTaskQueue::spawn(TaskBase& iTask) {
  auto pTask = &iTask;
  iTask.group()->run([pTask, this]() {
    TaskBase* t = pTask;
    auto g = pTask->group();
    do {
      t->execute();
      delete t;
      t = finishedTask();
      if (t and t->group() != g) {
        spawn(*t);
        t = nullptr;
      }
    } while (t != nullptr);
  });
}

bool SerialTaskQueue::resume() {
  if (0 == --m_pauseCount) {
    auto t = pickNextTask();
    if (nullptr != t) {
      spawn(*t);
    }
    return true;
  }
  return false;
}

void SerialTaskQueue::pushTask(TaskBase* iTask) {
  auto t = pushAndGetNextTask(iTask);
  if (nullptr != t) {
    spawn(*t);
  }
}

SerialTaskQueue::TaskBase* SerialTaskQueue::pushAndGetNextTask(TaskBase* iTask) {
  TaskBase* returnValue{nullptr};
  if LIKELY (nullptr != iTask) {
    m_tasks.push(iTask);
    returnValue = pickNextTask();
  }
  return returnValue;
}

SerialTaskQueue::TaskBase* SerialTaskQueue::finishedTask() {
  m_taskChosen.store(false);
  return pickNextTask();
}

SerialTaskQueue::TaskBase* SerialTaskQueue::pickNextTask() {
  bool expect = false;
  if LIKELY (0 == m_pauseCount and m_taskChosen.compare_exchange_strong(expect, true)) {
    TaskBase* t = nullptr;
    if LIKELY (m_tasks.try_pop(t)) {
      return t;
    }
    //no task was actually pulled
    m_taskChosen.store(false);

    //was a new entry added after we called 'try_pop' but before we did the clear?
    expect = false;
    if (not m_tasks.empty() and m_taskChosen.compare_exchange_strong(expect, true)) {
      t = nullptr;
      if (m_tasks.try_pop(t)) {
        return t;
      }
      //no task was still pulled since a different thread beat us to it
      m_taskChosen.store(false);
    }
  }
  return nullptr;
}

//
// const member functions
//

//
// static member functions
//