Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:03

0001 #ifndef FWCore_Framework_InputSource_h
0002 #define FWCore_Framework_InputSource_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 InputSource: Abstract interface for all input sources.
0007 
0008 Some examples of InputSource subclasses are:
0009 
0010  1) PoolSource: Handles things related to reading from an EDM/ROOT file.
0011  This source provides for delayed loading of data.
0012  2) EmptySource: Handles similar tasks for the case where there is no
0013  data in the input.
0014 
0015 ----------------------------------------------------------------------*/
0016 
0017 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0021 #include "DataFormats/Provenance/interface/RunID.h"
0022 #include "DataFormats/Provenance/interface/Timestamp.h"
0023 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0024 #include "FWCore/Framework/interface/Frameworkfwd.h"
0025 #include "FWCore/Framework/interface/ProcessingController.h"
0026 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0027 #include "FWCore/Utilities/interface/RunIndex.h"
0028 #include "FWCore/Utilities/interface/Signal.h"
0029 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0030 #include "FWCore/Utilities/interface/StreamID.h"
0031 
0032 #include <memory>
0033 #include <string>
0034 #include <chrono>
0035 #include <mutex>
0036 
0037 namespace edm {
0038   class ActivityRegistry;
0039   class BranchIDListHelper;
0040   class ConfigurationDescriptions;
0041   class HistoryAppender;
0042   class ParameterSet;
0043   class ParameterSetDescription;
0044   class ProcessContext;
0045   class ProcessHistoryRegistry;
0046   class ProductRegistry;
0047   class StreamContext;
0048   class ModuleCallingContext;
0049   class SharedResourcesAcquirer;
0050   class ThinnedAssociationsHelper;
0051 
0052   class InputSource {
         // Abstract interface for all input sources.
         //
         // The class cannot be instantiated directly: getNextItemType(),
         // readRunAuxiliary_(), readLuminosityBlockAuxiliary_() and readEvent_()
         // are pure virtual and must be supplied by a concrete source.
         // NOTE(review): the public non-virtual functions presumably forward to
         // the private virtual functions of similar name; their definitions are
         // not in this header, confirm against the implementation file.
0053   public:
         /// Kind of item carried by the ItemTypeInfo that nextItemType() returns.
0054     enum class ItemType : char { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };
         /// Merge position of a run or lumi; see the note on ItemTypeInfo::position_.
0055     enum class ItemPosition : char { Invalid, LastItemToBeMerged, NotLastItemToBeMerged };
0056 
         /// Pairs an ItemType with an ItemPosition.
0057     class ItemTypeInfo {
0058     public:
           // Both arguments default to the invalid value. The constructor is
           // deliberately not explicit so that an ItemType converts implicitly.
0059       constexpr ItemTypeInfo(ItemType type = ItemType::IsInvalid, ItemPosition position = ItemPosition::Invalid)
0060           : type_(type), position_(position) {}
0061       ItemType itemType() const { return type_; }
0062       ItemPosition itemPosition() const { return position_; }
0063 
0064       // Note that conversion to ItemType is defined and often used to
0065       // compare an ItemTypeInfo with an ItemType.
0066       // operator== of two ItemTypeInfo's is intentionally NOT defined.
0067       // The constructor also allows implicit conversion from ItemType and
0068       // often assignment from ItemType to ItemTypeInfo occurs.
0069       operator ItemType() const { return type_; }
0070 
0071     private:
0072       ItemType type_;
0073 
0074       // position_ should always be Invalid if type_ is not IsRun or IsLumi.
0075       // Even for runs and lumis, it is OK to leave it Invalid because the
0076       // Framework can figure this out based on the next item. Offline it is
0077       // simplest to always leave it Invalid. For online sources, there are
0078       // optimizations that the Framework can use when it knows that a run or
0079       // lumi is the last to be merged before the following item is known. This
0080       // is useful in cases where the function named getNextItemType
0081       // might take a long time to return.
0082       ItemPosition position_;
0083     };
0084 
         /// Processing granularity reported by processingMode().
0085     enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };
0086 
0087     /// Constructor
0088     explicit InputSource(ParameterSet const&, InputSourceDescription const&);
0089 
0090     /// Destructor (declared noexcept(false), so it is allowed to throw)
0091     virtual ~InputSource() noexcept(false);
0092 
0093     InputSource(InputSource const&) = delete;             // Disallow copying and moving
0094     InputSource& operator=(InputSource const&) = delete;  // Disallow copying and moving
0095 
         // Static configuration hooks; they are defined outside this header.
0096     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0097     static const std::string& baseType();
0098     static void fillDescription(ParameterSetDescription& desc);
0099     static void prevalidate(ConfigurationDescriptions&);
0100 
0101     /// Advances the source to the next item
0102     ItemTypeInfo nextItemType();
0103 
0104     /// Read next event
0105     void readEvent(EventPrincipal& ep, StreamContext&);
0106 
0107     /// Read a specific event
0108     bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
0109 
0110     /// Read next luminosity block Auxiliary
0111     std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
0112 
0113     /// Read next run Auxiliary
0114     std::shared_ptr<RunAuxiliary> readRunAuxiliary();
0115 
0116     /// Read next run (new run)
0117     void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
0118 
0119     /// Read next run (same as a prior run)
0120     void readAndMergeRun(RunPrincipal& rp);
0121 
0122     /// Read next luminosity block (new lumi)
0123     void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
0124 
0125     /// Read next luminosity block (same as a prior lumi)
0126     void readAndMergeLumi(LuminosityBlockPrincipal& lbp);
0127 
0128     /// Fill the ProcessBlockHelper with info for the current file
0129     void fillProcessBlockHelper();
0130 
0131     /// Next process block, return false if there is none, sets the processName in the principal
0132     bool nextProcessBlock(ProcessBlockPrincipal&);
0133 
0134     /// Read next process block
0135     void readProcessBlock(ProcessBlockPrincipal&);
0136 
0137     /// Read next file
0138     std::shared_ptr<FileBlock> readFile();
0139 
0140     /// close current file
0141     void closeFile(FileBlock*, bool cleaningUpAfterException);
0142 
0143     /// Skip the number of events specified.
0144     /// Offset may be negative.
0145     void skipEvents(int offset);
0146 
         /// NOTE(review): presumably positions the source at eventID and reports
         /// whether that succeeded; confirm against the implementation.
0147     bool goToEvent(EventID const& eventID);
0148 
0149     /// Begin again at the first event
0150     void rewind();
0151 
0152     /// Set the run number (forwards to the virtual setRun)
0153     void setRunNumber(RunNumber_t r) { setRun(r); }
0154 
0155     /// Set the luminosity block number (forwards to the virtual setLumi)
0156     void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) { setLumi(lb); }
0157 
0158     /// issue an event report
0159     void issueReports(EventID const& eventID, StreamID streamID);
0160 
0161     /// Register any produced products
0162     virtual void registerProducts();
0163 
0164     /// Accessors for product registry
0165     std::shared_ptr<ProductRegistry const> productRegistry() const { return get_underlying_safe(productRegistry_); }
0166 
0167     /// Accessors for process history registry.
0168     ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
0169     ProcessHistoryRegistry& processHistoryRegistry() { return *processHistoryRegistry_; }
0170 
0171     /// Accessors for branchIDListHelper
0172     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0173       return get_underlying_safe(branchIDListHelper_);
0174     }
0175     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0176 
0177     /// Accessors for processBlockHelper
0178     std::shared_ptr<ProcessBlockHelper const> processBlockHelper() const {
0179       return get_underlying_safe(processBlockHelper_);
0180     }
0181     std::shared_ptr<ProcessBlockHelper>& processBlockHelper() { return get_underlying_safe(processBlockHelper_); }
0182 
0183     /// Accessors for thinnedAssociationsHelper
0184     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0185       return get_underlying_safe(thinnedAssociationsHelper_);
0186     }
0187     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0188       return get_underlying_safe(thinnedAssociationsHelper_);
0189     }
0190 
0191     /// Reset the remaining number of events/lumis to the maximum number.
0192     void repeat() {
0193       remainingEvents_ = maxEvents_;
0194       remainingLumis_ = maxLumis_;
0195     }
0196 
0197     /// Returns nullptr if no resource shared between the Source and a DelayedReader
0198     std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
0199 
0200     /// switch to a different ProductRegistry.
0201     void switchTo(std::shared_ptr<ProductRegistry> iOther) { productRegistry_ = iOther; }
0202 
0203     /// Accessor for maximum number of events to be read.
0204     /// -1 is used for unlimited.
0205     int maxEvents() const { return maxEvents_; }
0206 
0207     /// Accessor for remaining number of events to be read.
0208     /// -1 is used for unlimited.
0209     int remainingEvents() const { return remainingEvents_; }
0210 
0211     /// Accessor for maximum number of lumis to be read.
0212     /// -1 is used for unlimited.
0213     int maxLuminosityBlocks() const { return maxLumis_; }
0214 
0215     /// Accessor for remaining number of lumis to be read.
0216     /// -1 is used for unlimited.
0217     int remainingLuminosityBlocks() const { return remainingLumis_; }
0218 
0219     /// Accessor for 'module' description.
0220     ModuleDescription const& moduleDescription() const { return moduleDescription_; }
0221 
0222     /// Accessor for Process Configuration
0223     ProcessConfiguration const& processConfiguration() const { return moduleDescription().processConfiguration(); }
0224 
0225     /// Accessor for global process identifier
0226     std::string const& processGUID() const { return processGUID_; }
0227 
0228     /// Called by framework at beginning of job
0229     void doBeginJob();
0230 
0231     /// Called by framework at end of job
0232     void doEndJob();
0233 
0234     /// Called by framework at beginning of lumi block
0235     virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
0236 
0237     /// Called by framework at beginning of run
0238     virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
0239 
0240     /// Accessor for the current time, as seen by the input source
0241     Timestamp const& timestamp() const { return time_; }
0242 
0243     /// Accessor for the reduced process history ID of the current run.
0244     /// This is the ID of the input process history which does not include
0245     /// the current process.
0246     ProcessHistoryID const& reducedProcessHistoryID() const;
0247 
0248     /// Accessor for current run number
0249     RunNumber_t run() const;
0250 
0251     /// Accessor for current luminosity block number
0252     LuminosityBlockNumber_t luminosityBlock() const;
0253 
0254     /// RunsLumisAndEvents (default), RunsAndLumis, or Runs.
0255     ProcessingMode processingMode() const { return processingMode_; }
0256 
0257     /// Accessor for Activity Registry
0258     std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
0259 
0260     /// Called by the framework to merge or insert run in principal cache.
0261     std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
0262 
0263     /// Called by the framework to merge or insert lumi in principal cache.
0264     std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
0265 
         // NOTE(review): presumably these forward to the private virtual
         // randomAccess_(), forwardState_() and reverseState_(); confirm.
