Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-08-13 23:27:21

0001 #include "DataFormats/Common/interface/Handle.h"
0002 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0003 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/Framework/interface/global/EDFilter.h"
0006 #include "FWCore/Framework/interface/MakerMacros.h"
0007 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0008 #include "FWCore/Integration/test/WaitingServer.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0010 #include "FWCore/Utilities/interface/EDGetToken.h"
0011 #include "FWCore/Utilities/interface/InputTag.h"
0012 #include "FWCore/Utilities/interface/StreamID.h"
0013 
0014 #include <memory>
0015 #include <vector>
0016 
0017 namespace edm {
0018   class EventSetup;
0019 }
0020 
0021 namespace edmtest {
0022 
0023   class AcquireIntFilter : public edm::global::EDFilter<edm::ExternalWork, edm::StreamCache<test_acquire::Cache>> {
0024   public:
0025     explicit AcquireIntFilter(edm::ParameterSet const& pset);
0026     ~AcquireIntFilter() override;
0027 
0028     std::unique_ptr<test_acquire::Cache> beginStream(edm::StreamID) const override;
0029 
0030     void acquire(edm::StreamID,
0031                  edm::Event const&,
0032                  edm::EventSetup const&,
0033                  edm::WaitingTaskWithArenaHolder) const override;
0034 
0035     bool filter(edm::StreamID, edm::Event&, edm::EventSetup const&) const override;
0036 
0037     void endJob() override;
0038 
0039   private:
0040     void preallocate(edm::PreallocationConfiguration const&) override;
0041 
0042     std::vector<edm::EDGetTokenT<IntProduct>> m_tokens;
0043     edm::EDGetTokenT<IntProduct> m_tokenForProduce;
0044     std::unique_ptr<test_acquire::WaitingServer> m_server;
0045     const unsigned int m_numberOfStreamsToAccumulate;
0046     const unsigned int m_secondsToWaitForWork;
0047   };
0048 
0049   AcquireIntFilter::AcquireIntFilter(edm::ParameterSet const& pset)
0050       : m_numberOfStreamsToAccumulate(pset.getUntrackedParameter<unsigned int>("streamsToAccumulate", 8)),
0051         m_secondsToWaitForWork(pset.getUntrackedParameter<unsigned int>("secondsToWaitForWork", 1)) {
0052     for (auto const& tag : pset.getParameter<std::vector<edm::InputTag>>("tags")) {
0053       m_tokens.emplace_back(consumes<IntProduct>(tag));
0054     }
0055     m_tokenForProduce = consumes<IntProduct>(pset.getParameter<edm::InputTag>("produceTag"));
0056     produces<IntProduct>();
0057   }
0058 
0059   AcquireIntFilter::~AcquireIntFilter() {
0060     if (m_server) {
0061       m_server->stop();
0062     }
0063   }
0064 
0065   void AcquireIntFilter::preallocate(edm::PreallocationConfiguration const& iPrealloc) {
0066     m_server = std::make_unique<test_acquire::WaitingServer>(
0067         iPrealloc.numberOfStreams(), m_numberOfStreamsToAccumulate, m_secondsToWaitForWork);
0068     m_server->start();
0069   }
0070 
0071   std::unique_ptr<test_acquire::Cache> AcquireIntFilter::beginStream(edm::StreamID) const {
0072     return std::make_unique<test_acquire::Cache>();
0073   }
0074 
0075   void AcquireIntFilter::acquire(edm::StreamID streamID,
0076                                  edm::Event const& event,
0077                                  edm::EventSetup const&,
0078                                  edm::WaitingTaskWithArenaHolder holder) const {
0079     test_acquire::Cache* streamCacheData = streamCache(streamID);
0080     streamCacheData->retrieved().clear();
0081     streamCacheData->processed().clear();
0082 
0083     for (auto const& token : m_tokens) {
0084       streamCacheData->retrieved().push_back(event.get(token).value);
0085     }
0086 
0087     m_server->requestValuesAsync(streamID, &streamCacheData->retrieved(), &streamCacheData->processed(), holder);
0088   }
0089 
0090   bool AcquireIntFilter::filter(edm::StreamID streamID, edm::Event& event, edm::EventSetup const&) const {
0091     int sum = 0;
0092     for (auto v : streamCache(streamID)->processed()) {
0093       sum += v;
0094     }
0095     event.put(std::make_unique<IntProduct>(sum));
0096 
0097     // This part is here only for the Parentage test.
0098     (void)event.get(m_tokenForProduce);
0099 
0100     return true;
0101   }
0102 
0103   void AcquireIntFilter::endJob() {
0104     if (m_server) {
0105       m_server->stop();
0106     }
0107     m_server.reset();
0108   }
0109 }  // namespace edmtest
0110 
0111 using edmtest::AcquireIntFilter;
0112 DEFINE_FWK_MODULE(AcquireIntFilter);