Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-08 03:36:13

0001 #include "FWCore/Framework/interface/stream/EDProducer.h"
0002 #include "FWCore/Framework/interface/Event.h"
0003 #include "FWCore/Framework/interface/MakerMacros.h"
0004 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0005 #include "FWCore/Utilities/interface/Exception.h"
0006 
0007 #include <thread>
0008 #include <chrono>
0009 namespace edmtest {
0010   class TransformAsyncIntStreamProducer : public edm::stream::EDProducer<edm::Transformer> {
0011   public:
0012     struct WorkCache {
0013       std::shared_ptr<std::thread> thread_;
0014       IntProduct value_;
0015     };
0016 
0017     TransformAsyncIntStreamProducer(edm::ParameterSet const& iPSet)
0018         : getToken_(consumes(iPSet.getParameter<edm::InputTag>("get"))),
0019           offset_(iPSet.getParameter<unsigned int>("offset")),
0020           transformOffset_(iPSet.getParameter<unsigned int>("transformOffset")) {
0021       putToken_ = produces<IntProduct>();
0022       bool check = iPSet.getUntrackedParameter<bool>("checkTransformNotCalled");
0023       registerTransformAsync(
0024           putToken_,
0025           [offset = transformOffset_, check](edm::StreamID, auto const& iFrom, auto iTask) {
0026             if (check) {
0027               throw cms::Exception("TransformShouldNotBeCalled");
0028             }
0029             WorkCache ret;
0030             using namespace std::chrono_literals;
0031             ret.thread_ = std::make_shared<std::thread>([iTask] { std::this_thread::sleep_for(100ms); });
0032             ret.value_ = IntProduct(iFrom.value + offset);
0033             return ret;
0034           },
0035           [](edm::StreamID, auto const& iFrom) {
0036             iFrom.thread_->join();
0037             return iFrom.value_;
0038           },
0039           "transform");
0040     }
0041 
0042     void produce(edm::Event& iEvent, edm::EventSetup const&) override {
0043       iEvent.emplace(putToken_, iEvent.get(getToken_).value + offset_);
0044     }
0045     static void fillDescriptions(edm::ConfigurationDescriptions& desc) {
0046       edm::ParameterSetDescription pset;
0047       pset.add<edm::InputTag>("get");
0048       pset.add<unsigned int>("offset", 0);
0049       pset.add<unsigned int>("transformOffset", 1);
0050       pset.addUntracked<bool>("checkTransformNotCalled", false);
0051 
0052       desc.addDefault(pset);
0053     }
0054 
0055   private:
0056     const edm::EDGetTokenT<IntProduct> getToken_;
0057     edm::EDPutTokenT<IntProduct> putToken_;
0058     const unsigned int offset_;
0059     const unsigned int transformOffset_;
0060   };
0061 }  // namespace edmtest
0062 
0063 using edmtest::TransformAsyncIntStreamProducer;  // lets the macro below name the class without the edmtest:: qualifier
0064 DEFINE_FWK_MODULE(TransformAsyncIntStreamProducer);  // presumably registers the class as a framework module plugin under this unqualified name - confirm against MakerMacros.h