Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:12

0001 #ifndef FWCore_Utilities_SingleConsumerQ_h
0002 #define FWCore_Utilities_SingleConsumerQ_h
0003 
0004 // -*- C++ -*-
0005 
0006 /*
0007  A bounded queue for use in a multi-threaded producer/consumer application.
0008  This is a simple design.  It is only meant to be used where there is
0009  one consumer and one or more producers using a queue instance.
0010 
0011  The problem with multiple consumers is the separate front/pop
0012  member functions.  If they are pulled together into one function,
0013  multiple consumers may be possible, but exception safety would then
0014  be a problem - popping an item off the queue to be held as a local
0015  variable, followed by its removal from the queue.  Having fixed size
0016  buffers within a fixed size pool and using a circular buffer as the
0017  queue alleviates most of this problem because exceptions will not
0018  occur during manipulation.  The only problem left to be checked is
0019  how (or if) the std::mutex manipulation can throw and when.
0020 
0021  Note: the current implementation has no protection against unsigned int
0022  overflows.
0023 
0024  Missing:
0025   - the ring buffer is really not used to its fullest extent
0026   - the buffer sizes are fixed and cannot grow
0027   - a simple Buffer object is returned that has the pointer and len
0028     separate.  The length should be stored as the first word of the
0029     buffer itself
0030   - timeouts for consumer
0031   - good way to signal to consumer to end
0032   - keeping the instance of this thing around until all using threads are
0033     done with it
0034 
0035 */
0036 
0037 #include <vector>
0038 #include <mutex>
0039 #include <condition_variable>
0040 
0041 namespace edm {
0042 
0043   class SingleConsumerQ {
0044   public:
0045     struct Buffer {
0046       Buffer() : ptr_(), len_() {}
0047       Buffer(void* p, int len) : ptr_(p), len_(len) {}
0048 
0049       void* ptr_;
0050       int len_;
0051     };
0052     // no copy
0053     SingleConsumerQ(const SingleConsumerQ&) = delete;
0054 
0055     SingleConsumerQ(int max_event_size, int max_queue_depth);
0056     ~SingleConsumerQ();
0057 
0058     struct ConsumerType {
0059       static SingleConsumerQ::Buffer get(SingleConsumerQ& b) { return b.getConsumerBuffer(); }
0060       static void release(SingleConsumerQ& b, void* v) { b.releaseConsumerBuffer(v); }
0061       static void commit(SingleConsumerQ& b, void* v, int size) { b.commitConsumerBuffer(v, size); }
0062     };
0063     struct ProducerType {
0064       static SingleConsumerQ::Buffer get(SingleConsumerQ& b) { return b.getProducerBuffer(); }
0065       static void release(SingleConsumerQ& b, void* v) { b.releaseProducerBuffer(v); }
0066       static void commit(SingleConsumerQ& b, void* v, int size) { b.commitProducerBuffer(v, size); }
0067     };
0068 
0069     template <class T>
0070     class OperateBuffer {
0071     public:
0072       explicit OperateBuffer(SingleConsumerQ& b) : b_(b), v_(T::get(b)), committed_(false) {}
0073       ~OperateBuffer() {
0074         if (!committed_)
0075           T::release(b_, v_.ptr_);
0076       }
0077 
0078       void* buffer() const { return v_.ptr_; }
0079       int size() const { return v_.len_; }
0080       void commit(int theSize = 0) {
0081         T::commit(b_, v_.ptr_, theSize);
0082         committed_ = true;
0083       }
0084 
0085     private:
0086       SingleConsumerQ& b_;
0087       SingleConsumerQ::Buffer v_;
0088       bool committed_;
0089     };
0090 
0091     typedef OperateBuffer<ConsumerType> ConsumerBuffer;
0092     typedef OperateBuffer<ProducerType> ProducerBuffer;
0093 
0094     Buffer getProducerBuffer();
0095     void releaseProducerBuffer(void*);
0096     void commitProducerBuffer(void*, int);
0097 
0098     Buffer getConsumerBuffer();
0099     void releaseConsumerBuffer(void*);
0100     void commitConsumerBuffer(void*, int);
0101 
0102     int maxEventSize() const { return max_event_size_; }
0103     int maxQueueDepth() const { return max_queue_depth_; }
0104 
0105   private:
0106     // the memory for the buffers
0107     typedef std::vector<char> ByteArray;
0108     // the pool of buffers
0109     typedef std::vector<void*> Pool;
0110     // the queue
0111     typedef std::vector<Buffer> Queue;
0112 
0113     int max_event_size_;
0114     int max_queue_depth_;
0115     int pos_;  // use pool as stack of avaiable buffers
0116     ByteArray mem_;
0117     Pool buffer_pool_;
0118     Queue queue_;
0119     unsigned int fpos_, bpos_;  // positions for queue - front and back
0120 
0121     std::mutex pool_lock_;
0122     std::mutex queue_lock_;
0123     std::condition_variable pool_cond_;
0124     std::condition_variable pop_cond_;
0125     std::condition_variable push_cond_;
0126   };
0127 
0128 }  // namespace edm
0129 #endif