Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-03-13 02:31:49

0001 #ifndef FWCore_Framework_InputSource_h
0002 #define FWCore_Framework_InputSource_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 InputSource: Abstract interface for all input sources.
0007 
0008 Some examples of InputSource subclasses are:
0009 
0010  1) PoolSource: Handles things related to reading from an EDM/ROOT file.
0011  This source provides for delayed loading of data.
0012  2) EmptySource: Handles similar tasks for the case where there is no
0013  data in the input.
0014 
0015 ----------------------------------------------------------------------*/
0016 
0017 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0021 #include "DataFormats/Provenance/interface/RunID.h"
0022 #include "DataFormats/Provenance/interface/Timestamp.h"
0023 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0024 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0025 #include "FWCore/Framework/interface/Frameworkfwd.h"
0026 #include "FWCore/Framework/interface/ProcessingController.h"
0027 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0028 #include "FWCore/Utilities/interface/RunIndex.h"
0029 #include "FWCore/Utilities/interface/Signal.h"
0030 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0031 #include "FWCore/Utilities/interface/StreamID.h"
0032 
0033 #include <memory>
0034 #include <string>
0035 #include <chrono>
0036 #include <mutex>
0037 
0038 namespace edm {
0039   class ActivityRegistry;
0040   class BranchIDListHelper;
0041   class ConfigurationDescriptions;
0042   class HistoryAppender;
0043   class ParameterSet;
0044   class ParameterSetDescription;
0045   class ProcessContext;
0046   class ProcessHistoryRegistry;
0047   class StreamContext;
0048   class ModuleCallingContext;
0049   class SharedResourcesAcquirer;
0050   class ThinnedAssociationsHelper;
0051 
0052   class InputSource {
0053   public:
0054     enum class ItemType : char { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };
0055     enum class ItemPosition : char { Invalid, LastItemToBeMerged, NotLastItemToBeMerged };
0056 
0057     class ItemTypeInfo {
0058     public:
0059       constexpr ItemTypeInfo(ItemType type = ItemType::IsInvalid, ItemPosition position = ItemPosition::Invalid)
0060           : type_(type), position_(position) {}
0061       ItemType itemType() const { return type_; }
0062       ItemPosition itemPosition() const { return position_; }
0063 
0064       // Note that conversion to ItemType is defined and often used to
0065       // compare an ItemTypeInfo with an ItemType.
0066       // operator== of two ItemTypeInfo's is intentionally NOT defined.
0067       // The constructor also allows implicit conversion from ItemType and
0068       // often assignment from ItemType to ItemTypeInfo occurs.
0069       operator ItemType() const { return type_; }
0070 
0071     private:
0072       ItemType type_;
0073 
0074       // position_ should always be Invalid if the itemType_ is not IsRun or IsLumi.
0075       // Even for runs and lumis, it is OK to leave it Invalid because the
0076       // Framework can figure this out based on the next item. Offline it is
0077       // simplest to always leave it Invalid. For online sources, there are
0078       // optimizations that the Framework can use when it knows that a run or
0079       // lumi is the last to be merged before the following item is known. This
0080       // is useful in cases where the function named getNextItemType
0081       // might take a long time to return.
0082       ItemPosition position_;
0083     };
0084 
0085     enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };
0086 
0087     /// Constructor
0088     explicit InputSource(ParameterSet const&, InputSourceDescription const&);
0089 
0090     /// Destructor
0091     virtual ~InputSource() noexcept(false);
0092 
0093     InputSource(InputSource const&) = delete;             // Disallow copying and moving
0094     InputSource& operator=(InputSource const&) = delete;  // Disallow copying and moving
0095 
0096     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0097     static const std::string& baseType();
0098     static void fillDescription(ParameterSetDescription& desc);
0099     static void prevalidate(ConfigurationDescriptions&);
0100 
0101     /// Advances the source to the next item
0102     ItemTypeInfo nextItemType();
0103 
0104     /// Read next event
0105     void readEvent(EventPrincipal& ep, StreamContext&);
0106 
0107     /// Read a specific event
0108     bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
0109 
0110     /// Read next luminosity block Auxilary
0111     std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
0112 
0113     /// Read next run Auxiliary
0114     std::shared_ptr<RunAuxiliary> readRunAuxiliary();
0115 
0116     /// Read next run (new run)
0117     void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
0118 
0119     /// Read next run (same as a prior run)
0120     void readAndMergeRun(RunPrincipal& rp);
0121 
0122     /// Read next luminosity block (new lumi)
0123     void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
0124 
0125     /// Read next luminosity block (same as a prior lumi)
0126     void readAndMergeLumi(LuminosityBlockPrincipal& lbp);
0127 
0128     /// Fill the ProcessBlockHelper with info for the current file
0129     void fillProcessBlockHelper();
0130 
0131     /// Next process block, return false if there is none, sets the processName in the principal
0132     bool nextProcessBlock(ProcessBlockPrincipal&);
0133 
0134     /// Read next process block
0135     void readProcessBlock(ProcessBlockPrincipal&);
0136 
0137     /// Read next file
0138     std::shared_ptr<FileBlock> readFile();
0139 
0140     /// close current file
0141     void closeFile(FileBlock*, bool cleaningUpAfterException);
0142 
0143     /// Skip the number of events specified.
0144     /// Offset may be negative.
0145     void skipEvents(int offset);
0146 
0147     bool goToEvent(EventID const& eventID);
0148 
0149     /// Begin again at the first event
0150     void rewind();
0151 
0152     /// Set the run number
0153     void setRunNumber(RunNumber_t r) { setRun(r); }
0154 
0155     /// Set the luminosity block ID
0156     void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) { setLumi(lb); }
0157 
0158     /// issue an event report
0159     void issueReports(EventID const& eventID, StreamID streamID);
0160 
0161     /// Register any produced products into source's registry
0162     virtual void registerProducts();
0163 
0164     /// Accessors for product registry
0165     ProductRegistry const& productRegistry() const { return productRegistry_; }
0166 
0167     /// Accessors for process history registry.
0168     ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
0169     ProcessHistoryRegistry& processHistoryRegistry() { return *processHistoryRegistry_; }
0170 
0171     /// Accessors for branchIDListHelper
0172     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0173       return get_underlying_safe(branchIDListHelper_);
0174     }
0175     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0176 
0177     /// Accessors for processBlockHelper
0178     std::shared_ptr<ProcessBlockHelper const> processBlockHelper() const {
0179       return get_underlying_safe(processBlockHelper_);
0180     }
0181     std::shared_ptr<ProcessBlockHelper>& processBlockHelper() { return get_underlying_safe(processBlockHelper_); }
0182 
0183     /// Accessors for thinnedAssociationsHelper
0184     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0185       return get_underlying_safe(thinnedAssociationsHelper_);
0186     }
0187     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0188       return get_underlying_safe(thinnedAssociationsHelper_);
0189     }
0190 
0191     /// Reset the remaining number of events/lumis to the maximum number.
0192     void repeat() {
0193       remainingEvents_ = maxEvents_;
0194       remainingLumis_ = maxLumis_;
0195     }
0196 
0197     /// Returns nullptr if no resource shared between the Source and a DelayedReader
0198     std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
0199 
0200     /// Accessor for maximum number of events to be read.
0201     /// -1 is used for unlimited.
0202     int maxEvents() const { return maxEvents_; }
0203 
0204     /// Accessor for remaining number of events to be read.
0205     /// -1 is used for unlimited.
0206     int remainingEvents() const { return remainingEvents_; }
0207 
0208     /// Accessor for maximum number of lumis to be read.
0209     /// -1 is used for unlimited.
0210     int maxLuminosityBlocks() const { return maxLumis_; }
0211 
0212     /// Accessor for remaining number of lumis to be read.
0213     /// -1 is used for unlimited.
0214     int remainingLuminosityBlocks() const { return remainingLumis_; }
0215 
0216     /// Accessor for 'module' description.
0217     ModuleDescription const& moduleDescription() const { return moduleDescription_; }
0218 
0219     /// Accessor for Process Configuration
0220     ProcessConfiguration const& processConfiguration() const { return moduleDescription().processConfiguration(); }
0221 
0222     /// Accessor for global process identifier
0223     std::string const& processGUID() const { return processGUID_; }
0224 
0225     /// Called by framework at beginning of job. The argument is the full product registry
0226     void doBeginJob(edm::ProductRegistry const&);
0227 
0228     /// Called by framework at end of job
0229     void doEndJob();
0230 
0231     /// Called by framework at beginning of lumi block
0232     virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
0233 
0234     /// Called by framework at beginning of run
0235     virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
0236 
0237     /// Accessor for the current time, as seen by the input source
0238     Timestamp const& timestamp() const { return time_; }
0239 
0240     /// Accessor for the reduced process history ID of the current run.
0241     /// This is the ID of the input process history which does not include
0242     /// the current process.
0243     ProcessHistoryID const& reducedProcessHistoryID() const;
0244 
0245     /// Accessor for current run number
0246     RunNumber_t run() const;
0247 
0248     /// Accessor for current luminosity block number
0249     LuminosityBlockNumber_t luminosityBlock() const;
0250 
0251     /// RunsLumisAndEvents (default), RunsAndLumis, or Runs.
0252     ProcessingMode processingMode() const { return processingMode_; }
0253 
0254     /// Accessor for Activity Registry
0255     std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
0256 
0257     /// Called by the framework to merge or insert run in principal cache.
0258     std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
0259 
0260     /// Called by the framework to merge or insert lumi in principal cache.
0261     std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
0262 
0263     bool randomAccess() const;
0264     ProcessingController::ForwardState forwardState() const;
0265     ProcessingController::ReverseState reverseState() const;
0266 
0267     class EventSourceSentry {
0268     public:
0269       EventSourceSentry(InputSource const& source, StreamContext& sc);
0270       ~EventSourceSentry();
0271 
0272       EventSourceSentry(EventSourceSentry const&) = delete;             // Disallow copying and moving
0273       EventSourceSentry& operator=(EventSourceSentry const&) = delete;  // Disallow copying and moving
0274 
0275     private:
0276       InputSource const& source_;
0277       StreamContext& sc_;
0278     };
0279 
0280     class LumiSourceSentry {
0281     public:
0282       LumiSourceSentry(InputSource const& source, LuminosityBlockIndex id);
0283       ~LumiSourceSentry();
0284 
0285       LumiSourceSentry(LumiSourceSentry const&) = delete;             // Disallow copying and moving
0286       LumiSourceSentry& operator=(LumiSourceSentry const&) = delete;  // Disallow copying and moving
0287 
0288     private:
0289       InputSource const& source_;
0290       LuminosityBlockIndex index_;
0291     };
0292 
0293     class RunSourceSentry {
0294     public:
0295       RunSourceSentry(InputSource const& source, RunIndex id);
0296       ~RunSourceSentry();
0297 
0298       RunSourceSentry(RunSourceSentry const&) = delete;             // Disallow copying and moving
0299       RunSourceSentry& operator=(RunSourceSentry const&) = delete;  // Disallow copying and moving
0300 
0301     private:
0302       InputSource const& source_;
0303       RunIndex index_;
0304     };
0305 
0306     class ProcessBlockSourceSentry {
0307     public:
0308       ProcessBlockSourceSentry(InputSource const&, std::string const&);
0309       ~ProcessBlockSourceSentry();
0310 
0311       ProcessBlockSourceSentry(ProcessBlockSourceSentry const&) = delete;             // Disallow copying and moving
0312       ProcessBlockSourceSentry& operator=(ProcessBlockSourceSentry const&) = delete;  // Disallow copying and moving
0313 
0314     private:
0315       InputSource const& source_;
0316       std::string const& processName_;
0317     };
0318 
0319     class FileOpenSentry {
0320     public:
0321       typedef signalslot::Signal<void(std::string const&)> Sig;
0322       explicit FileOpenSentry(InputSource const& source, std::string const& lfn);
0323       ~FileOpenSentry();
0324 
0325       FileOpenSentry(FileOpenSentry const&) = delete;             // Disallow copying and moving
0326       FileOpenSentry& operator=(FileOpenSentry const&) = delete;  // Disallow copying and moving
0327 
0328     private:
0329       Sig& post_;
0330       std::string const& lfn_;
0331     };
0332 
0333     class FileCloseSentry {
0334     public:
0335       typedef signalslot::Signal<void(std::string const&)> Sig;
0336       explicit FileCloseSentry(InputSource const& source, std::string const& lfn);
0337       ~FileCloseSentry();
0338 
0339       FileCloseSentry(FileCloseSentry const&) = delete;             // Disallow copying and moving
0340       FileCloseSentry& operator=(FileCloseSentry const&) = delete;  // Disallow copying and moving
0341 
0342     private:
0343       Sig& post_;
0344       std::string const& lfn_;
0345     };
0346 
0347     signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preEventReadFromSourceSignal_;
0348     signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postEventReadFromSourceSignal_;
0349 
0350   protected:
0351     virtual void skip(int offset);
0352 
0353     /// To set the current time, as seen by the input source
0354     void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
0355 
0356     ProductRegistry& productRegistryUpdate() { return productRegistry_; }
0357     ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
0358     ItemTypeInfo state() const { return state_; }
0359     void setRunAuxiliary(RunAuxiliary* rp) {
0360       runAuxiliary_.reset(rp);
0361       newRun_ = newLumi_ = true;
0362     }
0363     void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary* lbp) {
0364       lumiAuxiliary_.reset(lbp);
0365       newLumi_ = true;
0366     }
0367     void resetRunAuxiliary(bool isNewRun = true) const {
0368       runAuxiliary_.reset();
0369       newRun_ = newLumi_ = isNewRun;
0370     }
0371     void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {
0372       lumiAuxiliary_.reset();
0373       newLumi_ = isNewLumi;
0374     }
0375     void reset() const {
0376       resetLuminosityBlockAuxiliary();
0377       resetRunAuxiliary();
0378       state_ = ItemTypeInfo();
0379     }
0380     bool newRun() const { return newRun_; }
0381     void setNewRun() { newRun_ = true; }
0382     void resetNewRun() { newRun_ = false; }
0383     bool newLumi() const { return newLumi_; }
0384     void setNewLumi() { newLumi_ = true; }
0385     void resetNewLumi() { newLumi_ = false; }
0386     bool eventCached() const { return eventCached_; }
    /// Set / clear the event-cached flag.