0266     bool randomAccess() const;
0267     ProcessingController::ForwardState forwardState() const;
0268     ProcessingController::ReverseState reverseState() const;
0269 
         /// Non-copyable sentry built from the source and a StreamContext.
         /// It stores references to both, so both must outlive the sentry.
         /// NOTE(review): presumably emits the pre/post event-read signals from
         /// its constructor and destructor; confirm against the implementation.
0270     class EventSourceSentry {
0271     public:
0272       EventSourceSentry(InputSource const& source, StreamContext& sc);
0273       ~EventSourceSentry();
0274 
0275       EventSourceSentry(EventSourceSentry const&) = delete;             // Disallow copying and moving
0276       EventSourceSentry& operator=(EventSourceSentry const&) = delete;  // Disallow copying and moving
0277 
0278     private:
0279       InputSource const& source_;
0280       StreamContext& sc_;
0281     };
0282 
         /// Non-copyable sentry built from the source and a LuminosityBlockIndex.
         /// The source is held by reference; the index is held by value.
0283     class LumiSourceSentry {
0284     public:
0285       LumiSourceSentry(InputSource const& source, LuminosityBlockIndex id);
0286       ~LumiSourceSentry();
0287 
0288       LumiSourceSentry(LumiSourceSentry const&) = delete;             // Disallow copying and moving
0289       LumiSourceSentry& operator=(LumiSourceSentry const&) = delete;  // Disallow copying and moving
0290 
0291     private:
0292       InputSource const& source_;
0293       LuminosityBlockIndex index_;
0294     };
0295 
         /// Non-copyable sentry built from the source and a RunIndex.
         /// The source is held by reference; the index is held by value.
