TransformAsyncIntStreamProducer

WorkCache

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
#include "FWCore/Framework/interface/stream/EDProducer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "DataFormats/TestObjects/interface/ToyProducts.h"
#include "FWCore/Utilities/interface/Exception.h"

#include <thread>
#include <chrono>
namespace edmtest {
  class TransformAsyncIntStreamProducer : public edm::stream::EDProducer<edm::Transformer> {
  public:
    struct WorkCache {
      std::shared_ptr<std::thread> thread_;
      IntProduct value_;
    };

    TransformAsyncIntStreamProducer(edm::ParameterSet const& iPSet)
        : getToken_(consumes(iPSet.getParameter<edm::InputTag>("get"))),
          offset_(iPSet.getParameter<unsigned int>("offset")),
          transformOffset_(iPSet.getParameter<unsigned int>("transformOffset")) {
      putToken_ = produces<IntProduct>();
      bool check = iPSet.getUntrackedParameter<bool>("checkTransformNotCalled");
      registerTransformAsync(
          putToken_,
          [offset = transformOffset_, check](edm::StreamID, auto const& iFrom, auto iTask) {
            if (check) {
              throw cms::Exception("TransformShouldNotBeCalled");
            }
            WorkCache ret;
            using namespace std::chrono_literals;
            ret.thread_ = std::make_shared<std::thread>([iTask] { std::this_thread::sleep_for(100ms); });
            ret.value_ = IntProduct(iFrom.value + offset);
            return ret;
          },
          [](edm::StreamID, auto const& iFrom) {
            iFrom.thread_->join();
            return iFrom.value_;
          },
          "transform");
    }

    void produce(edm::Event& iEvent, edm::EventSetup const&) override {
      iEvent.emplace(putToken_, iEvent.get(getToken_).value + offset_);
    }
    static void fillDescriptions(edm::ConfigurationDescriptions& desc) {
      edm::ParameterSetDescription pset;
      pset.add<edm::InputTag>("get");
      pset.add<unsigned int>("offset", 0);
      pset.add<unsigned int>("transformOffset", 1);
      pset.addUntracked<bool>("checkTransformNotCalled", false);

      desc.addDefault(pset);
    }

  private:
    const edm::EDGetTokenT<IntProduct> getToken_;
    edm::EDPutTokenT<IntProduct> putToken_;
    const unsigned int offset_;
    const unsigned int transformOffset_;
  };
}  // namespace edmtest

using edmtest::TransformAsyncIntStreamProducer;
DEFINE_FWK_MODULE(TransformAsyncIntStreamProducer);