Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:56

0001 // -*- C++ -*-
0002 //
0003 // Package:     Concurrency
0004 // Class  :     WaitingTaskList
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Thu Feb 21 13:46:45 CST 2013
0011 // $Id$
0012 //
0013 
0014 // system include files
0015 
0016 // user include files
0017 #include "oneapi/tbb/task.h"
0018 #include <cassert>
0019 #include <memory>
0020 
0021 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0022 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0023 #include "FWCore/Concurrency/interface/hardware_pause.h"
0024 #include "FWCore/Utilities/interface/Likely.h"
0025 
0026 using namespace edm;
0027 //
0028 // constants, enums and typedefs
0029 //
0030 
0031 //
0032 // static data member definitions
0033 //
0034 
0035 //
0036 // constructors and destructor
0037 //
0038 WaitingTaskList::WaitingTaskList(unsigned int iInitialSize)
0039     : m_head{nullptr},
0040       m_nodeCache{new WaitNode[iInitialSize]},
0041       m_nodeCacheSize{iInitialSize},
0042       m_lastAssignedCacheIndex{0},
0043       m_waiting{true} {
0044   auto nodeCache = m_nodeCache.get();
0045   for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
0046     it->m_fromCache = true;
0047   }
0048 }
0049 
0050 //
0051 // member functions
0052 //
0053 void WaitingTaskList::reset() {
0054   m_exceptionPtr = std::exception_ptr{};
0055   unsigned int nSeenTasks = m_lastAssignedCacheIndex;
0056   m_lastAssignedCacheIndex = 0;
0057   assert(m_head == nullptr);
0058   if (nSeenTasks > m_nodeCacheSize) {
0059     //need to expand so next time we don't have to do any
0060     // memory requests
0061     m_nodeCacheSize = nSeenTasks;
0062     m_nodeCache = std::make_unique<WaitNode[]>(nSeenTasks);
0063     auto nodeCache = m_nodeCache.get();
0064     for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
0065       it->m_fromCache = true;
0066     }
0067   }
0068   //this will make sure all cores see the changes
0069   m_waiting = true;
0070 }
0071 
0072 WaitingTaskList::WaitNode* WaitingTaskList::createNode(oneapi::tbb::task_group* iGroup, WaitingTask* iTask) {
0073   unsigned int index = m_lastAssignedCacheIndex++;
0074 
0075   WaitNode* returnValue;
0076   if (index < m_nodeCacheSize) {
0077     returnValue = m_nodeCache.get() + index;
0078   } else {
0079     returnValue = new WaitNode;
0080     returnValue->m_fromCache = false;
0081   }
0082   returnValue->m_task = iTask;
0083   returnValue->m_group = iGroup;
0084   //No other thread can see m_next yet. The caller to create node
0085   // will be doing a synchronization operation anyway which will
0086   // make sure m_task and m_next are synched across threads
0087   returnValue->m_next.store(returnValue, std::memory_order_relaxed);
0088 
0089   return returnValue;
0090 }
0091 
0092 void WaitingTaskList::add(WaitingTaskHolder iTask) {
0093   if (!m_waiting) {
0094     if (m_exceptionPtr) {
0095       iTask.doneWaiting(m_exceptionPtr);
0096     }
0097   } else {
0098     auto task = iTask.release_no_decrement();
0099     WaitNode* newHead = createNode(iTask.group(), task);
0100     //This exchange is sequentially consistent thereby
0101     // ensuring ordering between it and setNextNode
0102     WaitNode* oldHead = m_head.exchange(newHead);
0103     newHead->setNextNode(oldHead);
0104 
0105     //For the case where oldHead != nullptr,
0106     // even if 'm_waiting' changed, we don't
0107     // have to recheck since we beat 'announce()' in
0108     // the ordering of 'm_head.exchange' call so iTask
0109     // is guaranteed to be in the link list
0110 
0111     if (nullptr == oldHead) {
0112       newHead->setNextNode(nullptr);
0113       if (!m_waiting) {
0114         //if finished waiting right before we did the
0115         // exchange our task will not be run. Also,
0116         // additional threads may be calling add() and swapping
0117         // heads and linking us to the new head.
0118         // It is safe to call announce from multiple threads
0119         announce();
0120       }
0121     }
0122   }
0123 }
0124 
0125 void WaitingTaskList::add(oneapi::tbb::task_group* iGroup, WaitingTask* iTask) {
0126   iTask->increment_ref_count();
0127   if (!m_waiting) {
0128     if (UNLIKELY(bool(m_exceptionPtr))) {
0129       iTask->dependentTaskFailed(m_exceptionPtr);
0130     }
0131     if (0 == iTask->decrement_ref_count()) {
0132       iGroup->run([iTask]() {
0133         TaskSentry s{iTask};
0134         iTask->execute();
0135       });
0136     }
0137   } else {
0138     WaitNode* newHead = createNode(iGroup, iTask);
0139     //This exchange is sequentially consistent thereby
0140     // ensuring ordering between it and setNextNode
0141     WaitNode* oldHead = m_head.exchange(newHead);
0142     newHead->setNextNode(oldHead);
0143 
0144     //For the case where oldHead != nullptr,
0145     // even if 'm_waiting' changed, we don't
0146     // have to recheck since we beat 'announce()' in
0147     // the ordering of 'm_head.exchange' call so iTask
0148     // is guaranteed to be in the link list
0149 
0150     if (nullptr == oldHead) {
0151       if (!m_waiting) {
0152         //if finished waiting right before we did the
0153         // exchange our task will not be run. Also,
0154         // additional threads may be calling add() and swapping
0155         // heads and linking us to the new head.
0156         // It is safe to call announce from multiple threads
0157         announce();
0158       }
0159     }
0160   }
0161 }
0162 
0163 void WaitingTaskList::presetTaskAsFailed(std::exception_ptr iExcept) {
0164   if (iExcept and m_waiting) {
0165     WaitNode* node = m_head.load();
0166     while (node) {
0167       WaitNode* next;
0168       while (node == (next = node->nextNode())) {
0169         hardware_pause();
0170       }
0171       node->m_task->dependentTaskFailed(iExcept);
0172       node = next;
0173     }
0174   }
0175 }
0176 
0177 void WaitingTaskList::announce() {
0178   //Need a temporary storage since one of these tasks could
0179   // cause the next event to start processing which would refill
0180   // this waiting list after it has been reset
0181   WaitNode* n = m_head.exchange(nullptr);
0182   WaitNode* next;
0183   while (n) {
0184     //it is possible that 'WaitingTaskList::add' is running in a different
0185     // thread and we have a new 'head' but the old head has not yet been
0186     // attached to the new head (we identify this since 'nextNode' will return itself).
0187     //  In that case we have to wait until the link has been established before going on.
0188     while (n == (next = n->nextNode())) {
0189       hardware_pause();
0190     }
0191     auto t = n->m_task;
0192     auto g = n->m_group;
0193     if (UNLIKELY(bool(m_exceptionPtr))) {
0194       t->dependentTaskFailed(m_exceptionPtr);
0195     }
0196     if (!n->m_fromCache) {
0197       delete n;
0198     }
0199     n = next;
0200 
0201     //the task may indirectly call WaitingTaskList::reset
0202     // so we need to call spawn after we are done using the node.
0203     if (0 == t->decrement_ref_count()) {
0204       g->run([t]() {
0205         TaskSentry s{t};
0206         t->execute();
0207       });
0208     }
0209   }
0210 }
0211 
0212 void WaitingTaskList::doneWaiting(std::exception_ptr iPtr) {
0213   m_exceptionPtr = iPtr;
0214   m_waiting = false;
0215   announce();
0216 }