File indexing completed on 2024-04-06 12:13:14
0001 #include "FWCore/Utilities/interface/SingleConsumerQ.h"
0002
0003 namespace edm {
0004
0005 SingleConsumerQ::SingleConsumerQ(int max_event_size, int max_queue_depth)
0006 : max_event_size_(max_event_size),
0007 max_queue_depth_(max_queue_depth),
0008 pos_(max_queue_depth - 1),
0009 mem_(max_event_size * max_queue_depth),
0010 buffer_pool_(),
0011 queue_(max_queue_depth),
0012 fpos_(),
0013 bpos_(),
0014 pool_lock_(),
0015 queue_lock_(),
0016 pool_cond_(),
0017 pop_cond_(),
0018 push_cond_() {
0019
0020
0021 for (char* i = &mem_[0]; i < &mem_[mem_.size()]; i += max_event_size)
0022 buffer_pool_.push_back(i);
0023 }
0024
0025 SingleConsumerQ::~SingleConsumerQ() {}
0026
0027 SingleConsumerQ::Buffer SingleConsumerQ::getProducerBuffer() {
0028
0029 std::unique_lock<std::mutex> sl(pool_lock_);
0030
0031 while (pos_ < 0) {
0032 pool_cond_.wait(sl);
0033 }
0034 void* v = buffer_pool_[pos_];
0035 --pos_;
0036 return Buffer(v, max_event_size_);
0037 }
0038
0039 void SingleConsumerQ::releaseProducerBuffer(void* v) {
0040
0041 std::lock_guard<std::mutex> sl(pool_lock_);
0042 ++pos_;
0043 buffer_pool_[pos_] = v;
0044 pool_cond_.notify_all();
0045 }
0046
0047 void SingleConsumerQ::commitProducerBuffer(void* v, int len) {
0048
0049 std::unique_lock<std::mutex> sl(queue_lock_);
0050
0051 while ((bpos_ + max_queue_depth_) == fpos_) {
0052 push_cond_.wait(sl);
0053 }
0054
0055
0056 queue_[fpos_ % max_queue_depth_] = Buffer(v, len);
0057 ++fpos_;
0058
0059 pop_cond_.notify_all();
0060 }
0061
0062 SingleConsumerQ::Buffer SingleConsumerQ::getConsumerBuffer() {
0063
0064 std::unique_lock<std::mutex> sl(queue_lock_);
0065
0066 while (bpos_ == fpos_) {
0067 pop_cond_.wait(sl);
0068 }
0069
0070 Buffer v = queue_[bpos_ % max_queue_depth_];
0071 ++bpos_;
0072
0073
0074 push_cond_.notify_all();
0075 return v;
0076 }
0077
0078 void SingleConsumerQ::releaseConsumerBuffer(void* v) {
0079
0080
0081
0082
0083
0084 releaseProducerBuffer(v);
0085 }
0086
0087 void SingleConsumerQ::commitConsumerBuffer(void* v, int) { releaseProducerBuffer(v); }
0088 }