File indexing completed on 2025-01-08 03:36:13
0001 #include "FWCore/Framework/interface/stream/EDProducer.h"
0002 #include "FWCore/Framework/interface/Event.h"
0003 #include "FWCore/Framework/interface/MakerMacros.h"
0004 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0005 #include "FWCore/Utilities/interface/Exception.h"
0006
0007 #include <thread>
0008 #include <chrono>
0009 namespace edmtest {
0010 class TransformAsyncIntStreamProducer : public edm::stream::EDProducer<edm::Transformer> {
0011 public:
0012 struct WorkCache {
0013 std::shared_ptr<std::thread> thread_;
0014 IntProduct value_;
0015 };
0016
0017 TransformAsyncIntStreamProducer(edm::ParameterSet const& iPSet)
0018 : getToken_(consumes(iPSet.getParameter<edm::InputTag>("get"))),
0019 offset_(iPSet.getParameter<unsigned int>("offset")),
0020 transformOffset_(iPSet.getParameter<unsigned int>("transformOffset")) {
0021 putToken_ = produces<IntProduct>();
0022 bool check = iPSet.getUntrackedParameter<bool>("checkTransformNotCalled");
0023 registerTransformAsync(
0024 putToken_,
0025 [offset = transformOffset_, check](edm::StreamID, auto const& iFrom, auto iTask) {
0026 if (check) {
0027 throw cms::Exception("TransformShouldNotBeCalled");
0028 }
0029 WorkCache ret;
0030 using namespace std::chrono_literals;
0031 ret.thread_ = std::make_shared<std::thread>([iTask] { std::this_thread::sleep_for(100ms); });
0032 ret.value_ = IntProduct(iFrom.value + offset);
0033 return ret;
0034 },
0035 [](edm::StreamID, auto const& iFrom) {
0036 iFrom.thread_->join();
0037 return iFrom.value_;
0038 },
0039 "transform");
0040 }
0041
0042 void produce(edm::Event& iEvent, edm::EventSetup const&) override {
0043 iEvent.emplace(putToken_, iEvent.get(getToken_).value + offset_);
0044 }
0045 static void fillDescriptions(edm::ConfigurationDescriptions& desc) {
0046 edm::ParameterSetDescription pset;
0047 pset.add<edm::InputTag>("get");
0048 pset.add<unsigned int>("offset", 0);
0049 pset.add<unsigned int>("transformOffset", 1);
0050 pset.addUntracked<bool>("checkTransformNotCalled", false);
0051
0052 desc.addDefault(pset);
0053 }
0054
0055 private:
0056 const edm::EDGetTokenT<IntProduct> getToken_;
0057 edm::EDPutTokenT<IntProduct> putToken_;
0058 const unsigned int offset_;
0059 const unsigned int transformOffset_;
0060 };
0061 }
0062
// Bring the class into the enclosing scope so the macro below is handed the
// unqualified class name.
// NOTE(review): DEFINE_FWK_MODULE is defined outside this file (presumably in
// MakerMacros.h); it likely derives the plugin name from the token it is given,
// so keep the unqualified spelling - confirm before changing.
0063 using edmtest::TransformAsyncIntStreamProducer;
0064 DEFINE_FWK_MODULE(TransformAsyncIntStreamProducer);