Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-08-04 22:45:02

0001 #ifndef FWCore_Framework_InputSource_h
0002 #define FWCore_Framework_InputSource_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 InputSource: Abstract interface for all input sources.
0007 
0008 Some examples of InputSource subclasses are:
0009 
0010  1) PoolSource: Handles things related to reading from an EDM/ROOT file.
0011  This source provides for delayed loading of data.
0012  2) EmptySource: Handles similar tasks for the case where there is no
0013  data in the input.
0014 
0015 ----------------------------------------------------------------------*/
0016 
0017 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0021 #include "DataFormats/Provenance/interface/RunID.h"
0022 #include "DataFormats/Provenance/interface/Timestamp.h"
0023 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0024 #include "FWCore/Framework/interface/Frameworkfwd.h"
0025 #include "FWCore/Framework/interface/ProcessingController.h"
0026 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0027 #include "FWCore/Utilities/interface/RunIndex.h"
0028 #include "FWCore/Utilities/interface/Signal.h"
0029 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0030 #include "FWCore/Utilities/interface/StreamID.h"
0031 
0032 #include <memory>
0033 #include <string>
0034 #include <chrono>
0035 #include <mutex>
0036 
0037 namespace edm {
0038   class ActivityRegistry;
0039   class BranchIDListHelper;
0040   class ConfigurationDescriptions;
0041   class HistoryAppender;
0042   class ParameterSet;
0043   class ParameterSetDescription;
0044   class ProcessContext;
0045   class ProcessHistoryRegistry;
0046   class ProductRegistry;
0047   class StreamContext;
0048   class ModuleCallingContext;
0049   class SharedResourcesAcquirer;
0050   class ThinnedAssociationsHelper;
0051 
0052   class InputSource {
0053   public:
0054     enum ItemType { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };
0055 
0056     enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };
0057 
0058     /// Constructor
0059     explicit InputSource(ParameterSet const&, InputSourceDescription const&);
0060 
0061     /// Destructor
0062     virtual ~InputSource() noexcept(false);
0063 
0064     InputSource(InputSource const&) = delete;             // Disallow copying and moving
0065     InputSource& operator=(InputSource const&) = delete;  // Disallow copying and moving
0066 
0067     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0068     static const std::string& baseType();
0069     static void fillDescription(ParameterSetDescription& desc);
0070     static void prevalidate(ConfigurationDescriptions&);
0071 
0072     /// Advances the source to the next item
0073     ItemType nextItemType();
0074 
0075     /// Read next event
0076     void readEvent(EventPrincipal& ep, StreamContext&);
0077 
0078     /// Read a specific event
0079     bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
0080 
0081     /// Read next luminosity block Auxilary
0082     std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
0083 
0084     /// Read next run Auxiliary
0085     std::shared_ptr<RunAuxiliary> readRunAuxiliary();
0086 
0087     /// Read next run (new run)
0088     void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
0089 
0090     /// Read next run (same as a prior run)
0091     void readAndMergeRun(RunPrincipal& rp);
0092 
0093     /// Read next luminosity block (new lumi)
0094     void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
0095 
0096     /// Read next luminosity block (same as a prior lumi)
0097     void readAndMergeLumi(LuminosityBlockPrincipal& lbp);
0098 
0099     /// Fill the ProcessBlockHelper with info for the current file
0100     void fillProcessBlockHelper();
0101 
0102     /// Next process block, return false if there is none, sets the processName in the principal
0103     bool nextProcessBlock(ProcessBlockPrincipal&);
0104 
0105     /// Read next process block
0106     void readProcessBlock(ProcessBlockPrincipal&);
0107 
0108     /// Read next file
0109     std::shared_ptr<FileBlock> readFile();
0110 
0111     /// close current file
0112     void closeFile(FileBlock*, bool cleaningUpAfterException);
0113 
0114     /// Skip the number of events specified.
0115     /// Offset may be negative.
0116     void skipEvents(int offset);
0117 
0118     bool goToEvent(EventID const& eventID);
0119 
0120     /// Begin again at the first event
0121     void rewind();
0122 
0123     /// Set the run number
0124     void setRunNumber(RunNumber_t r) { setRun(r); }
0125 
0126     /// Set the luminosity block ID
0127     void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) { setLumi(lb); }
0128 
0129     /// issue an event report
0130     void issueReports(EventID const& eventID, StreamID streamID);
0131 
0132     /// Register any produced products
0133     virtual void registerProducts();
0134 
0135     /// Accessors for product registry
0136     std::shared_ptr<ProductRegistry const> productRegistry() const { return get_underlying_safe(productRegistry_); }
0137 
0138     /// Accessors for process history registry.
0139     ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
0140     ProcessHistoryRegistry& processHistoryRegistry() { return *processHistoryRegistry_; }
0141 
0142     /// Accessors for branchIDListHelper
0143     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0144       return get_underlying_safe(branchIDListHelper_);
0145     }
0146     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0147 
0148     /// Accessors for processBlockHelper
0149     std::shared_ptr<ProcessBlockHelper const> processBlockHelper() const {
0150       return get_underlying_safe(processBlockHelper_);
0151     }
0152     std::shared_ptr<ProcessBlockHelper>& processBlockHelper() { return get_underlying_safe(processBlockHelper_); }
0153 
0154     /// Accessors for thinnedAssociationsHelper
0155     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0156       return get_underlying_safe(thinnedAssociationsHelper_);
0157     }
0158     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0159       return get_underlying_safe(thinnedAssociationsHelper_);
0160     }
0161 
0162     /// Reset the remaining number of events/lumis to the maximum number.
0163     void repeat() {
0164       remainingEvents_ = maxEvents_;
0165       remainingLumis_ = maxLumis_;
0166     }
0167 
0168     /// Returns nullptr if no resource shared between the Source and a DelayedReader
0169     std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
0170 
0171     /// switch to a different ProductRegistry.
0172     void switchTo(std::shared_ptr<ProductRegistry> iOther) { productRegistry_ = iOther; }
0173 
0174     /// Accessor for maximum number of events to be read.
0175     /// -1 is used for unlimited.
0176     int maxEvents() const { return maxEvents_; }
0177 
0178     /// Accessor for remaining number of events to be read.
0179     /// -1 is used for unlimited.
0180     int remainingEvents() const { return remainingEvents_; }
0181 
0182     /// Accessor for maximum number of lumis to be read.
0183     /// -1 is used for unlimited.
0184     int maxLuminosityBlocks() const { return maxLumis_; }
0185 
0186     /// Accessor for remaining number of lumis to be read.
0187     /// -1 is used for unlimited.
0188     int remainingLuminosityBlocks() const { return remainingLumis_; }
0189 
0190     /// Accessor for 'module' description.
0191     ModuleDescription const& moduleDescription() const { return moduleDescription_; }
0192 
0193     /// Accessor for Process Configuration
0194     ProcessConfiguration const& processConfiguration() const { return moduleDescription().processConfiguration(); }
0195 
0196     /// Accessor for global process identifier
0197     std::string const& processGUID() const { return processGUID_; }
0198 
0199     /// Called by framework at beginning of job
0200     void doBeginJob();
0201 
0202     /// Called by framework at end of job
0203     void doEndJob();
0204 
0205     /// Called by framework at beginning of lumi block
0206     virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
0207 
0208     /// Called by framework at beginning of run
0209     virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
0210 
0211     /// Accessor for the current time, as seen by the input source
0212     Timestamp const& timestamp() const { return time_; }
0213 
0214     /// Accessor for the reduced process history ID of the current run.
0215     /// This is the ID of the input process history which does not include
0216     /// the current process.
0217     ProcessHistoryID const& reducedProcessHistoryID() const;
0218 
0219     /// Accessor for current run number
0220     RunNumber_t run() const;
0221 
0222     /// Accessor for current luminosity block number
0223     LuminosityBlockNumber_t luminosityBlock() const;
0224 
0225     /// RunsLumisAndEvents (default), RunsAndLumis, or Runs.
0226     ProcessingMode processingMode() const { return processingMode_; }
0227 
0228     /// Accessor for Activity Registry
0229     std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
0230 
0231     /// Called by the framework to merge or insert run in principal cache.
0232     std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
0233 
0234     /// Called by the framework to merge or insert lumi in principal cache.
0235     std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
0236 
0237     bool randomAccess() const;
0238     ProcessingController::ForwardState forwardState() const;
0239     ProcessingController::ReverseState reverseState() const;
0240 
0241     class EventSourceSentry {
0242     public:
0243       EventSourceSentry(InputSource const& source, StreamContext& sc);
0244       ~EventSourceSentry();
0245 
0246       EventSourceSentry(EventSourceSentry const&) = delete;             // Disallow copying and moving
0247       EventSourceSentry& operator=(EventSourceSentry const&) = delete;  // Disallow copying and moving
0248 
0249     private:
0250       InputSource const& source_;
0251       StreamContext& sc_;
0252     };
0253 
0254     class LumiSourceSentry {
0255     public:
0256       LumiSourceSentry(InputSource const& source, LuminosityBlockIndex id);
0257       ~LumiSourceSentry();
0258 
0259       LumiSourceSentry(LumiSourceSentry const&) = delete;             // Disallow copying and moving
0260       LumiSourceSentry& operator=(LumiSourceSentry const&) = delete;  // Disallow copying and moving
0261 
0262     private:
0263       InputSource const& source_;
0264       LuminosityBlockIndex index_;
0265     };
0266 
0267     class RunSourceSentry {
0268     public:
0269       RunSourceSentry(InputSource const& source, RunIndex id);
0270       ~RunSourceSentry();
0271 
0272       RunSourceSentry(RunSourceSentry const&) = delete;             // Disallow copying and moving
0273       RunSourceSentry& operator=(RunSourceSentry const&) = delete;  // Disallow copying and moving
0274 
0275     private:
0276       InputSource const& source_;
0277       RunIndex index_;
0278     };
0279 
0280     class ProcessBlockSourceSentry {
0281     public:
0282       ProcessBlockSourceSentry(InputSource const&, std::string const&);
0283       ~ProcessBlockSourceSentry();
0284 
0285       ProcessBlockSourceSentry(ProcessBlockSourceSentry const&) = delete;             // Disallow copying and moving
0286       ProcessBlockSourceSentry& operator=(ProcessBlockSourceSentry const&) = delete;  // Disallow copying and moving
0287 
0288     private:
0289       InputSource const& source_;
0290       std::string const& processName_;
0291     };
0292 
0293     class FileOpenSentry {
0294     public:
0295       typedef signalslot::Signal<void(std::string const&)> Sig;
0296       explicit FileOpenSentry(InputSource const& source, std::string const& lfn);
0297       ~FileOpenSentry();
0298 
0299       FileOpenSentry(FileOpenSentry const&) = delete;             // Disallow copying and moving
0300       FileOpenSentry& operator=(FileOpenSentry const&) = delete;  // Disallow copying and moving
0301 
0302     private:
0303       Sig& post_;
0304       std::string const& lfn_;
0305     };
0306 
0307     class FileCloseSentry {
0308     public:
0309       typedef signalslot::Signal<void(std::string const&)> Sig;
0310       explicit FileCloseSentry(InputSource const& source, std::string const& lfn);
0311       ~FileCloseSentry();
0312 
0313       FileCloseSentry(FileCloseSentry const&) = delete;             // Disallow copying and moving
0314       FileCloseSentry& operator=(FileCloseSentry const&) = delete;  // Disallow copying and moving
0315 
0316     private:
0317       Sig& post_;
0318       std::string const& lfn_;
0319     };
0320 
0321     signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preEventReadFromSourceSignal_;
0322     signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postEventReadFromSourceSignal_;
0323 
0324   protected:
0325     virtual void skip(int offset);
0326 
0327     /// To set the current time, as seen by the input source
0328     void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
0329 
0330     ProductRegistry& productRegistryUpdate() { return *productRegistry_; }
0331     ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
0332     ItemType state() const { return state_; }
0333     void setRunAuxiliary(RunAuxiliary* rp) {
0334       runAuxiliary_.reset(rp);
0335       newRun_ = newLumi_ = true;
0336     }
0337     void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary* lbp) {
0338       lumiAuxiliary_.reset(lbp);
0339       newLumi_ = true;
0340     }
0341     void resetRunAuxiliary(bool isNewRun = true) const {
0342       runAuxiliary_.reset();
0343       newRun_ = newLumi_ = isNewRun;
0344     }
0345     void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {
0346       lumiAuxiliary_.reset();
0347       newLumi_ = isNewLumi;
0348     }
0349     void reset() const {
0350       resetLuminosityBlockAuxiliary();
0351       resetRunAuxiliary();
0352       state_ = IsInvalid;
0353     }
0354     bool newRun() const { return newRun_; }
0355     void setNewRun() { newRun_ = true; }
0356     void resetNewRun() { newRun_ = false; }
0357     bool newLumi() const { return newLumi_; }
0358     void setNewLumi() { newLumi_ = true; }
0359     void resetNewLumi() { newLumi_ = false; }
0360     bool eventCached() const { return eventCached_; }
0361     /// Called by the framework to merge or ached() const {return eventCached_;}
0362     void setEventCached() { eventCached_ = true; }
0363     void resetEventCached() { eventCached_ = false; }
0364 
0365     ///Called by inheriting classes when running multicore when the receiver has told them to
0366     /// skip some events.
0367     void decreaseRemainingEventsBy(int iSkipped);
0368 
0369     ///Begin protected makes it easier to do template programming
0370     virtual void beginJob();
0371 
0372   private:
0373     bool eventLimitReached() const { return remainingEvents_ == 0; }
0374     bool lumiLimitReached() const {
0375       if (remainingLumis_ == 0) {
0376         return true;
0377       }
0378       if (maxSecondsUntilRampdown_ <= 0) {
0379         return false;
0380       }
0381       auto end = std::chrono::steady_clock::now();
0382       auto elapsed = end - processingStart_;
0383       if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {
0384         return true;
0385       }
0386       return false;
0387     }
0388     bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
0389     virtual ItemType getNextItemType() = 0;
0390     ItemType nextItemType_();
0391     virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;
0392     virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;
0393     virtual void fillProcessBlockHelper_();
0394     virtual bool nextProcessBlock_(ProcessBlockPrincipal&);
0395     virtual void readProcessBlock_(ProcessBlockPrincipal&);
0396     virtual void readRun_(RunPrincipal& runPrincipal);
0397     virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
0398     virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;
0399     virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
0400     virtual std::shared_ptr<FileBlock> readFile_();
0401     virtual void closeFile_() {}
0402     virtual bool goToEvent_(EventID const& eventID);
0403     virtual void setRun(RunNumber_t r);
0404     virtual void setLumi(LuminosityBlockNumber_t lb);
0405     virtual void rewind_();
0406     virtual void endJob();
0407     virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
0408 
0409     virtual bool randomAccess_() const;
0410     virtual ProcessingController::ForwardState forwardState_() const;
0411     virtual ProcessingController::ReverseState reverseState_() const;
0412 
0413   private:
0414     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0415     int maxEvents_;
0416     int remainingEvents_;
0417     int maxLumis_;
0418     int remainingLumis_;
0419     int readCount_;
0420     int maxSecondsUntilRampdown_;
0421     std::chrono::time_point<std::chrono::steady_clock> processingStart_;
0422     ProcessingMode processingMode_;
0423     ModuleDescription const moduleDescription_;
0424     edm::propagate_const<std::shared_ptr<ProductRegistry>> productRegistry_;
0425     edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
0426     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0427     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0428     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0429     std::string processGUID_;
0430     Timestamp time_;
0431     mutable bool newRun_;
0432     mutable bool newLumi_;
0433     bool eventCached_;
0434     mutable ItemType state_;
0435     mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;
0436     mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;
0437     std::string statusFileName_;
0438 
0439     unsigned int numberOfEventsBeforeBigSkip_;
0440   };
0441 }  // namespace edm
0442 
0443 #endif