Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-02-14 13:28:38

0001 
0002 #include "FWCore/Integration/test/WaitingServer.h"
0003 
0004 #include <exception>
0005 
0006 namespace edmtest {
0007   namespace test_acquire {
0008 
0009     void WaitingServer::requestValuesAsync(unsigned int dataID,
0010                                            std::vector<int> const* iIn,
0011                                            std::vector<int>* iOut,
0012                                            edm::WaitingTaskWithArenaHolder holder) {
0013       auto& streamData = m_perStream.at(dataID);
0014 
0015       streamData.in_ = iIn;
0016       streamData.out_ = iOut;
0017       streamData.holder_ = std::move(holder);
0018       {
0019         std::lock_guard<std::mutex> guard(m_mutex);
0020         m_waitingStreams.push_back(dataID);
0021       }
0022       m_cond.notify_one();  //wakes up the server thread
0023     }
0024 
0025     void WaitingServer::start() {
0026       m_thread = std::make_unique<std::thread>([this]() { serverDoWork(); });
0027     }
0028 
0029     void WaitingServer::stop() {
0030       m_shouldStop = true;
0031       if (m_thread) {
0032         m_thread->join();
0033         m_thread.reset();
0034       }
0035     }
0036 
0037     bool WaitingServer::readyForWork() const { return m_waitingStreams.size() >= m_minNumStreamsBeforeDoingWork; }
0038 
0039     void WaitingServer::serverDoWork() {
0040       while (not m_shouldStop) {
0041         std::vector<unsigned int> streamsToUse;
0042         {
0043           std::unique_lock<std::mutex> lk(m_mutex);
0044 
0045           m_cond.wait_for(lk, std::chrono::seconds(m_secondsToWait), [this]() -> bool { return readyForWork(); });
0046 
0047           // Once we know which streams have given us data
0048           // we can release the lock and let other streams
0049           // set their data
0050           streamsToUse.swap(m_waitingStreams);
0051           lk.unlock();
0052         }
0053 
0054         // Here is the work that the server does for the modules
0055         // it will just add 1 to each value it has been given
0056         for (auto index : streamsToUse) {
0057           auto& streamData = m_perStream.at(index);
0058 
0059           std::exception_ptr exceptionPtr;
0060           try {
0061             for (auto v : *streamData.in_) {
0062               streamData.out_->push_back(v + 1);
0063             }
0064           } catch (...) {
0065             exceptionPtr = std::current_exception();
0066           }
0067           streamData.holder_.doneWaiting(exceptionPtr);
0068         }
0069       }
0070     }
0071   }  // namespace test_acquire
0072 }  // namespace edmtest