File indexing completed on 2024-04-06 12:12:32
0001 #include "DataFormats/Common/interface/Handle.h"
0002 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0003 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/Framework/interface/global/EDFilter.h"
0006 #include "FWCore/Framework/interface/MakerMacros.h"
0007 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0008 #include "WaitingServer.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0010 #include "FWCore/Utilities/interface/EDGetToken.h"
0011 #include "FWCore/Utilities/interface/InputTag.h"
0012 #include "FWCore/Utilities/interface/StreamID.h"
0013
0014 #include <memory>
0015 #include <vector>
0016
// Forward declaration is sufficient: EventSetup is only named as a
// `const&` parameter type in this file and is never dereferenced.
namespace edm { class EventSetup; }
0020
0021 namespace edmtest {
0022
0023 class AcquireIntFilter : public edm::global::EDFilter<edm::ExternalWork, edm::StreamCache<test_acquire::Cache>> {
0024 public:
0025 explicit AcquireIntFilter(edm::ParameterSet const& pset);
0026 ~AcquireIntFilter() override;
0027
0028 std::unique_ptr<test_acquire::Cache> beginStream(edm::StreamID) const override;
0029
0030 void acquire(edm::StreamID,
0031 edm::Event const&,
0032 edm::EventSetup const&,
0033 edm::WaitingTaskWithArenaHolder) const override;
0034
0035 bool filter(edm::StreamID, edm::Event&, edm::EventSetup const&) const override;
0036
0037 void endJob() override;
0038
0039 private:
0040 void preallocate(edm::PreallocationConfiguration const&) override;
0041
0042 std::vector<edm::EDGetTokenT<IntProduct>> m_tokens;
0043 edm::EDGetTokenT<IntProduct> m_tokenForProduce;
0044 std::unique_ptr<test_acquire::WaitingServer> m_server;
0045 const unsigned int m_numberOfStreamsToAccumulate;
0046 const unsigned int m_secondsToWaitForWork;
0047 };
0048
0049 AcquireIntFilter::AcquireIntFilter(edm::ParameterSet const& pset)
0050 : m_numberOfStreamsToAccumulate(pset.getUntrackedParameter<unsigned int>("streamsToAccumulate", 8)),
0051 m_secondsToWaitForWork(pset.getUntrackedParameter<unsigned int>("secondsToWaitForWork", 1)) {
0052 for (auto const& tag : pset.getParameter<std::vector<edm::InputTag>>("tags")) {
0053 m_tokens.emplace_back(consumes<IntProduct>(tag));
0054 }
0055 m_tokenForProduce = consumes<IntProduct>(pset.getParameter<edm::InputTag>("produceTag"));
0056 produces<IntProduct>();
0057 }
0058
0059 AcquireIntFilter::~AcquireIntFilter() {
0060 if (m_server) {
0061 m_server->stop();
0062 }
0063 }
0064
0065 void AcquireIntFilter::preallocate(edm::PreallocationConfiguration const& iPrealloc) {
0066 m_server = std::make_unique<test_acquire::WaitingServer>(
0067 iPrealloc.numberOfStreams(), m_numberOfStreamsToAccumulate, m_secondsToWaitForWork);
0068 m_server->start();
0069 }
0070
0071 std::unique_ptr<test_acquire::Cache> AcquireIntFilter::beginStream(edm::StreamID) const {
0072 return std::make_unique<test_acquire::Cache>();
0073 }
0074
0075 void AcquireIntFilter::acquire(edm::StreamID streamID,
0076 edm::Event const& event,
0077 edm::EventSetup const&,
0078 edm::WaitingTaskWithArenaHolder holder) const {
0079 test_acquire::Cache* streamCacheData = streamCache(streamID);
0080 streamCacheData->retrieved().clear();
0081 streamCacheData->processed().clear();
0082
0083 for (auto const& token : m_tokens) {
0084 streamCacheData->retrieved().push_back(event.get(token).value);
0085 }
0086
0087 m_server->requestValuesAsync(streamID, &streamCacheData->retrieved(), &streamCacheData->processed(), holder);
0088 }
0089
0090 bool AcquireIntFilter::filter(edm::StreamID streamID, edm::Event& event, edm::EventSetup const&) const {
0091 int sum = 0;
0092 for (auto v : streamCache(streamID)->processed()) {
0093 sum += v;
0094 }
0095 event.put(std::make_unique<IntProduct>(sum));
0096
0097
0098 (void)event.get(m_tokenForProduce);
0099
0100 return true;
0101 }
0102
0103 void AcquireIntFilter::endJob() {
0104 if (m_server) {
0105 m_server->stop();
0106 }
0107 m_server.reset();
0108 }
0109 }
0110
0111 using edmtest::AcquireIntFilter;
0112 DEFINE_FWK_MODULE(AcquireIntFilter);