0388     void setEventCached() { eventCached_ = true; }
0389     void resetEventCached() { eventCached_ = false; }
0390 
0391     ///Called by inheriting classes when running multicore when the receiver has told them to
0392     /// skip some events.
0393     void decreaseRemainingEventsBy(int iSkipped);
0394 
0395     ///Begin protected makes it easier to do template programming
0396     virtual void beginJob(edm::ProductRegistry const&);
0397 
0398   private:
0399     bool eventLimitReached() const { return remainingEvents_ == 0; }
0400     bool lumiLimitReached() const {
0401       if (remainingLumis_ == 0) {
0402         return true;
0403       }
0404       if (maxSecondsUntilRampdown_ <= 0) {
0405         return false;
0406       }
0407       auto end = std::chrono::steady_clock::now();
0408       auto elapsed = end - processingStart_;
0409       if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {
0410         return true;
0411       }
0412       return false;
0413     }
0414     bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
0415     virtual ItemTypeInfo getNextItemType() = 0;
0416     ItemTypeInfo nextItemType_();
0417     virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;
0418     virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;
0419     virtual void fillProcessBlockHelper_();
0420     virtual bool nextProcessBlock_(ProcessBlockPrincipal&);
0421     virtual void readProcessBlock_(ProcessBlockPrincipal&);
0422     virtual void readRun_(RunPrincipal& runPrincipal);
0423     virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
0424     virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;
0425     virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
0426     virtual std::shared_ptr<FileBlock> readFile_();
0427     virtual void closeFile_() {}
0428     virtual bool goToEvent_(EventID const& eventID);
0429     virtual void setRun(RunNumber_t r);
0430     virtual void setLumi(LuminosityBlockNumber_t lb);
0431     virtual void rewind_();
0432     virtual void endJob();
0433     virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
0434 
0435     virtual bool randomAccess_() const;
0436     virtual ProcessingController::ForwardState forwardState_() const;
0437     virtual ProcessingController::ReverseState reverseState_() const;
0438 
0439   private:
0440     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0441     int maxEvents_;
0442     int remainingEvents_;
0443     int maxLumis_;
0444     int remainingLumis_;
0445     int readCount_;
0446     int maxSecondsUntilRampdown_;
0447     std::chrono::time_point<std::chrono::steady_clock> processingStart_;
0448     ProcessingMode processingMode_;
0449     ModuleDescription const moduleDescription_;
0450     ProductRegistry productRegistry_;
0451     edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
0452     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0453     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0454     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0455     std::string processGUID_;
0456     Timestamp time_;
0457     mutable bool newRun_;
0458     mutable bool newLumi_;
0459     bool eventCached_;
0460     mutable ItemTypeInfo state_;
0461     mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;
0462     mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;
0463     std::string statusFileName_;
0464 
0465     unsigned int numberOfEventsBeforeBigSkip_;
0466   };
0467 }  // namespace edm
0468 
0469 #endif