1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
#include "WaitingServer.h"
#include <exception>
namespace edmtest {
namespace test_acquire {
void WaitingServer::requestValuesAsync(unsigned int dataID,
std::vector<int> const* iIn,
std::vector<int>* iOut,
edm::WaitingTaskWithArenaHolder holder) {
auto& streamData = m_perStream.at(dataID);
streamData.in_ = iIn;
streamData.out_ = iOut;
streamData.holder_ = std::move(holder);
{
std::lock_guard<std::mutex> guard(m_mutex);
m_waitingStreams.push_back(dataID);
}
m_cond.notify_one(); //wakes up the server thread
}
void WaitingServer::start() {
m_thread = std::make_unique<std::thread>([this]() { serverDoWork(); });
}
void WaitingServer::stop() {
m_shouldStop = true;
if (m_thread) {
m_thread->join();
m_thread.reset();
}
}
bool WaitingServer::readyForWork() const { return m_waitingStreams.size() >= m_minNumStreamsBeforeDoingWork; }
void WaitingServer::serverDoWork() {
while (not m_shouldStop) {
std::vector<unsigned int> streamsToUse;
{
std::unique_lock<std::mutex> lk(m_mutex);
m_cond.wait_for(lk, std::chrono::seconds(m_secondsToWait), [this]() -> bool { return readyForWork(); });
// Once we know which streams have given us data
// we can release the lock and let other streams
// set their data
streamsToUse.swap(m_waitingStreams);
lk.unlock();
}
// Here is the work that the server does for the modules
// it will just add 1 to each value it has been given
for (auto index : streamsToUse) {
auto& streamData = m_perStream.at(index);
std::exception_ptr exceptionPtr;
try {
for (auto v : *streamData.in_) {
streamData.out_->push_back(v + 1);
}
} catch (...) {
exceptionPtr = std::current_exception();
}
streamData.holder_.doneWaiting(exceptionPtr);
}
}
}
} // namespace test_acquire
} // namespace edmtest
|