Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-08-04 22:45:06

0001 #include "FWCore/Framework/interface/Frameworkfwd.h"
0002 #include "FWCore/Framework/interface/InputSourceMacros.h"
0003 #include "FWCore/Framework/interface/InputSourceDescription.h"
0004 #include "FWCore/Framework/interface/DelayedReader.h"
0005 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0006 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0007 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0009 #include "FWCore/Framework/interface/InputSource.h"
0010 #include "FWCore/Sources/interface/IDGeneratorSourceBase.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0013 
0014 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0015 
0016 #include "FWCore/Utilities/interface/GetPassID.h"
0017 
0018 namespace edm {
0019   namespace {
0020     class ThrowingDelayedReader : public DelayedReader {
0021     public:
0022       ThrowingDelayedReader(
0023           signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadSource,
0024           signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadSource)
0025           : preEventReadFromSourceSignal_(preEventReadSource),
0026             postEventReadFromSourceSignal_(postEventReadSource),
0027             e_(std::make_exception_ptr(cms::Exception("TEST"))) {}
0028 
0029       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0030           const final {
0031         return preEventReadFromSourceSignal_;
0032       }
0033       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0034           const final {
0035         return postEventReadFromSourceSignal_;
0036       }
0037 
0038     private:
0039       std::shared_ptr<WrapperBase> getProduct_(BranchID const& k, EDProductGetter const* ep) final {
0040         try {
0041           std::rethrow_exception(e_);
0042         } catch (cms::Exception const& iE) {
0043           //avoid adding to the context for each call
0044           auto copyException = iE;
0045           copyException.addContext("called ThrowingDelayedReader");
0046           throw copyException;
0047         }
0048       }
0049       void mergeReaders_(DelayedReader*) final{};
0050       void reset_() final{};
0051 
0052       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal_ =
0053           nullptr;
0054       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal_ =
0055           nullptr;
0056       std::exception_ptr e_;
0057     };
0058   }  // namespace
0059 
0060   class DelayedReaderThrowingSource : public IDGeneratorSourceBase<InputSource> {
0061   public:
0062     explicit DelayedReaderThrowingSource(ParameterSet const&, InputSourceDescription const&);
0063     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0064 
0065   private:
0066     bool setRunAndEventInfo(EventID& id, TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override;
0067     void readEvent_(edm::EventPrincipal&) override;
0068 
0069     std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_() override {
0070       return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0071     }
0072 
0073     ThrowingDelayedReader delayedReader_;
0074     ProcessHistoryID historyID_;
0075 
0076     std::unique_ptr<SharedResourcesAcquirer>
0077         resourceSharedWithDelayedReaderPtr_;  // We do not use propagate_const because the acquirer is itself mutable.
0078     std::shared_ptr<std::recursive_mutex> mutexSharedWithDelayedReader_;
0079   };
0080 
0081   DelayedReaderThrowingSource::DelayedReaderThrowingSource(ParameterSet const& pset, InputSourceDescription const& desc)
0082       : IDGeneratorSourceBase<InputSource>(pset, desc, false),
0083         delayedReader_(&preEventReadFromSourceSignal_, &postEventReadFromSourceSignal_) {
0084     auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0085     resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0086     mutexSharedWithDelayedReader_ = resources.second;
0087 
0088     ParameterSet dummy;
0089     dummy.registerIt();
0090     auto twd = TypeWithDict::byTypeInfo(typeid(edmtest::IntProduct));
0091 
0092     std::vector<BranchDescription> branches;
0093     for (auto const& label : pset.getUntrackedParameter<std::vector<std::string>>("labels")) {
0094       branches.push_back(BranchDescription(InEvent,
0095                                            label,        //module label
0096                                            "INPUTTEST",  //can't be the present process name
0097                                            twd.userClassName(),
0098                                            twd.friendlyClassName(),
0099                                            "",  //product instance name
0100                                            "",  //module name which isn't set for items not produced
0101                                            dummy.id(),
0102                                            twd,
0103                                            false  //not produced
0104                                            ));
0105       branches.back().setOnDemand(true);  //says we use delayed reader
0106     }
0107     productRegistryUpdate().updateFromInput(branches);
0108 
0109     ProcessHistory ph;
0110     ph.emplace_back("INPUTTEST", dummy.id(), PROJECT_VERSION, getPassID());
0111     processHistoryRegistry().registerProcessHistory(ph);
0112     historyID_ = ph.id();
0113 
0114     BranchIDLists bilists(1);
0115     for (auto const& branch : branches) {
0116       bilists[0].emplace_back(branch.branchID().id());
0117     }
0118     branchIDListHelper()->updateFromInput(bilists);
0119   }
0120 
0121   bool DelayedReaderThrowingSource::setRunAndEventInfo(EventID&, TimeValue_t&, edm::EventAuxiliary::ExperimentType&) {
0122     return true;
0123   }
0124 
0125   void DelayedReaderThrowingSource::readEvent_(edm::EventPrincipal& e) {
0126     BranchListIndexes indexes(1, static_cast<unsigned short>(0));
0127     branchIDListHelper()->fixBranchListIndexes(indexes);
0128     doReadEventWithDelayedReader(e, historyID_, EventSelectionIDVector(), std::move(indexes), &delayedReader_);
0129   }
0130 
0131   void DelayedReaderThrowingSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0132     ParameterSetDescription desc;
0133     desc.setComment("Throws an exception when the DelayedReader is used.");
0134     IDGeneratorSourceBase<InputSource>::fillDescription(desc);
0135     desc.addUntracked<std::vector<std::string>>("labels", {{"test"}});
0136     descriptions.add("source", desc);
0137   }
0138 }  // namespace edm
0139 
// Make the class name usable without the edm:: qualifier, then pass that unqualified name to the
// framework's input-source definition macro.
// NOTE(review): presumably the macro derives the plugin name from the token it is given, so the
// argument should stay unqualified — confirm against the macro definition before changing it.
0140 using edm::DelayedReaderThrowingSource;
0141 DEFINE_FWK_INPUT_SOURCE(DelayedReaderThrowingSource);