Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-07 02:29:40

0001 #include <alpaka/alpaka.hpp>
0002 
0003 #include "FWCore/Concurrency/interface/Async.h"
0004 #include "FWCore/ServiceRegistry/interface/Service.h"
0005 #include "FWCore/Utilities/interface/EDMException.h"
0006 #include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h"
0007 
0008 namespace ALPAKA_ACCELERATOR_NAMESPACE {
0009 #ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
0010   EDMetadata::~EDMetadata() {
0011     // Make sure that the production of the product in the GPU is
0012     // complete before destructing the product. This is to make sure
0013     // that the EDM stream does not move to the next event before all
0014     // asynchronous processing of the current is complete.
0015 
0016     // TODO: a callback notifying a WaitingTaskHolder (or similar)
0017     // would avoid blocking the CPU, but would also require more work.
0018 
0019     // If event_ is null, the EDMetadata was either
0020     // default-constructed, or fully synchronized before leaving the
0021     // produce() call, so no synchronization is needed.
0022     // If the queue was re-used, then some other EDMetadata object in
0023     // the same edm::Event records the event_ (in the same queue) and
0024     // calls alpaka::wait(), and therefore this wait() call can be
0025     // skipped).
0026     if (event_ and not eventComplete_ and mayReuseQueue_) {
0027       // Must not throw in a destructor, and if there were an
0028       // exception could not really propagate it anyway.
0029       CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
0030       }
0031     }
0032   }
0033 
0034   void EDMetadata::enqueueCallback(edm::WaitingTaskWithArenaHolder holder) {
0035     edm::Service<edm::Async> async;
0036     recordEvent();
0037     async->runAsync(
0038         std::move(holder),
0039         [event = event_]() mutable { alpaka::wait(*event); },
0040         []() { return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::EDMetadata::enqueueCallback()"; });
0041   }
0042 
0043   void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const {
0044     if (*queue_ == *consumer.queue_) {
0045       return;
0046     }
0047 
0048     if (tryReuseQueue) {
0049       if (auto queue = tryReuseQueue_()) {
0050         consumer.queue_ = queue_;
0051         return;
0052       }
0053     }
0054 
0055     if (eventComplete_) {
0056       return;
0057     }
0058 
0059     // TODO: how necessary this check is?
0060     if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
0061       throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
0062     }
0063 
0064     // If the event has been discarded, the produce() function that
0065     // constructed this EDMetadata object did not launch any
0066     // asynchronous work.
0067     if (not event_) {
0068       return;
0069     }
0070 
0071     if (alpaka::isComplete(*event_)) {
0072       eventComplete_ = true;
0073     } else {
0074       // Event not yet occurred, so need to add synchronization
0075       // here. Sychronization is done by making the queue to wait
0076       // for an event, so all subsequent work in the queue will run
0077       // only after the event has "occurred" (i.e. data product
0078       // became available).
0079       alpaka::wait(*consumer.queue_, *event_);
0080     }
0081   }
0082 
0083   std::shared_ptr<Queue> EDMetadata::tryReuseQueue_() const {
0084     bool expected = true;
0085     if (mayReuseQueue_.compare_exchange_strong(expected, false)) {
0086       // If the current thread is the one flipping the flag, it may
0087       // reuse the queue.
0088       return queue_;
0089     }
0090     return nullptr;
0091   }
0092 #endif
0093 }  // namespace ALPAKA_ACCELERATOR_NAMESPACE