0296     class RunSourceSentry {
0297     public:
0298       RunSourceSentry(InputSource const& source, RunIndex id);
0299       ~RunSourceSentry();
0300 
0301       RunSourceSentry(RunSourceSentry const&) = delete;             // Disallow copying and moving
0302       RunSourceSentry& operator=(RunSourceSentry const&) = delete;  // Disallow copying and moving
0303 
0304     private:
0305       InputSource const& source_;
0306       RunIndex index_;
0307     };
0308 
         /// Non-copyable sentry built from the source and a process name.
         /// Both are held by reference, so the string passed to the
         /// constructor must outlive the sentry.
0309     class ProcessBlockSourceSentry {
0310     public:
0311       ProcessBlockSourceSentry(InputSource const&, std::string const&);
0312       ~ProcessBlockSourceSentry();
0313 
0314       ProcessBlockSourceSentry(ProcessBlockSourceSentry const&) = delete;             // Disallow copying and moving
0315       ProcessBlockSourceSentry& operator=(ProcessBlockSourceSentry const&) = delete;  // Disallow copying and moving
0316 
0317     private:
0318       InputSource const& source_;
0319       std::string const& processName_;
0320     };
0321 
         /// Non-copyable sentry built from the source and a file name (lfn).
         /// It keeps a reference to a signal taking a std::string const& and a
         /// reference to lfn, so the lfn string must outlive the sentry.
         /// NOTE(review): presumably post_ is emitted with lfn_ in the
         /// destructor; confirm against the implementation.
