// TransformAsyncIntProducer
//
// Defines edmtest::TransformAsyncIntProducer and its nested WorkCache struct.
#include "FWCore/Framework/interface/global/EDProducer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "DataFormats/TestObjects/interface/ToyProducts.h"
#include "FWCore/Utilities/interface/Exception.h"

#include <chrono>
#include <memory>
#include <thread>
namespace edmtest {
  /**
   * Test producer that puts an IntProduct and additionally registers a two-stage
   * asynchronous transform of that product.
   *
   * produce() reads the IntProduct selected by the "get" parameter and, unless
   * "noPut" is true, emplaces an IntProduct built from that value plus "offset".
   *
   * The transform registered in the constructor yields an IntProduct holding the
   * transformed object's value plus "transformOffset". Between its two stages a
   * helper thread sleeps for 100 ms while holding a copy of the task handle that
   * stage 1 received.
   */
  class TransformAsyncIntProducer : public edm::global::EDProducer<edm::Transformer> {
  public:
    /// State handed from the first transform stage to the second one.
    struct WorkCache {
      /// Helper thread started by stage 1 and joined by stage 2. std::thread is
      /// move-only; holding it through a shared_ptr keeps WorkCache copyable.
      std::shared_ptr<std::thread> thread_;
      /// Result computed in stage 1 and returned by stage 2.
      IntProduct value_;
    };

    /**
     * Reads the configuration, declares the consumed and produced IntProduct and
     * registers the asynchronous transform.
     *
     * @param iPSet module configuration; see fillDescriptions() for the
     *              parameters and their defaults.
     */
    TransformAsyncIntProducer(edm::ParameterSet const& iPSet)
        : getToken_(consumes(iPSet.getParameter<edm::InputTag>("get"))),
          offset_(iPSet.getParameter<unsigned int>("offset")),
          transformOffset_(iPSet.getParameter<unsigned int>("transformOffset")),
          noPut_(iPSet.getParameter<bool>("noPut")) {
      putToken_ = produces<IntProduct>();
      bool check = iPSet.getUntrackedParameter<bool>("checkTransformNotCalled");
      registerTransformAsync(
          putToken_,
          // Stage 1: compute the result, start the helper thread and return both.
          // Throws cms::Exception("TransformShouldNotBeCalled") when the
          // "checkTransformNotCalled" parameter is true.
          [offset = transformOffset_, check](edm::StreamID, auto const& iFrom, auto iTask) {
            if (check) {
              throw cms::Exception("TransformShouldNotBeCalled");
            }
            using namespace std::chrono_literals;
            WorkCache ret;
            // Compute the value before launching the thread so nothing else has
            // to happen between thread start and handing the cache back.
            ret.value_ = IntProduct(iFrom.value + offset);
            // The thread keeps its own copy of iTask alive while it sleeps.
            // NOTE(review): presumably the framework only proceeds to stage 2
            // once every copy of iTask has been released - confirm.
            //
            // Destroying a joinable std::thread calls std::terminate. Stage 2
            // normally joins, but if the cache is dropped without stage 2 ever
            // running, the deleter below joins instead of aborting the process.
            ret.thread_ = std::shared_ptr<std::thread>(
                new std::thread([iTask] { std::this_thread::sleep_for(100ms); }), [](std::thread* iThread) {
                  if (iThread->joinable()) {
                    iThread->join();
                  }
                  delete iThread;
                });
            return ret;
          },
          // Stage 2: wait for the helper thread, then hand back the stored value.
          [](edm::StreamID, auto const& iFrom) {
            iFrom.thread_->join();
            return iFrom.value_;
          },
          // NOTE(review): presumably the product instance name of the transformed
          // product - confirm against registerTransformAsync.
          "transform");
    }

    /// Unless "noPut" was set, emplaces the consumed IntProduct's value plus
    /// "offset"; with "noPut" set nothing is read or put.
    void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const override {
      if (not noPut_) {
        iEvent.emplace(putToken_, iEvent.get(getToken_).value + offset_);
      }
    }

    /// Declares the module's parameters: "get" (no default), "offset" (0),
    /// "transformOffset" (1), untracked "checkTransformNotCalled" (false) and
    /// "noPut" (false). The description is added as the default one.
    static void fillDescriptions(edm::ConfigurationDescriptions& desc) {
      edm::ParameterSetDescription pset;
      pset.add<edm::InputTag>("get");
      pset.add<unsigned int>("offset", 0);
      pset.add<unsigned int>("transformOffset", 1);
      pset.addUntracked<bool>("checkTransformNotCalled", false);
      pset.add<bool>("noPut", false);

      desc.addDefault(pset);
    }

  private:
    const edm::EDGetTokenT<IntProduct> getToken_;  ///< input selected by "get"
    edm::EDPutTokenT<IntProduct> putToken_;        ///< assigned in the constructor body, hence not const
    const unsigned int offset_;                    ///< added to the input value in produce()
    const unsigned int transformOffset_;           ///< added to the source value in transform stage 1
    const bool noPut_;                             ///< when true, produce() puts nothing
  };
}  // namespace edmtest

// Bring the class into the enclosing scope so the macro below can be given the
// unqualified identifier.
using edmtest::TransformAsyncIntProducer;
// NOTE(review): presumably registers the producer as a framework plugin under
// exactly this identifier - confirm against MakerMacros.h before changing how
// the name is spelled here.
DEFINE_FWK_MODULE(TransformAsyncIntProducer);