File indexing completed on 2024-07-12 02:41:40
0001 #ifndef FWCore_Concurrency_WaitingThreadPool_h
0002 #define FWCore_Concurrency_WaitingThreadPool_h
0003
0004 #include "FWCore/Utilities/interface/ConvertException.h"
0005 #include "FWCore/Utilities/interface/ReusableObjectHolder.h"
0006 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0007
0008 #include <condition_variable>
0009 #include <mutex>
0010 #include <thread>
0011
0012 namespace edm {
0013 namespace impl {
0014 class WaitingThread {
0015 public:
0016 WaitingThread();
0017 ~WaitingThread() noexcept;
0018
0019 WaitingThread(WaitingThread const&) = delete;
0020 WaitingThread& operator=(WaitingThread&&) = delete;
0021 WaitingThread(WaitingThread&&) = delete;
0022 WaitingThread& operator=(WaitingThread const&) = delete;
0023
0024 template <typename F, typename G>
0025 void run(WaitingTaskWithArenaHolder holder,
0026 F&& func,
0027 G&& errorContextFunc,
0028 std::shared_ptr<WaitingThread> thisPtr) {
0029 std::unique_lock lk(mutex_);
0030 func_ = [holder = std::move(holder),
0031 func = std::forward<F>(func),
0032 errorContext = std::forward<G>(errorContextFunc)]() mutable {
0033 try {
0034 convertException::wrap([&func]() { func(); });
0035 } catch (cms::Exception& e) {
0036 e.addContext(errorContext());
0037
0038
0039
0040 holder.presetTaskAsFailed(std::current_exception());
0041 }
0042 };
0043 thisPtr_ = std::move(thisPtr);
0044 cond_.notify_one();
0045 }
0046
0047 private:
0048 void stopThread() {
0049 std::unique_lock lk(mutex_);
0050 stopThread_ = true;
0051 cond_.notify_one();
0052 }
0053
0054 void threadLoop() noexcept;
0055
0056 std::thread thread_;
0057 std::mutex mutex_;
0058 std::condition_variable cond_;
0059 CMS_THREAD_GUARD(mutex_) std::function<void()> func_;
0060
0061
0062 CMS_THREAD_GUARD(mutex_) std::shared_ptr<WaitingThread> thisPtr_;
0063 CMS_THREAD_GUARD(mutex_) bool stopThread_ = false;
0064 };
0065 }
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082 class WaitingThreadPool {
0083 public:
0084 WaitingThreadPool() = default;
0085 WaitingThreadPool(WaitingThreadPool const&) = delete;
0086 WaitingThreadPool& operator=(WaitingThreadPool const&) = delete;
0087 WaitingThreadPool(WaitingThreadPool&&) = delete;
0088 WaitingThreadPool& operator=(WaitingThreadPool&&) = delete;
0089
0090
0091
0092
0093
0094
0095
0096
0097
0098 template <typename F, typename G>
0099 void runAsync(WaitingTaskWithArenaHolder holder, F&& func, G&& errorContextFunc) {
0100 auto thread = pool_.makeOrGet([]() { return std::make_unique<impl::WaitingThread>(); });
0101 thread->run(std::move(holder), std::forward<F>(func), std::forward<G>(errorContextFunc), std::move(thread));
0102 }
0103
0104 private:
0105 edm::ReusableObjectHolder<impl::WaitingThread> pool_;
0106 };
0107 }
0108
0109 #endif