Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-02-14 13:28:39

0001 #ifndef FWCore_Integration_WaitingServer_h
0002 #define FWCore_Integration_WaitingServer_h
0003 
0004 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0005 
0006 #include <atomic>
0007 #include <condition_variable>
0008 #include <memory>
0009 #include <mutex>
0010 #include <thread>
0011 #include <vector>
0012 
0013 namespace edmtest {
0014   namespace test_acquire {
0015 
0016     struct StreamData {
0017       StreamData() : in_(nullptr), out_(nullptr) {}
0018 
0019       std::vector<int> const* in_;
0020       std::vector<int>* out_;
0021       edm::WaitingTaskWithArenaHolder holder_;
0022     };
0023 
0024     class WaitingServer {
0025     public:
0026       WaitingServer(unsigned int iNumberOfStreams,
0027                     unsigned int iMinNumberOfStreamsBeforeDoingWork,
0028                     unsigned int iSecondsToWait)
0029           : m_perStream(iNumberOfStreams),
0030             m_minNumStreamsBeforeDoingWork(iMinNumberOfStreamsBeforeDoingWork),
0031             m_secondsToWait(iSecondsToWait),
0032             m_shouldStop(false) {}
0033 
0034       void start();
0035       void stop();
0036 
0037       void requestValuesAsync(unsigned int dataID,
0038                               std::vector<int> const* iIn,
0039                               std::vector<int>* iOut,
0040                               edm::WaitingTaskWithArenaHolder holder);
0041 
0042     private:
0043       void serverDoWork();
0044 
0045       bool readyForWork() const;
0046 
0047       std::mutex m_mutex;  //needed by m_cond
0048       std::condition_variable m_cond;
0049       std::unique_ptr<std::thread> m_thread;
0050       std::vector<StreamData> m_perStream;
0051       std::vector<unsigned int> m_waitingStreams;
0052       const unsigned int m_minNumStreamsBeforeDoingWork;
0053       const unsigned int m_secondsToWait;
0054       std::atomic<bool> m_shouldStop;
0055     };
0056 
0057     class Cache {
0058     public:
0059       std::vector<int> const& retrieved() const { return m_retrieved; }
0060       std::vector<int>& retrieved() { return m_retrieved; }
0061 
0062       std::vector<int> const& processed() const { return m_processed; }
0063       std::vector<int>& processed() { return m_processed; }
0064 
0065     private:
0066       std::vector<int> m_retrieved;
0067       std::vector<int> m_processed;
0068     };
0069   }  // namespace test_acquire
0070 }  // namespace edmtest
0071 #endif