0322     class FileOpenSentry {
0323     public:
0324       typedef signalslot::Signal<void(std::string const&)> Sig;
0325       explicit FileOpenSentry(InputSource const& source, std::string const& lfn);
0326       ~FileOpenSentry();
0327 
0328       FileOpenSentry(FileOpenSentry const&) = delete;             // Disallow copying and moving
0329       FileOpenSentry& operator=(FileOpenSentry const&) = delete;  // Disallow copying and moving
0330 
0331     private:
0332       Sig& post_;
0333       std::string const& lfn_;
0334     };
0335 
         /// Same shape as FileOpenSentry: keeps a signal reference and a
         /// reference to lfn, so the lfn string must outlive the sentry.
0336     class FileCloseSentry {
0337     public:
0338       typedef signalslot::Signal<void(std::string const&)> Sig;
0339       explicit FileCloseSentry(InputSource const& source, std::string const& lfn);
0340       ~FileCloseSentry();
0341 
0342       FileCloseSentry(FileCloseSentry const&) = delete;             // Disallow copying and moving
0343       FileCloseSentry& operator=(FileCloseSentry const&) = delete;  // Disallow copying and moving
0344 
0345     private:
0346       Sig& post_;
0347       std::string const& lfn_;
0348     };
0349 
         // Public signal members whose slots receive a StreamContext and a
         // ModuleCallingContext.
         // NOTE(review): presumably emitted before/after an event is read from
         // the source; confirm where they are emitted.
0350     signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preEventReadFromSourceSignal_;
0351     signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postEventReadFromSourceSignal_;
0352 
0353   protected:
         /// Virtual hook. NOTE(review): presumably invoked by skipEvents(); confirm.
0354     virtual void skip(int offset);
0355 
0356     /// To set the current time, as seen by the input source
0357     void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
0358 
0359     ProductRegistry& productRegistryUpdate() { return *productRegistry_; }
0360     ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
0361     ItemTypeInfo state() const { return state_; }
         /// Takes ownership of rp (handed to shared_ptr::reset) and flags both a
         /// new run and a new lumi.
0362     void setRunAuxiliary(RunAuxiliary* rp) {
0363       runAuxiliary_.reset(rp);
0364       newRun_ = newLumi_ = true;
0365     }
         /// Takes ownership of lbp (handed to shared_ptr::reset) and flags a new lumi.
0366     void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary* lbp) {
0367       lumiAuxiliary_.reset(lbp);
0368       newLumi_ = true;
0369     }
         /// Drops the cached run auxiliary and sets newRun_ and newLumi_ to isNewRun.
         /// Callable on a const source because the touched members are mutable.
0370     void resetRunAuxiliary(bool isNewRun = true) const {
0371       runAuxiliary_.reset();
0372       newRun_ = newLumi_ = isNewRun;
0373     }
         /// Drops the cached lumi auxiliary and sets newLumi_ to isNewLumi.
0374     void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {
0375       lumiAuxiliary_.reset();
0376       newLumi_ = isNewLumi;
0377     }
         /// Drops both cached auxiliaries (leaving newRun_ and newLumi_ true via
         /// the default arguments) and resets state_ to a default-constructed,
         /// i.e. invalid, ItemTypeInfo.
