1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
#ifndef FWCore_Concurrency_WaitingThreadPool_h
#define FWCore_Concurrency_WaitingThreadPool_h
#include "FWCore/Utilities/interface/ConvertException.h"
#include "FWCore/Utilities/interface/ReusableObjectHolder.h"
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
#include <condition_variable>
#include <mutex>
#include <thread>
namespace edm {
namespace impl {
class WaitingThread {
public:
WaitingThread();
~WaitingThread() noexcept;
WaitingThread(WaitingThread const&) = delete;
WaitingThread& operator=(WaitingThread&&) = delete;
WaitingThread(WaitingThread&&) = delete;
WaitingThread& operator=(WaitingThread const&) = delete;
template <typename F, typename G>
void run(WaitingTaskWithArenaHolder holder,
F&& func,
G&& errorContextFunc,
std::shared_ptr<WaitingThread> thisPtr) {
std::unique_lock lk(mutex_);
func_ = [holder = std::move(holder),
func = std::forward<F>(func),
errorContext = std::forward<G>(errorContextFunc)]() mutable {
try {
convertException::wrap([&func]() { func(); });
} catch (cms::Exception& e) {
e.addContext(errorContext());
// doneWaiting() is intentionally not called here. The
// reference count decrement must be done only in
// threadLoop() (see the comment there)
holder.presetTaskAsFailed(std::current_exception());
}
};
thisPtr_ = std::move(thisPtr);
cond_.notify_one();
}
private:
void stopThread() {
std::unique_lock lk(mutex_);
stopThread_ = true;
cond_.notify_one();
}
void threadLoop() noexcept;
std::thread thread_;
std::mutex mutex_;
std::condition_variable cond_;
CMS_THREAD_GUARD(mutex_) std::function<void()> func_;
// The purpose of thisPtr_ is to keep the WaitingThread object
// outside of the WaitingThreadPool until the func_ has returned.
CMS_THREAD_GUARD(mutex_) std::shared_ptr<WaitingThread> thisPtr_;
CMS_THREAD_GUARD(mutex_) bool stopThread_ = false;
};
} // namespace impl
// Provides a mechanism to run the function 'func' asynchronously,
// i.e. without the calling thread having to wait for func() to
// return. The func should do as little work (outside of the TBB
// threadpool) as possible. The func must terminate eventually. The
// intended use case is blocking synchronization calls with external
// entities, where the calling thread is suspended while waiting.
//
// The func() is run in a thread that belongs to a different pool of
// threads than the calling thread. Remotely similar to std::async(),
// but instead of dealing with std::futures, takes an
// edm::WaitingTaskWithArenaHolder object that is signaled upon
// func() returning or throwing an exception.
//
// The caller is responsible for keeping the WaitingThreadPool
// object alive at least until all asynchronous calls have finished.
class WaitingThreadPool {
public:
WaitingThreadPool() = default;
WaitingThreadPool(WaitingThreadPool const&) = delete;
WaitingThreadPool& operator=(WaitingThreadPool const&) = delete;
WaitingThreadPool(WaitingThreadPool&&) = delete;
WaitingThreadPool& operator=(WaitingThreadPool&&) = delete;
/**
* \param holder WaitingTaskWithArenaHolder object to signal the completion of 'func'
* \param func Function to run in a separate thread
* \param errorContextFunc Function returning a string-like object
* that is added to the context of
* cms::Exception in case 'func' throws an
* exception
*/
template <typename F, typename G>
void runAsync(WaitingTaskWithArenaHolder holder, F&& func, G&& errorContextFunc) {
auto thread = pool_.makeOrGet([]() { return std::make_unique<impl::WaitingThread>(); });
thread->run(std::move(holder), std::forward<F>(func), std::forward<G>(errorContextFunc), std::move(thread));
}
private:
edm::ReusableObjectHolder<impl::WaitingThread> pool_;
};
} // namespace edm
#endif
|