// DelayedReaderThrowingSource
//
// ThrowingDelayedReader
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/InputSourceMacros.h"
#include "FWCore/Framework/interface/InputSourceDescription.h"
#include "FWCore/Framework/interface/DelayedReader.h"
#include "FWCore/Framework/interface/SharedResourcesRegistry.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Framework/interface/InputSource.h"
#include "FWCore/Sources/interface/IDGeneratorSourceBase.h"
#include "DataFormats/Provenance/interface/ProductRegistry.h"
#include "DataFormats/Provenance/interface/BranchIDListHelper.h"

#include "DataFormats/TestObjects/interface/ToyProducts.h"

namespace edm {
  namespace {
    class ThrowingDelayedReader : public DelayedReader {
    public:
      ThrowingDelayedReader(
          signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadSource,
          signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadSource)
          : preEventReadFromSourceSignal_(preEventReadSource),
            postEventReadFromSourceSignal_(postEventReadSource),
            e_(std::make_exception_ptr(cms::Exception("TEST"))) {}

      signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
          const final {
        return preEventReadFromSourceSignal_;
      }
      signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
          const final {
        return postEventReadFromSourceSignal_;
      }

    private:
      std::shared_ptr<WrapperBase> getProduct_(BranchID const& k, EDProductGetter const* ep) final {
        try {
          std::rethrow_exception(e_);
        } catch (cms::Exception const& iE) {
          //avoid adding to the context for each call
          auto copyException = iE;
          copyException.addContext("called ThrowingDelayedReader");
          throw copyException;
        }
      }
      void mergeReaders_(DelayedReader*) final {}
      void reset_() final {}

      signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal_ =
          nullptr;
      signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal_ =
          nullptr;
      std::exception_ptr e_;
    };
  }  // namespace

  /**
   * Input source that hands every event a ThrowingDelayedReader, so the
   * delayed reader throws whenever it is used to fetch a product.
   *
   * The branches it declares are configured through the untracked "labels"
   * parameter (see fillDescriptions).
   */
  class DelayedReaderThrowingSource : public IDGeneratorSourceBase<InputSource> {
  public:
    explicit DelayedReaderThrowingSource(ParameterSet const&, InputSourceDescription const&);

    /// Describes the configuration parameters accepted by this source.
    static void fillDescriptions(ConfigurationDescriptions& descriptions);

  private:
    bool setRunAndEventInfo(EventID& id, TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override;
    void readEvent_(edm::EventPrincipal&) override;

    /// Exposes, as raw non-owning pointers, the acquirer and mutex obtained in
    /// the constructor.
    std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_() override {
      return {resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get()};
    }

    // Reader passed to every event principal in readEvent_.
    ThrowingDelayedReader delayedReader_;
    // ID of the process history registered in the constructor.
    ProcessHistoryID historyID_;

    // We do not use propagate_const because the acquirer is itself mutable.
    std::unique_ptr<SharedResourcesAcquirer> resourceSharedWithDelayedReaderPtr_;
    std::shared_ptr<std::recursive_mutex> mutexSharedWithDelayedReader_;
  };

  /**
   * Sets up the source in four steps:
   *  1. obtains the acquirer/mutex pair later returned by
   *     resourceSharedWithDelayedReader_();
   *  2. declares one on-demand edmtest::IntProduct event branch per entry of
   *     the untracked "labels" parameter, attributed to process "INPUTTEST";
   *  3. registers a one-entry process history for "INPUTTEST" and remembers
   *     its ID;
   *  4. publishes the branch IDs of the declared branches as a single list.
   */
  DelayedReaderThrowingSource::DelayedReaderThrowingSource(ParameterSet const& pset, InputSourceDescription const& desc)
      : IDGeneratorSourceBase<InputSource>(pset, desc, false),
        delayedReader_(&preEventReadFromSourceSignal_, &postEventReadFromSourceSignal_) {
    auto acquirerAndMutex = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
    resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(acquirerAndMutex.first));
    mutexSharedWithDelayedReader_ = acquirerAndMutex.second;

    // A registered, empty ParameterSet supplies the ID used in the process history below.
    ParameterSet emptyPSet;
    emptyPSet.registerIt();

    // Name of the fake upstream process; can't be the present process name.
    char const* const inputProcessName = "INPUTTEST";
    auto intProductType = TypeWithDict::byTypeInfo(typeid(edmtest::IntProduct));

    std::vector<ProductDescription> branches;
    auto const labels = pset.getUntrackedParameter<std::vector<std::string>>("labels");
    for (auto const& moduleLabel : labels) {
      ProductDescription branch(InEvent,
                                moduleLabel,
                                inputProcessName,
                                intProductType.userClassName(),
                                intProductType.friendlyClassName(),
                                "",  // product instance name
                                intProductType,
                                false  // not produced
      );
      branch.setOnDemand(true);  // says we use delayed reader
      branches.push_back(std::move(branch));
    }
    productRegistryUpdate().updateFromInput(branches);

    ProcessHistory history;
    history.emplace_back(inputProcessName, emptyPSet.id(), PROJECT_VERSION, HardwareResourcesDescription());
    processHistoryRegistry().registerProcessHistory(history);
    historyID_ = history.id();

    // A single list holding the ID of every declared branch, in declaration order.
    BranchIDLists branchIDLists(1);
    auto& inputBranchIDs = branchIDLists.front();
    for (auto const& branch : branches) {
      inputBranchIDs.emplace_back(branch.branchID().id());
    }
    branchIDListHelper()->updateFromInput(branchIDLists);
  }

  /// Always returns true and leaves all three arguments unmodified.
  /// NOTE(review): presumably the base class has already filled in the IDs and
  /// treats `true` as "an event is available" -- confirm in IDGeneratorSourceBase.
  bool DelayedReaderThrowingSource::setRunAndEventInfo(EventID& /*id*/,
                                                       TimeValue_t& /*time*/,
                                                       edm::EventAuxiliary::ExperimentType& /*type*/) {
    return true;
  }

  void DelayedReaderThrowingSource::readEvent_(edm::EventPrincipal& e) {
    BranchListIndexes indexes(1, static_cast<unsigned short>(0));
    branchIDListHelper()->fixBranchListIndexes(indexes);
    doReadEventWithDelayedReader(e, historyID_, EventSelectionIDVector(), std::move(indexes), &delayedReader_);
  }

  /// Registers this source's parameter description under the label "source":
  /// the parameters added by IDGeneratorSourceBase<InputSource>::fillDescription
  /// plus an untracked vector of strings "labels" defaulting to {"test"}.
  void DelayedReaderThrowingSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
    ParameterSetDescription sourceDesc;
    sourceDesc.setComment("Throws an exception when the DelayedReader is used.");
    IDGeneratorSourceBase<InputSource>::fillDescription(sourceDesc);

    // One module label per branch declared by the constructor.
    sourceDesc.addUntracked<std::vector<std::string>>("labels", std::vector<std::string>{"test"});

    descriptions.add("source", sourceDesc);
  }
}  // namespace edm

// Bring the class into the global namespace so the macro below can be given
// its unqualified name.
// NOTE(review): DEFINE_FWK_INPUT_SOURCE presumably registers the class as a
// framework input-source plugin under the name spelled here; do not switch to
// the qualified name without checking InputSourceMacros.h.
using edm::DelayedReaderThrowingSource;
DEFINE_FWK_INPUT_SOURCE(DelayedReaderThrowingSource);