File indexing completed on 2024-04-06 12:11:56
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #include "oneapi/tbb/task.h"
0018 #include <cassert>
0019 #include <memory>
0020
0021 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0022 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0023 #include "FWCore/Concurrency/interface/hardware_pause.h"
0024 #include "FWCore/Utilities/interface/Likely.h"
0025
0026 using namespace edm;
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038 WaitingTaskList::WaitingTaskList(unsigned int iInitialSize)
0039 : m_head{nullptr},
0040 m_nodeCache{new WaitNode[iInitialSize]},
0041 m_nodeCacheSize{iInitialSize},
0042 m_lastAssignedCacheIndex{0},
0043 m_waiting{true} {
0044 auto nodeCache = m_nodeCache.get();
0045 for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
0046 it->m_fromCache = true;
0047 }
0048 }
0049
0050
0051
0052
0053 void WaitingTaskList::reset() {
0054 m_exceptionPtr = std::exception_ptr{};
0055 unsigned int nSeenTasks = m_lastAssignedCacheIndex;
0056 m_lastAssignedCacheIndex = 0;
0057 assert(m_head == nullptr);
0058 if (nSeenTasks > m_nodeCacheSize) {
0059
0060
0061 m_nodeCacheSize = nSeenTasks;
0062 m_nodeCache = std::make_unique<WaitNode[]>(nSeenTasks);
0063 auto nodeCache = m_nodeCache.get();
0064 for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
0065 it->m_fromCache = true;
0066 }
0067 }
0068
0069 m_waiting = true;
0070 }
0071
0072 WaitingTaskList::WaitNode* WaitingTaskList::createNode(oneapi::tbb::task_group* iGroup, WaitingTask* iTask) {
0073 unsigned int index = m_lastAssignedCacheIndex++;
0074
0075 WaitNode* returnValue;
0076 if (index < m_nodeCacheSize) {
0077 returnValue = m_nodeCache.get() + index;
0078 } else {
0079 returnValue = new WaitNode;
0080 returnValue->m_fromCache = false;
0081 }
0082 returnValue->m_task = iTask;
0083 returnValue->m_group = iGroup;
0084
0085
0086
0087 returnValue->m_next.store(returnValue, std::memory_order_relaxed);
0088
0089 return returnValue;
0090 }
0091
0092 void WaitingTaskList::add(WaitingTaskHolder iTask) {
0093 if (!m_waiting) {
0094 if (m_exceptionPtr) {
0095 iTask.doneWaiting(m_exceptionPtr);
0096 }
0097 } else {
0098 auto task = iTask.release_no_decrement();
0099 WaitNode* newHead = createNode(iTask.group(), task);
0100
0101
0102 WaitNode* oldHead = m_head.exchange(newHead);
0103 newHead->setNextNode(oldHead);
0104
0105
0106
0107
0108
0109
0110
0111 if (nullptr == oldHead) {
0112 newHead->setNextNode(nullptr);
0113 if (!m_waiting) {
0114
0115
0116
0117
0118
0119 announce();
0120 }
0121 }
0122 }
0123 }
0124
0125 void WaitingTaskList::add(oneapi::tbb::task_group* iGroup, WaitingTask* iTask) {
0126 iTask->increment_ref_count();
0127 if (!m_waiting) {
0128 if (UNLIKELY(bool(m_exceptionPtr))) {
0129 iTask->dependentTaskFailed(m_exceptionPtr);
0130 }
0131 if (0 == iTask->decrement_ref_count()) {
0132 iGroup->run([iTask]() {
0133 TaskSentry s{iTask};
0134 iTask->execute();
0135 });
0136 }
0137 } else {
0138 WaitNode* newHead = createNode(iGroup, iTask);
0139
0140
0141 WaitNode* oldHead = m_head.exchange(newHead);
0142 newHead->setNextNode(oldHead);
0143
0144
0145
0146
0147
0148
0149
0150 if (nullptr == oldHead) {
0151 if (!m_waiting) {
0152
0153
0154
0155
0156
0157 announce();
0158 }
0159 }
0160 }
0161 }
0162
0163 void WaitingTaskList::presetTaskAsFailed(std::exception_ptr iExcept) {
0164 if (iExcept and m_waiting) {
0165 WaitNode* node = m_head.load();
0166 while (node) {
0167 WaitNode* next;
0168 while (node == (next = node->nextNode())) {
0169 hardware_pause();
0170 }
0171 node->m_task->dependentTaskFailed(iExcept);
0172 node = next;
0173 }
0174 }
0175 }
0176
0177 void WaitingTaskList::announce() {
0178
0179
0180
0181 WaitNode* n = m_head.exchange(nullptr);
0182 WaitNode* next;
0183 while (n) {
0184
0185
0186
0187
0188 while (n == (next = n->nextNode())) {
0189 hardware_pause();
0190 }
0191 auto t = n->m_task;
0192 auto g = n->m_group;
0193 if (UNLIKELY(bool(m_exceptionPtr))) {
0194 t->dependentTaskFailed(m_exceptionPtr);
0195 }
0196 if (!n->m_fromCache) {
0197 delete n;
0198 }
0199 n = next;
0200
0201
0202
0203 if (0 == t->decrement_ref_count()) {
0204 g->run([t]() {
0205 TaskSentry s{t};
0206 t->execute();
0207 });
0208 }
0209 }
0210 }
0211
0212 void WaitingTaskList::doneWaiting(std::exception_ptr iPtr) {
0213 m_exceptionPtr = iPtr;
0214 m_waiting = false;
0215 announce();
0216 }