File indexing completed on 2024-04-06 12:13:12
0001 #ifndef FWCore_Utilities_SingleConsumerQ_h
0002 #define FWCore_Utilities_SingleConsumerQ_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037 #include <vector>
0038 #include <mutex>
0039 #include <condition_variable>
0040
0041 namespace edm {
0042
0043 class SingleConsumerQ {
0044 public:
0045 struct Buffer {
0046 Buffer() : ptr_(), len_() {}
0047 Buffer(void* p, int len) : ptr_(p), len_(len) {}
0048
0049 void* ptr_;
0050 int len_;
0051 };
0052
0053 SingleConsumerQ(const SingleConsumerQ&) = delete;
0054
0055 SingleConsumerQ(int max_event_size, int max_queue_depth);
0056 ~SingleConsumerQ();
0057
0058 struct ConsumerType {
0059 static SingleConsumerQ::Buffer get(SingleConsumerQ& b) { return b.getConsumerBuffer(); }
0060 static void release(SingleConsumerQ& b, void* v) { b.releaseConsumerBuffer(v); }
0061 static void commit(SingleConsumerQ& b, void* v, int size) { b.commitConsumerBuffer(v, size); }
0062 };
0063 struct ProducerType {
0064 static SingleConsumerQ::Buffer get(SingleConsumerQ& b) { return b.getProducerBuffer(); }
0065 static void release(SingleConsumerQ& b, void* v) { b.releaseProducerBuffer(v); }
0066 static void commit(SingleConsumerQ& b, void* v, int size) { b.commitProducerBuffer(v, size); }
0067 };
0068
0069 template <class T>
0070 class OperateBuffer {
0071 public:
0072 explicit OperateBuffer(SingleConsumerQ& b) : b_(b), v_(T::get(b)), committed_(false) {}
0073 ~OperateBuffer() {
0074 if (!committed_)
0075 T::release(b_, v_.ptr_);
0076 }
0077
0078 void* buffer() const { return v_.ptr_; }
0079 int size() const { return v_.len_; }
0080 void commit(int theSize = 0) {
0081 T::commit(b_, v_.ptr_, theSize);
0082 committed_ = true;
0083 }
0084
0085 private:
0086 SingleConsumerQ& b_;
0087 SingleConsumerQ::Buffer v_;
0088 bool committed_;
0089 };
0090
0091 typedef OperateBuffer<ConsumerType> ConsumerBuffer;
0092 typedef OperateBuffer<ProducerType> ProducerBuffer;
0093
0094 Buffer getProducerBuffer();
0095 void releaseProducerBuffer(void*);
0096 void commitProducerBuffer(void*, int);
0097
0098 Buffer getConsumerBuffer();
0099 void releaseConsumerBuffer(void*);
0100 void commitConsumerBuffer(void*, int);
0101
0102 int maxEventSize() const { return max_event_size_; }
0103 int maxQueueDepth() const { return max_queue_depth_; }
0104
0105 private:
0106
0107 typedef std::vector<char> ByteArray;
0108
0109 typedef std::vector<void*> Pool;
0110
0111 typedef std::vector<Buffer> Queue;
0112
0113 int max_event_size_;
0114 int max_queue_depth_;
0115 int pos_;
0116 ByteArray mem_;
0117 Pool buffer_pool_;
0118 Queue queue_;
0119 unsigned int fpos_, bpos_;
0120
0121 std::mutex pool_lock_;
0122 std::mutex queue_lock_;
0123 std::condition_variable pool_cond_;
0124 std::condition_variable pop_cond_;
0125 std::condition_variable push_cond_;
0126 };
0127
0128 }
0129 #endif