File indexing completed on 2024-04-06 12:12:38
0001
0002 #include "WaitingServer.h"
0003
0004 #include <exception>
0005
0006 namespace edmtest {
0007 namespace test_acquire {
0008
0009 void WaitingServer::requestValuesAsync(unsigned int dataID,
0010 std::vector<int> const* iIn,
0011 std::vector<int>* iOut,
0012 edm::WaitingTaskWithArenaHolder holder) {
0013 auto& streamData = m_perStream.at(dataID);
0014
0015 streamData.in_ = iIn;
0016 streamData.out_ = iOut;
0017 streamData.holder_ = std::move(holder);
0018 {
0019 std::lock_guard<std::mutex> guard(m_mutex);
0020 m_waitingStreams.push_back(dataID);
0021 }
0022 m_cond.notify_one();
0023 }
0024
0025 void WaitingServer::start() {
0026 m_thread = std::make_unique<std::thread>([this]() { serverDoWork(); });
0027 }
0028
0029 void WaitingServer::stop() {
0030 m_shouldStop = true;
0031 if (m_thread) {
0032 m_thread->join();
0033 m_thread.reset();
0034 }
0035 }
0036
0037 bool WaitingServer::readyForWork() const { return m_waitingStreams.size() >= m_minNumStreamsBeforeDoingWork; }
0038
0039 void WaitingServer::serverDoWork() {
0040 while (not m_shouldStop) {
0041 std::vector<unsigned int> streamsToUse;
0042 {
0043 std::unique_lock<std::mutex> lk(m_mutex);
0044
0045 m_cond.wait_for(lk, std::chrono::seconds(m_secondsToWait), [this]() -> bool { return readyForWork(); });
0046
0047
0048
0049
0050 streamsToUse.swap(m_waitingStreams);
0051 lk.unlock();
0052 }
0053
0054
0055
0056 for (auto index : streamsToUse) {
0057 auto& streamData = m_perStream.at(index);
0058
0059 std::exception_ptr exceptionPtr;
0060 try {
0061 for (auto v : *streamData.in_) {
0062 streamData.out_->push_back(v + 1);
0063 }
0064 } catch (...) {
0065 exceptionPtr = std::current_exception();
0066 }
0067 streamData.holder_.doneWaiting(exceptionPtr);
0068 }
0069 }
0070 }
0071 }
0072 }