Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-25 09:50:23

0001 #include <alpaka/alpaka.hpp>
0002 
0003 #include "FWCore/Utilities/interface/EDMException.h"
0004 #include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h"
0005 
0006 namespace ALPAKA_ACCELERATOR_NAMESPACE {
0007 #ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
0008   EDMetadata::~EDMetadata() {
0009     // Make sure that the production of the product in the GPU is
0010     // complete before destructing the product. This is to make sure
0011     // that the EDM stream does not move to the next event before all
0012     // asynchronous processing of the current is complete.
0013 
0014     // TODO: a callback notifying a WaitingTaskHolder (or similar)
0015     // would avoid blocking the CPU, but would also require more work.
0016 
0017     if (event_) {
0018       // Must not throw in a destructor, and if there were an
0019       // exception could not really propagate it anyway.
0020       CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
0021       }
0022     }
0023   }
0024 
0025   void EDMetadata::enqueueCallback(edm::WaitingTaskWithArenaHolder holder) {
0026     alpaka::enqueue(*queue_, alpaka::HostOnlyTask([holder = std::move(holder)]() {
0027       // The functor is required to be const, but the original waitingTaskHolder_
0028       // needs to be notified...
0029       const_cast<edm::WaitingTaskWithArenaHolder&>(holder).doneWaiting(nullptr);
0030     }));
0031   }
0032 
0033   void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const {
0034     if (*queue_ == *consumer.queue_) {
0035       return;
0036     }
0037 
0038     if (tryReuseQueue) {
0039       if (auto queue = tryReuseQueue_()) {
0040         consumer.queue_ = queue_;
0041         return;
0042       }
0043     }
0044 
0045     // TODO: how necessary this check is?
0046     if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
0047       throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
0048     }
0049 
0050     if (not alpaka::isComplete(*event_)) {
0051       // Event not yet occurred, so need to add synchronization
0052       // here. Sychronization is done by making the queue to wait
0053       // for an event, so all subsequent work in the queue will run
0054       // only after the event has "occurred" (i.e. data product
0055       // became available).
0056       alpaka::wait(*consumer.queue_, *event_);
0057     }
0058   }
0059 
0060   std::shared_ptr<Queue> EDMetadata::tryReuseQueue_() const {
0061     bool expected = true;
0062     if (mayReuseQueue_.compare_exchange_strong(expected, false)) {
0063       // If the current thread is the one flipping the flag, it may
0064       // reuse the queue.
0065       return queue_;
0066     }
0067     return nullptr;
0068   }
0069 #endif
0070 }  // namespace ALPAKA_ACCELERATOR_NAMESPACE