Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:33

#include "DataFormats/Common/interface/Handle.h"
#include "DataFormats/TestObjects/interface/ToyProducts.h"
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/global/EDProducer.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/PreallocationConfiguration.h"
#include "WaitingServer.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/Utilities/interface/EDGetToken.h"
#include "FWCore/Utilities/interface/InputTag.h"
#include "FWCore/Utilities/interface/StreamID.h"

#include <chrono>
#include <memory>
#include <thread>
#include <unistd.h>
#include <vector>
0017 
// Forward declaration only: EventSetup is used in this file solely as an unnamed
// `edm::EventSetup const&` parameter, so its full definition is not needed here.
namespace edm { class EventSetup; }
0021 
0022 namespace edmtest {
0023 
0024   class AcquireIntProducer : public edm::global::EDProducer<edm::ExternalWork, edm::StreamCache<test_acquire::Cache>> {
0025   public:
0026     explicit AcquireIntProducer(edm::ParameterSet const& pset);
0027     ~AcquireIntProducer() override;
0028 
0029     std::unique_ptr<test_acquire::Cache> beginStream(edm::StreamID) const override;
0030 
0031     void acquire(edm::StreamID,
0032                  edm::Event const&,
0033                  edm::EventSetup const&,
0034                  edm::WaitingTaskWithArenaHolder) const override;
0035 
0036     void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const override;
0037 
0038     void endJob() override;
0039 
0040   private:
0041     void preallocate(edm::PreallocationConfiguration const&) override;
0042 
0043     std::vector<edm::EDGetTokenT<IntProduct>> m_tokens;
0044     edm::EDGetTokenT<IntProduct> m_tokenForProduce;
0045     std::unique_ptr<test_acquire::WaitingServer> m_server;
0046     const unsigned int m_numberOfStreamsToAccumulate;
0047     const unsigned int m_secondsToWaitForWork;
0048   };
0049 
0050   AcquireIntProducer::AcquireIntProducer(edm::ParameterSet const& pset)
0051       : m_numberOfStreamsToAccumulate(pset.getUntrackedParameter<unsigned int>("streamsToAccumulate", 8)),
0052         m_secondsToWaitForWork(pset.getUntrackedParameter<unsigned int>("secondsToWaitForWork", 1)) {
0053     for (auto const& tag : pset.getParameter<std::vector<edm::InputTag>>("tags")) {
0054       m_tokens.emplace_back(consumes<IntProduct>(tag));
0055     }
0056     m_tokenForProduce = consumes<IntProduct>(pset.getParameter<edm::InputTag>("produceTag"));
0057     produces<IntProduct>();
0058   }
0059 
0060   AcquireIntProducer::~AcquireIntProducer() {
0061     if (m_server) {
0062       m_server->stop();
0063     }
0064   }
0065 
0066   void AcquireIntProducer::preallocate(edm::PreallocationConfiguration const& iPrealloc) {
0067     m_server = std::make_unique<test_acquire::WaitingServer>(
0068         iPrealloc.numberOfStreams(), m_numberOfStreamsToAccumulate, m_secondsToWaitForWork);
0069     m_server->start();
0070   }
0071 
0072   std::unique_ptr<test_acquire::Cache> AcquireIntProducer::beginStream(edm::StreamID) const {
0073     return std::make_unique<test_acquire::Cache>();
0074   }
0075 
0076   void AcquireIntProducer::acquire(edm::StreamID streamID,
0077                                    edm::Event const& event,
0078                                    edm::EventSetup const&,
0079                                    edm::WaitingTaskWithArenaHolder holder) const {
0080     usleep(1000000);
0081 
0082     test_acquire::Cache* streamCacheData = streamCache(streamID);
0083     streamCacheData->retrieved().clear();
0084     streamCacheData->processed().clear();
0085 
0086     for (auto const& token : m_tokens) {
0087       streamCacheData->retrieved().push_back(event.get(token).value);
0088     }
0089     m_server->requestValuesAsync(
0090         streamID.value(), &streamCacheData->retrieved(), &streamCacheData->processed(), holder);
0091   }
0092 
0093   void AcquireIntProducer::produce(edm::StreamID streamID, edm::Event& event, edm::EventSetup const&) const {
0094     usleep(1000000);
0095 
0096     int sum = 0;
0097     for (auto v : streamCache(streamID)->processed()) {
0098       sum += v;
0099     }
0100     event.put(std::make_unique<IntProduct>(sum));
0101 
0102     // This part is here only for the Parentage test.
0103     (void)event.get(m_tokenForProduce);
0104   }
0105 
0106   void AcquireIntProducer::endJob() {
0107     if (m_server) {
0108       m_server->stop();
0109     }
0110     m_server.reset();
0111   }
0112 }  // namespace edmtest
0113 
// Make the class name visible at global scope, then pass it to DEFINE_FWK_MODULE.
// NOTE(review): the macro is presumably declared in MakerMacros.h and registers the
// class as a framework module — confirm there before changing how the name is spelled.
0114 using edmtest::AcquireIntProducer;
0115 DEFINE_FWK_MODULE(AcquireIntProducer);