// Symbols declared in this header:
//   classes: edm::impl::WaitingThread, edm::WaitingThreadPool
//   macros:  FWCore_Concurrency_WaitingThreadPool_h (include guard)
#ifndef FWCore_Concurrency_WaitingThreadPool_h
#define FWCore_Concurrency_WaitingThreadPool_h

#include "FWCore/Utilities/interface/ConvertException.h"
#include "FWCore/Utilities/interface/ReusableObjectHolder.h"
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"

#include <condition_variable>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

namespace edm {
  namespace impl {
    class WaitingThread {
    public:
      WaitingThread();
      ~WaitingThread() noexcept;

      WaitingThread(WaitingThread const&) = delete;
      WaitingThread& operator=(WaitingThread&&) = delete;
      WaitingThread(WaitingThread&&) = delete;
      WaitingThread& operator=(WaitingThread const&) = delete;

      template <typename F, typename G>
      void run(WaitingTaskWithArenaHolder holder,
               F&& func,
               G&& errorContextFunc,
               std::shared_ptr<WaitingThread> thisPtr) {
        std::unique_lock lk(mutex_);
        func_ = [holder = std::move(holder),
                 func = std::forward<F>(func),
                 errorContext = std::forward<G>(errorContextFunc)]() mutable {
          try {
            convertException::wrap([&func]() { func(); });
          } catch (cms::Exception& e) {
            e.addContext(errorContext());
            // doneWaiting() is intentionally not called here. The
            // reference count decrement must be done only in
            // threadLoop() (see the comment there)
            holder.presetTaskAsFailed(std::current_exception());
          }
        };
        thisPtr_ = std::move(thisPtr);
        cond_.notify_one();
      }

    private:
      void stopThread() {
        std::unique_lock lk(mutex_);
        stopThread_ = true;
        cond_.notify_one();
      }

      void threadLoop() noexcept;

      std::thread thread_;
      std::mutex mutex_;
      std::condition_variable cond_;
      CMS_THREAD_GUARD(mutex_) std::function<void()> func_;
      // The purpose of thisPtr_ is to keep the WaitingThread object
      // outside of the WaitingThreadPool until the func_ has returned.
      CMS_THREAD_GUARD(mutex_) std::shared_ptr<WaitingThread> thisPtr_;
      CMS_THREAD_GUARD(mutex_) bool stopThread_ = false;
    };
  }  // namespace impl

  // Provides a mechanism to run the function 'func' asynchronously,
  // i.e. without the calling thread waiting for func() to return.
  // The func should do as little work (outside of the TBB threadpool)
  // as possible. The func must terminate eventually. The intended use
  // case is blocking synchronization calls with external entities,
  // where the calling thread is suspended while waiting.
  //
  // The func() is run in a thread that belongs to a different pool of
  // threads than the calling thread. Remotely similar to
  // std::async(), but instead of dealing with std::futures, takes an
  // edm::WaitingTaskWithArenaHolder object that is signaled upon
  // func() returning or throwing an exception.
  //
  // The caller is responsible for keeping the WaitingThreadPool
  // object alive at least until all asynchronous calls have finished.
  class WaitingThreadPool {
  public:
    WaitingThreadPool() = default;
    WaitingThreadPool(WaitingThreadPool const&) = delete;
    WaitingThreadPool& operator=(WaitingThreadPool const&) = delete;
    WaitingThreadPool(WaitingThreadPool&&) = delete;
    WaitingThreadPool& operator=(WaitingThreadPool&&) = delete;

    /**
     * \param holder           WaitingTaskWithArenaHolder object to signal the completion of 'func'
     * \param func             Function to run in a separate thread
     * \param errorContextFunc Function returning a string-like object
     *                         that is added to the context of
     *                         cms::Exception in case 'func' throws an
     *                         exception
     */
    template <typename F, typename G>
    void runAsync(WaitingTaskWithArenaHolder holder, F&& func, G&& errorContextFunc) {
      auto thread = pool_.makeOrGet([]() { return std::make_unique<impl::WaitingThread>(); });
      thread->run(std::move(holder), std::forward<F>(func), std::forward<G>(errorContextFunc), std::move(thread));
    }

  private:
    edm::ReusableObjectHolder<impl::WaitingThread> pool_;
  };
}  // namespace edm

#endif