0378     void reset() const {
0379       resetLuminosityBlockAuxiliary();
0380       resetRunAuxiliary();
0381       state_ = ItemTypeInfo();
0382     }
0383     bool newRun() const { return newRun_; }
0384     void setNewRun() { newRun_ = true; }
0385     void resetNewRun() { newRun_ = false; }
0386     bool newLumi() const { return newLumi_; }
0387     void setNewLumi() { newLumi_ = true; }
0388     void resetNewLumi() { newLumi_ = false; }
0389     bool eventCached() const { return eventCached_; }
0390     /// Set / clear the flag reported by eventCached().
0391     void setEventCached() { eventCached_ = true; }
0392     void resetEventCached() { eventCached_ = false; }
0393 
0394     ///Called by inheriting classes when running multicore when the receiver has told them to
0395     /// skip some events.
0396     void decreaseRemainingEventsBy(int iSkipped);
0397 
0398     /// beginJob is protected (rather than private) because that makes it easier to do template programming
0399     virtual void beginJob();
0400 
0401   private:
0402     bool eventLimitReached() const { return remainingEvents_ == 0; }
         /// True when no lumis remain, or when a positive
         /// maxSecondsUntilRampdown_ has been exceeded since processingStart_.
0403     bool lumiLimitReached() const {
0404       if (remainingLumis_ == 0) {
0405         return true;
0406       }
           // A non-positive value means the time-based limit is disabled.
0407       if (maxSecondsUntilRampdown_ <= 0) {
0408         return false;
0409       }
0410       auto end = std::chrono::steady_clock::now();
0411       auto elapsed = end - processingStart_;
           // duration_cast to seconds truncates, and the comparison is strict,
           // so the limit trips once more than the configured whole seconds
           // have elapsed.
0412       if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {
0413         return true;
0414       }
0415       return false;
0416     }
0417     bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
         // Virtual customization points for concrete sources. The "= 0" ones
         // must be overridden; the others have base-class definitions that are
         // not visible in this header (closeFile_ is an inline no-op).
0418     virtual ItemTypeInfo getNextItemType() = 0;
0419     ItemTypeInfo nextItemType_();
0420     virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;
0421     virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;
0422     virtual void fillProcessBlockHelper_();
0423     virtual bool nextProcessBlock_(ProcessBlockPrincipal&);
0424     virtual void readProcessBlock_(ProcessBlockPrincipal&);
0425     virtual void readRun_(RunPrincipal& runPrincipal);
0426     virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
0427     virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;
0428     virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
0429     virtual std::shared_ptr<FileBlock> readFile_();
0430     virtual void closeFile_() {}
0431     virtual bool goToEvent_(EventID const& eventID);
0432     virtual void setRun(RunNumber_t r);
0433     virtual void setLumi(LuminosityBlockNumber_t lb);
0434     virtual void rewind_();
0435     virtual void endJob();
0436     virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
0437 
0438     virtual bool randomAccess_() const;
0439     virtual ProcessingController::ForwardState forwardState_() const;
0440     virtual ProcessingController::ReverseState reverseState_() const;
0441 
0442   private:
0443     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
         // Event/lumi limits and remaining counts; per the public accessors,
         // -1 is used for unlimited.
0444     int maxEvents_;
0445     int remainingEvents_;
0446     int maxLumis_;
0447     int remainingLumis_;
0448     int readCount_;
         // Time limit in seconds measured from processingStart_; a value <= 0
         // disables it (see lumiLimitReached).
0449     int maxSecondsUntilRampdown_;
0450     std::chrono::time_point<std::chrono::steady_clock> processingStart_;
0451     ProcessingMode processingMode_;
0452     ModuleDescription const moduleDescription_;
0453     edm::propagate_const<std::shared_ptr<ProductRegistry>> productRegistry_;
0454     edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
0455     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0456     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0457     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0458     std::string processGUID_;
0459     Timestamp time_;
         // The mutable members below are modified by the const reset helpers
         // (resetRunAuxiliary, resetLuminosityBlockAuxiliary, reset).
0460     mutable bool newRun_;
0461     mutable bool newLumi_;
0462     bool eventCached_;
0463     mutable ItemTypeInfo state_;
0464     mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;
0465     mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;
0466     std::string statusFileName_;
0467 
0468     unsigned int numberOfEventsBeforeBigSkip_;
0469   };
0470 }  // namespace edm
0471 
0472 #endif