Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:14

0001 #include "FWCore/Utilities/interface/SingleConsumerQ.h"
0002 
0003 namespace edm {
0004 
0005   SingleConsumerQ::SingleConsumerQ(int max_event_size, int max_queue_depth)
0006       : max_event_size_(max_event_size),
0007         max_queue_depth_(max_queue_depth),
0008         pos_(max_queue_depth - 1),
0009         mem_(max_event_size * max_queue_depth),
0010         buffer_pool_(),
0011         queue_(max_queue_depth),
0012         fpos_(),
0013         bpos_(),
0014         pool_lock_(),
0015         queue_lock_(),
0016         pool_cond_(),
0017         pop_cond_(),
0018         push_cond_() {
0019     // throw if event size 0 or queue depth 0
0020 
0021     for (char* i = &mem_[0]; i < &mem_[mem_.size()]; i += max_event_size)
0022       buffer_pool_.push_back(i);
0023   }
0024 
0025   SingleConsumerQ::~SingleConsumerQ() {}
0026 
0027   SingleConsumerQ::Buffer SingleConsumerQ::getProducerBuffer() {
0028     // get lock
0029     std::unique_lock<std::mutex> sl(pool_lock_);
0030     // wait for buffer to appear
0031     while (pos_ < 0) {
0032       pool_cond_.wait(sl);
0033     }
0034     void* v = buffer_pool_[pos_];
0035     --pos_;
0036     return Buffer(v, max_event_size_);
0037   }
0038 
0039   void SingleConsumerQ::releaseProducerBuffer(void* v) {
0040     // get lock
0041     std::lock_guard<std::mutex> sl(pool_lock_);
0042     ++pos_;
0043     buffer_pool_[pos_] = v;
0044     pool_cond_.notify_all();
0045   }
0046 
0047   void SingleConsumerQ::commitProducerBuffer(void* v, int len) {
0048     // get lock
0049     std::unique_lock<std::mutex> sl(queue_lock_);
0050     // if full, wait for item to be removed
0051     while ((bpos_ + max_queue_depth_) == fpos_) {
0052       push_cond_.wait(sl);
0053     }
0054 
0055     // put buffer into queue
0056     queue_[fpos_ % max_queue_depth_] = Buffer(v, len);
0057     ++fpos_;
0058     // signal consumer
0059     pop_cond_.notify_all();
0060   }
0061 
0062   SingleConsumerQ::Buffer SingleConsumerQ::getConsumerBuffer() {
0063     // get lock
0064     std::unique_lock<std::mutex> sl(queue_lock_);
0065     // if empty, wait for item to appear
0066     while (bpos_ == fpos_) {
0067       pop_cond_.wait(sl);
0068     }
0069     // get a buffer from the queue and return it
0070     Buffer v = queue_[bpos_ % max_queue_depth_];
0071     ++bpos_;
0072     // note that these operations cannot throw
0073     // signal producer
0074     push_cond_.notify_all();
0075     return v;
0076   }
0077 
0078   void SingleConsumerQ::releaseConsumerBuffer(void* v) {
0079     // should the buffer be placed back onto the queue and not released?
0080     // we got here because a commit did to occur in the consumer.
0081     // we will allow consumers to call or not call commit for now, meaning
0082     // that we cannot distinguish between exception conditions and normal
0083     // return.  The buffer will always be released
0084     releaseProducerBuffer(v);
0085   }
0086 
0087   void SingleConsumerQ::commitConsumerBuffer(void* v, int) { releaseProducerBuffer(v); }
0088 }  // namespace edm