Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
#include <alpaka/alpaka.hpp>

#include "FWCore/Concurrency/interface/Async.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h"

namespace ALPAKA_ACCELERATOR_NAMESPACE {
#ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
  EDMetadata::~EDMetadata() {
    // Block until the asynchronous production of the product is
    // complete before the product is destructed, so that the EDM
    // stream does not move to the next event while processing of the
    // current one is still in flight.
    //
    // TODO: a callback notifying a WaitingTaskHolder (or similar)
    // would avoid blocking the CPU, but would also require more work.
    //
    // The wait is skipped when any of the following holds:
    //  - event_ is null: the EDMetadata was either default-constructed
    //    or fully synchronized before leaving the produce() call;
    //  - eventComplete_ is set: the event has already been seen to be
    //    complete;
    //  - mayReuseQueue_ is no longer set: the queue was re-used, so
    //    some other EDMetadata object in the same edm::Event records
    //    the event_ (in the same queue) and calls alpaka::wait().
    if (not event_ or eventComplete_ or not mayReuseQueue_) {
      return;
    }

    // A destructor must not throw, and an exception could not really
    // be propagated from here anyway, so anything thrown by the wait
    // is deliberately dropped.
    CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
    }
  }

  /// Records the event and hands a blocking wait on it to the
  /// edm::Async service, which is given `holder` together with a
  /// function producing the error-context message.
  void EDMetadata::enqueueCallback(edm::WaitingTaskWithArenaHolder holder) {
    edm::Service<edm::Async> async;
    recordEvent();

    // The callable keeps its own copy of event_ (taken after
    // recordEvent()), so it does not refer back to this object.
    auto waitForEvent = [event = event_]() mutable { alpaka::wait(*event); };

    // Text passed to the service as the error context of this request.
    auto errorContext = []() {
      return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::EDMetadata::enqueueCallback()";
    };

    async->runAsync(std::move(holder), std::move(waitForEvent), std::move(errorContext));
  }

  /// Makes the work queued by `consumer` run only after the
  /// asynchronous work tracked by this (producer-side) object.
  ///
  /// @param consumer       metadata of the consumer; its queue_ may be
  ///                       replaced by this object's queue_.
  /// @param tryReuseQueue  when true, first try to hand this object's
  ///                       queue over to the consumer.
  /// @throws edm::Exception (LogicError) if the two queues belong to
  ///         different devices.
  void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const {
    // Producer and consumer already share a queue: nothing to do.
    if (*queue_ == *consumer.queue_) {
      return;
    }

    // If requested, and if this object's queue has not been claimed
    // yet, let the consumer continue in the producer's queue.
    if (tryReuseQueue and tryReuseQueue_()) {
      consumer.queue_ = queue_;
      return;
    }

    // The event was already seen to be complete earlier.
    if (eventComplete_) {
      return;
    }

    // TODO: how necessary this check is?
    const bool sameDevice = (alpaka::getDev(*queue_) == alpaka::getDev(*consumer.queue_));
    if (not sameDevice) {
      throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
    }

    // A discarded event means that the produce() function that
    // constructed this EDMetadata object did not launch any
    // asynchronous work.
    if (not event_) {
      return;
    }

    if (alpaka::isComplete(*event_)) {
      // Remember the completion so that later calls can return early.
      eventComplete_ = true;
      return;
    }

    // The event has not occurred yet, so synchronization has to be
    // added here. It is done by making the consumer's queue wait for
    // the event: all subsequent work in that queue runs only after
    // the event has "occurred" (i.e. the data product became
    // available).
    alpaka::wait(*consumer.queue_, *event_);
  }

  /// Atomically claims the right to re-use this object's queue.
  ///
  /// @return queue_ for the one caller that flips mayReuseQueue_ from
  ///         true to false; nullptr for every other caller.
  std::shared_ptr<Queue> EDMetadata::tryReuseQueue_() const {
    bool stillAvailable = true;
    if (not mayReuseQueue_.compare_exchange_strong(stillAvailable, false)) {
      // The flag was already false: the queue has been claimed before.
      return nullptr;
    }
    // The current thread is the one that flipped the flag, so it may
    // reuse the queue.
    return queue_;
  }
#endif
}  // namespace ALPAKA_ACCELERATOR_NAMESPACE