Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-11-24 00:02:23

0001 #include "FWCore/Utilities/interface/EDMException.h"
0002 #include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h"
0003 
0004 namespace ALPAKA_ACCELERATOR_NAMESPACE {
0005 #ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
0006   EDMetadata::~EDMetadata() {
0007     // Make sure that the production of the product in the GPU is
0008     // complete before destructing the product. This is to make sure
0009     // that the EDM stream does not move to the next event before all
0010     // asynchronous processing of the current is complete.
0011 
0012     // TODO: a callback notifying a WaitingTaskHolder (or similar)
0013     // would avoid blocking the CPU, but would also require more work.
0014 
0015     if (event_) {
0016       // Must not throw in a destructor, and if there were an
0017       // exception could not really propagate it anyway.
0018       CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
0019       }
0020     }
0021   }
0022 
0023   void EDMetadata::enqueueCallback(edm::WaitingTaskWithArenaHolder holder) {
0024     alpaka::enqueue(*queue_, alpaka::HostOnlyTask([holder = std::move(holder)]() {
0025       // The functor is required to be const, but the original waitingTaskHolder_
0026       // needs to be notified...
0027       const_cast<edm::WaitingTaskWithArenaHolder&>(holder).doneWaiting(nullptr);
0028     }));
0029   }
0030 
0031   void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const {
0032     if (*queue_ == *consumer.queue_) {
0033       return;
0034     }
0035 
0036     if (tryReuseQueue) {
0037       if (auto queue = tryReuseQueue_()) {
0038         consumer.queue_ = queue_;
0039         return;
0040       }
0041     }
0042 
0043     // TODO: how necessary this check is?
0044     if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
0045       throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
0046     }
0047 
0048     if (not alpaka::isComplete(*event_)) {
0049       // Event not yet occurred, so need to add synchronization
0050       // here. Sychronization is done by making the queue to wait
0051       // for an event, so all subsequent work in the queue will run
0052       // only after the event has "occurred" (i.e. data product
0053       // became available).
0054       alpaka::wait(*consumer.queue_, *event_);
0055     }
0056   }
0057 
0058   std::shared_ptr<Queue> EDMetadata::tryReuseQueue_() const {
0059     bool expected = true;
0060     if (mayReuseQueue_.compare_exchange_strong(expected, false)) {
0061       // If the current thread is the one flipping the flag, it may
0062       // reuse the queue.
0063       return queue_;
0064     }
0065     return nullptr;
0066   }
0067 #endif
0068 }  // namespace ALPAKA_ACCELERATOR_NAMESPACE