File indexing completed on 2024-10-03 05:27:01
#include "DataFormats/Common/interface/Handle.h"
#include "DataFormats/TestObjects/interface/ToyProducts.h"
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/global/EDProducer.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/PreallocationConfiguration.h"
#include "WaitingServer.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/Utilities/interface/EDGetToken.h"
#include "FWCore/Utilities/interface/InputTag.h"
#include "FWCore/Utilities/interface/StreamID.h"

#include <chrono>
#include <memory>
#include <numeric>
#include <thread>
#include <unistd.h>
#include <vector>
0017
// Forward declaration only: EventSetup is used below purely as an unnamed
// `edm::EventSetup const&` parameter, so its full definition is not needed here.
namespace edm {
  class EventSetup;
}  // namespace edm
0021
0022 namespace edmtest {
0023 using namespace std::chrono_literals;
0024 class AcquireIntProducer : public edm::global::EDProducer<edm::ExternalWork, edm::StreamCache<test_acquire::Cache>> {
0025 public:
0026 explicit AcquireIntProducer(edm::ParameterSet const& pset);
0027 ~AcquireIntProducer() override;
0028
0029 std::unique_ptr<test_acquire::Cache> beginStream(edm::StreamID) const override;
0030
0031 void acquire(edm::StreamID,
0032 edm::Event const&,
0033 edm::EventSetup const&,
0034 edm::WaitingTaskWithArenaHolder) const override;
0035
0036 void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const override;
0037
0038 void endJob() override;
0039
0040 private:
0041 void preallocate(edm::PreallocationConfiguration const&) override;
0042
0043 std::vector<edm::EDGetTokenT<IntProduct>> m_tokens;
0044 edm::EDGetTokenT<IntProduct> m_tokenForProduce;
0045 std::unique_ptr<test_acquire::WaitingServer> m_server;
0046 const unsigned int m_numberOfStreamsToAccumulate;
0047 const unsigned int m_secondsToWaitForWork;
0048 };
0049
0050 AcquireIntProducer::AcquireIntProducer(edm::ParameterSet const& pset)
0051 : m_numberOfStreamsToAccumulate(pset.getUntrackedParameter<unsigned int>("streamsToAccumulate", 8)),
0052 m_secondsToWaitForWork(pset.getUntrackedParameter<unsigned int>("secondsToWaitForWork", 1)) {
0053 for (auto const& tag : pset.getParameter<std::vector<edm::InputTag>>("tags")) {
0054 m_tokens.emplace_back(consumes<IntProduct>(tag));
0055 }
0056 m_tokenForProduce = consumes<IntProduct>(pset.getParameter<edm::InputTag>("produceTag"));
0057 produces<IntProduct>();
0058 }
0059
0060 AcquireIntProducer::~AcquireIntProducer() {
0061 if (m_server) {
0062 m_server->stop();
0063 }
0064 }
0065
0066 void AcquireIntProducer::preallocate(edm::PreallocationConfiguration const& iPrealloc) {
0067 m_server = std::make_unique<test_acquire::WaitingServer>(
0068 iPrealloc.numberOfStreams(), m_numberOfStreamsToAccumulate, m_secondsToWaitForWork);
0069 m_server->start();
0070 }
0071
0072 std::unique_ptr<test_acquire::Cache> AcquireIntProducer::beginStream(edm::StreamID) const {
0073 return std::make_unique<test_acquire::Cache>();
0074 }
0075
0076 void AcquireIntProducer::acquire(edm::StreamID streamID,
0077 edm::Event const& event,
0078 edm::EventSetup const&,
0079 edm::WaitingTaskWithArenaHolder holder) const {
0080 std::this_thread::sleep_for(1s);
0081
0082 test_acquire::Cache* streamCacheData = streamCache(streamID);
0083 streamCacheData->retrieved().clear();
0084 streamCacheData->processed().clear();
0085
0086 for (auto const& token : m_tokens) {
0087 streamCacheData->retrieved().push_back(event.get(token).value);
0088 }
0089 m_server->requestValuesAsync(
0090 streamID.value(), &streamCacheData->retrieved(), &streamCacheData->processed(), holder);
0091 }
0092
0093 void AcquireIntProducer::produce(edm::StreamID streamID, edm::Event& event, edm::EventSetup const&) const {
0094 std::this_thread::sleep_for(1s);
0095
0096 int sum = 0;
0097 for (auto v : streamCache(streamID)->processed()) {
0098 sum += v;
0099 }
0100 event.put(std::make_unique<IntProduct>(sum));
0101
0102
0103 (void)event.get(m_tokenForProduce);
0104 }
0105
0106 void AcquireIntProducer::endJob() {
0107 if (m_server) {
0108 m_server->stop();
0109 }
0110 m_server.reset();
0111 }
0112 }
0113
0114 using edmtest::AcquireIntProducer;
0115 DEFINE_FWK_MODULE(AcquireIntProducer);