Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-02-27 07:20:00

0001 #include "FWCore/Framework/interface/Frameworkfwd.h"
0002 #include "FWCore/Framework/interface/InputSourceMacros.h"
0003 #include "FWCore/Framework/interface/InputSourceDescription.h"
0004 #include "FWCore/Framework/interface/DelayedReader.h"
0005 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0006 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0007 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0009 #include "FWCore/Framework/interface/InputSource.h"
0010 #include "FWCore/Sources/interface/IDGeneratorSourceBase.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0013 
0014 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0015 
0016 namespace edm {
0017   namespace {
0018     class ThrowingDelayedReader : public DelayedReader {
0019     public:
0020       ThrowingDelayedReader(
0021           signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadSource,
0022           signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadSource)
0023           : preEventReadFromSourceSignal_(preEventReadSource),
0024             postEventReadFromSourceSignal_(postEventReadSource),
0025             e_(std::make_exception_ptr(cms::Exception("TEST"))) {}
0026 
0027       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0028           const final {
0029         return preEventReadFromSourceSignal_;
0030       }
0031       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0032           const final {
0033         return postEventReadFromSourceSignal_;
0034       }
0035 
0036     private:
0037       std::shared_ptr<WrapperBase> getProduct_(BranchID const& k, EDProductGetter const* ep) final {
0038         try {
0039           std::rethrow_exception(e_);
0040         } catch (cms::Exception const& iE) {
0041           //avoid adding to the context for each call
0042           auto copyException = iE;
0043           copyException.addContext("called ThrowingDelayedReader");
0044           throw copyException;
0045         }
0046       }
0047       void mergeReaders_(DelayedReader*) final {}
0048       void reset_() final {}
0049 
0050       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal_ =
0051           nullptr;
0052       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal_ =
0053           nullptr;
0054       std::exception_ptr e_;
0055     };
0056   }  // namespace
0057 
0058   class DelayedReaderThrowingSource : public IDGeneratorSourceBase<InputSource> {
0059   public:
0060     explicit DelayedReaderThrowingSource(ParameterSet const&, InputSourceDescription const&);
0061     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0062 
0063   private:
0064     bool setRunAndEventInfo(EventID& id, TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override;
0065     void readEvent_(edm::EventPrincipal&) override;
0066 
0067     std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_() override {
0068       return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0069     }
0070 
0071     ThrowingDelayedReader delayedReader_;
0072     ProcessHistoryID historyID_;
0073 
0074     std::unique_ptr<SharedResourcesAcquirer>
0075         resourceSharedWithDelayedReaderPtr_;  // We do not use propagate_const because the acquirer is itself mutable.
0076     std::shared_ptr<std::recursive_mutex> mutexSharedWithDelayedReader_;
0077   };
0078 
0079   DelayedReaderThrowingSource::DelayedReaderThrowingSource(ParameterSet const& pset, InputSourceDescription const& desc)
0080       : IDGeneratorSourceBase<InputSource>(pset, desc, false),
0081         delayedReader_(&preEventReadFromSourceSignal_, &postEventReadFromSourceSignal_) {
0082     auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0083     resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0084     mutexSharedWithDelayedReader_ = resources.second;
0085 
0086     ParameterSet dummy;
0087     dummy.registerIt();
0088     auto twd = TypeWithDict::byTypeInfo(typeid(edmtest::IntProduct));
0089 
0090     std::vector<ProductDescription> branches;
0091     for (auto const& label : pset.getUntrackedParameter<std::vector<std::string>>("labels")) {
0092       branches.push_back(ProductDescription(InEvent,
0093                                             label,        //module label
0094                                             "INPUTTEST",  //can't be the present process name
0095                                             twd.userClassName(),
0096                                             twd.friendlyClassName(),
0097                                             "",  //product instance name
0098                                             twd,
0099                                             false  //not produced
0100                                             ));
0101       branches.back().setOnDemand(true);  //says we use delayed reader
0102     }
0103     productRegistryUpdate().updateFromInput(branches);
0104 
0105     ProcessHistory ph;
0106     ph.emplace_back("INPUTTEST", dummy.id(), PROJECT_VERSION, HardwareResourcesDescription());
0107     processHistoryRegistry().registerProcessHistory(ph);
0108     historyID_ = ph.id();
0109 
0110     BranchIDLists bilists(1);
0111     for (auto const& branch : branches) {
0112       bilists[0].emplace_back(branch.branchID().id());
0113     }
0114     branchIDListHelper()->updateFromInput(bilists);
0115   }
0116 
0117   bool DelayedReaderThrowingSource::setRunAndEventInfo(EventID&, TimeValue_t&, edm::EventAuxiliary::ExperimentType&) {
0118     return true;
0119   }
0120 
0121   void DelayedReaderThrowingSource::readEvent_(edm::EventPrincipal& e) {
0122     BranchListIndexes indexes(1, static_cast<unsigned short>(0));
0123     branchIDListHelper()->fixBranchListIndexes(indexes);
0124     doReadEventWithDelayedReader(e, historyID_, EventSelectionIDVector(), std::move(indexes), &delayedReader_);
0125   }
0126 
0127   void DelayedReaderThrowingSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0128     ParameterSetDescription desc;
0129     desc.setComment("Throws an exception when the DelayedReader is used.");
0130     IDGeneratorSourceBase<InputSource>::fillDescription(desc);
0131     desc.addUntracked<std::vector<std::string>>("labels", {{"test"}});
0132     descriptions.add("source", desc);
0133   }
0134 }  // namespace edm
0135 
0136 using edm::DelayedReaderThrowingSource;
0137 DEFINE_FWK_INPUT_SOURCE(DelayedReaderThrowingSource);