Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-10 02:50:55

0001 #ifndef FWCore_Framework_InputSource_h
0002 #define FWCore_Framework_InputSource_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 InputSource: Abstract interface for all input sources.
0007 
0008 Some examples of InputSource subclasses are:
0009 
0010  1) PoolSource: Handles things related to reading from an EDM/ROOT file.
0011  This source provides for delayed loading of data.
0012  2) EmptySource: Handles similar tasks for the case where there is no
0013  data in the input.
0014 
0015 ----------------------------------------------------------------------*/
0016 
0017 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0021 #include "DataFormats/Provenance/interface/RunID.h"
0022 #include "DataFormats/Provenance/interface/Timestamp.h"
0023 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0024 #include "FWCore/Framework/interface/Frameworkfwd.h"
0025 #include "FWCore/Framework/interface/ProcessingController.h"
0026 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0027 #include "FWCore/Utilities/interface/RunIndex.h"
0028 #include "FWCore/Utilities/interface/Signal.h"
0029 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0030 #include "FWCore/Utilities/interface/StreamID.h"
0031 
0032 #include <memory>
0033 #include <string>
0034 #include <chrono>
0035 #include <mutex>
0036 
0037 namespace edm {
0038   class ActivityRegistry;
0039   class BranchIDListHelper;
0040   class ConfigurationDescriptions;
0041   class HistoryAppender;
0042   class ParameterSet;
0043   class ParameterSetDescription;
0044   class ProcessContext;
0045   class ProcessHistoryRegistry;
0046   class ProductRegistry;
0047   class StreamContext;
0048   class ModuleCallingContext;
0049   class SharedResourcesAcquirer;
0050   class ThinnedAssociationsHelper;
0051   /// Abstract base class of all input sources; concrete sources must implement the pure virtual members declared below.
0052   class InputSource {
0053   public:
0054     enum ItemType { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };
0055     /// Processing granularity, as returned by processingMode().
0056     enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };
0057 
0058     /// Constructor
0059     explicit InputSource(ParameterSet const&, InputSourceDescription const&);
0060 
0061     /// Destructor (declared noexcept(false), i.e. it is allowed to throw)
0062     virtual ~InputSource() noexcept(false);
0063 
0064     InputSource(InputSource const&) = delete;             // Disallow copying and moving
0065     InputSource& operator=(InputSource const&) = delete;  // Disallow copying and moving
0066 
0067     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0068     static const std::string& baseType();
0069     static void fillDescription(ParameterSetDescription& desc);
0070     static void prevalidate(ConfigurationDescriptions&);
0071 
0072     /// Advances the source to the next item
0073     ItemType nextItemType();
0074 
0075     /// Read next event
0076     void readEvent(EventPrincipal& ep, StreamContext&);
0077 
0078     /// Read a specific event
0079     bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
0080 
0081     /// Read next luminosity block Auxiliary
0082     std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
0083 
0084     /// Read next run Auxiliary
0085     std::shared_ptr<RunAuxiliary> readRunAuxiliary();
0086 
0087     /// Read next run (new run)
0088     void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
0089 
0090     /// Read next run (same as a prior run)
0091     void readAndMergeRun(RunPrincipal& rp);
0092 
0093     /// Read next luminosity block (new lumi)
0094     void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
0095 
0096     /// Read next luminosity block (same as a prior lumi)
0097     void readAndMergeLumi(LuminosityBlockPrincipal& lbp);
0098 
0099     /// Fill the ProcessBlockHelper with info for the current file
0100     void fillProcessBlockHelper();
0101 
0102     /// Next process block, return false if there is none, sets the processName in the principal
0103     bool nextProcessBlock(ProcessBlockPrincipal&);
0104 
0105     /// Read next process block
0106     void readProcessBlock(ProcessBlockPrincipal&);
0107 
0108     /// Read next file
0109     std::shared_ptr<FileBlock> readFile();
0110 
0111     /// close current file
0112     void closeFile(FileBlock*, bool cleaningUpAfterException);
0113 
0114     /// Skip the number of events specified.
0115     /// Offset may be negative.
0116     void skipEvents(int offset);
0117     /// NOTE(review): presumably positions the source at the given event and reports success -- confirm in the implementation.
0118     bool goToEvent(EventID const& eventID);
0119 
0120     /// Begin again at the first event
0121     void rewind();
0122 
0123     /// Set the run number (forwards to the private virtual setRun())
0124     void setRunNumber(RunNumber_t r) { setRun(r); }
0125 
0126     /// Set the luminosity block number (forwards to the private virtual setLumi())
0127     void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) { setLumi(lb); }
0128 
0129     /// issue an event report
0130     void issueReports(EventID const& eventID, StreamID streamID);
0131 
0132     /// Register any produced products
0133     virtual void registerProducts();
0134 
0135     /// Accessors for product registry
0136     std::shared_ptr<ProductRegistry const> productRegistry() const { return get_underlying_safe(productRegistry_); }
0137     std::shared_ptr<ProductRegistry>& productRegistry() { return get_underlying_safe(productRegistry_); }
0138 
0139     /// Accessors for process history registry.
0140     ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
0141     ProcessHistoryRegistry& processHistoryRegistry() { return *processHistoryRegistry_; }
0142 
0143     /// Accessors for branchIDListHelper
0144     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0145       return get_underlying_safe(branchIDListHelper_);
0146     }
0147     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0148 
0149     /// Accessors for processBlockHelper
0150     std::shared_ptr<ProcessBlockHelper const> processBlockHelper() const {
0151       return get_underlying_safe(processBlockHelper_);
0152     }
0153     std::shared_ptr<ProcessBlockHelper>& processBlockHelper() { return get_underlying_safe(processBlockHelper_); }
0154 
0155     /// Accessors for thinnedAssociationsHelper
0156     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0157       return get_underlying_safe(thinnedAssociationsHelper_);
0158     }
0159     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0160       return get_underlying_safe(thinnedAssociationsHelper_);
0161     }
0162 
0163     /// Reset the remaining number of events/lumis to the maximum number.
0164     void repeat() {
0165       remainingEvents_ = maxEvents_;
0166       remainingLumis_ = maxLumis_;
0167     }
0168 
0169     /// Returns nullptr if no resource shared between the Source and a DelayedReader
0170     std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
0171 
0172     /// Accessor for maximum number of events to be read.
0173     /// -1 is used for unlimited.
0174     int maxEvents() const { return maxEvents_; }
0175 
0176     /// Accessor for remaining number of events to be read.
0177     /// -1 is used for unlimited.
0178     int remainingEvents() const { return remainingEvents_; }
0179 
0180     /// Accessor for maximum number of lumis to be read.
0181     /// -1 is used for unlimited.
0182     int maxLuminosityBlocks() const { return maxLumis_; }
0183 
0184     /// Accessor for remaining number of lumis to be read.
0185     /// -1 is used for unlimited.
0186     int remainingLuminosityBlocks() const { return remainingLumis_; }
0187 
0188     /// Accessor for 'module' description.
0189     ModuleDescription const& moduleDescription() const { return moduleDescription_; }
0190 
0191     /// Accessor for Process Configuration
0192     ProcessConfiguration const& processConfiguration() const { return moduleDescription().processConfiguration(); }
0193 
0194     /// Accessor for global process identifier
0195     std::string const& processGUID() const { return processGUID_; }
0196 
0197     /// Called by framework at beginning of job
0198     void doBeginJob();
0199 
0200     /// Called by framework at end of job
0201     void doEndJob();
0202 
0203     /// Called by framework at beginning of lumi block
0204     virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
0205 
0206     /// Called by framework at beginning of run
0207     virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
0208 
0209     /// Accessor for the current time, as seen by the input source
0210     Timestamp const& timestamp() const { return time_; }
0211 
0212     /// Accessor for the reduced process history ID of the current run.
0213     /// This is the ID of the input process history which does not include
0214     /// the current process.
0215     ProcessHistoryID const& reducedProcessHistoryID() const;
0216 
0217     /// Accessor for current run number
0218     RunNumber_t run() const;
0219 
0220     /// Accessor for current luminosity block number
0221     LuminosityBlockNumber_t luminosityBlock() const;
0222 
0223     /// RunsLumisAndEvents (default), RunsAndLumis, or Runs.
0224     ProcessingMode processingMode() const { return processingMode_; }
0225 
0226     /// Accessor for Activity Registry
0227     std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
0228 
0229     /// Called by the framework to merge or insert run in principal cache.
0230     std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
0231 
0232     /// Called by the framework to merge or insert lumi in principal cache.
0233     std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
0234     /// NOTE(review): the next three presumably forward to the private virtual randomAccess_()/forwardState_()/reverseState_() -- confirm in the implementation.
0235     bool randomAccess() const;
0236     ProcessingController::ForwardState forwardState() const;
0237     ProcessingController::ReverseState reverseState() const;
0238     // Sentry helper classes. NOTE(review): their constructors/destructors are defined elsewhere; presumably they emit paired pre/post source signals -- confirm in the implementation.
0239     class EventSourceSentry {
0240     public:
0241       EventSourceSentry(InputSource const& source, StreamContext& sc);
0242       ~EventSourceSentry();
0243 
0244       EventSourceSentry(EventSourceSentry const&) = delete;             // Disallow copying and moving
0245       EventSourceSentry& operator=(EventSourceSentry const&) = delete;  // Disallow copying and moving
0246 
0247     private:
0248       InputSource const& source_;
0249       StreamContext& sc_;
0250     };
0251 
0252     class LumiSourceSentry {
0253     public:
0254       LumiSourceSentry(InputSource const& source, LuminosityBlockIndex id);
0255       ~LumiSourceSentry();
0256 
0257       LumiSourceSentry(LumiSourceSentry const&) = delete;             // Disallow copying and moving
0258       LumiSourceSentry& operator=(LumiSourceSentry const&) = delete;  // Disallow copying and moving
0259 
0260     private:
0261       InputSource const& source_;
0262       LuminosityBlockIndex index_;
0263     };
0264 
0265     class RunSourceSentry {
0266     public:
0267       RunSourceSentry(InputSource const& source, RunIndex id);
0268       ~RunSourceSentry();
0269 
0270       RunSourceSentry(RunSourceSentry const&) = delete;             // Disallow copying and moving
0271       RunSourceSentry& operator=(RunSourceSentry const&) = delete;  // Disallow copying and moving
0272 
0273     private:
0274       InputSource const& source_;
0275       RunIndex index_;
0276     };
0277 
0278     class ProcessBlockSourceSentry {
0279     public:
0280       ProcessBlockSourceSentry(InputSource const&, std::string const&);
0281       ~ProcessBlockSourceSentry();
0282 
0283       ProcessBlockSourceSentry(ProcessBlockSourceSentry const&) = delete;             // Disallow copying and moving
0284       ProcessBlockSourceSentry& operator=(ProcessBlockSourceSentry const&) = delete;  // Disallow copying and moving
0285 
0286     private:
0287       InputSource const& source_;
0288       std::string const& processName_;  // reference member: the referenced string must stay alive while the sentry uses it
0289     };
0290 
0291     class FileOpenSentry {
0292     public:
0293       typedef signalslot::Signal<void(std::string const&)> Sig;
0294       explicit FileOpenSentry(InputSource const& source, std::string const& lfn);
0295       ~FileOpenSentry();
0296 
0297       FileOpenSentry(FileOpenSentry const&) = delete;             // Disallow copying and moving
0298       FileOpenSentry& operator=(FileOpenSentry const&) = delete;  // Disallow copying and moving
0299 
0300     private:
0301       Sig& post_;
0302       std::string const& lfn_;  // reference member: the referenced string must stay alive while the sentry uses it
0303     };
0304 
0305     class FileCloseSentry {
0306     public:
0307       typedef signalslot::Signal<void(std::string const&)> Sig;
0308       explicit FileCloseSentry(InputSource const& source, std::string const& lfn);
0309       ~FileCloseSentry();
0310 
0311       FileCloseSentry(FileCloseSentry const&) = delete;             // Disallow copying and moving
0312       FileCloseSentry& operator=(FileCloseSentry const&) = delete;  // Disallow copying and moving
0313 
0314     private:
0315       Sig& post_;
0316       std::string const& lfn_;  // reference member: the referenced string must stay alive while the sentry uses it
0317     };
0318     // Public signal members. NOTE(review): presumably emitted just before/after an event is read from the source -- confirm where they are emitted.
0319     signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preEventReadFromSourceSignal_;
0320     signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postEventReadFromSourceSignal_;
0321 
0322   protected:
0323     virtual void skip(int offset);
0324 
0325     /// To set the current time, as seen by the input source
0326     void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
0327 
0328     ProductRegistry& productRegistryUpdate() { return *productRegistry_; }  // non-const access for derived classes
0329     ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
0330     ItemType state() const { return state_; }
0331     void setRunAuxiliary(RunAuxiliary* rp) {  // takes ownership of rp (it is handed to a shared_ptr)
0332       runAuxiliary_.reset(rp);
0333       newRun_ = newLumi_ = true;  // a new run auxiliary also marks the lumi as new
0334     }
0335     void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary* lbp) {  // takes ownership of lbp (it is handed to a shared_ptr)
0336       lumiAuxiliary_.reset(lbp);
0337       newLumi_ = true;
0338     }
0339     void resetRunAuxiliary(bool isNewRun = true) const {  // const, but modifies mutable members
0340       runAuxiliary_.reset();
0341       newRun_ = newLumi_ = isNewRun;  // both flags receive the same value
0342     }
0343     void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {  // const, but modifies mutable members
0344       lumiAuxiliary_.reset();
0345       newLumi_ = isNewLumi;
0346     }
0347     void reset() const {  // drops both auxiliaries, sets newRun_/newLumi_ to true and the state to IsInvalid
0348       resetLuminosityBlockAuxiliary();
0349       resetRunAuxiliary();
0350       state_ = IsInvalid;
0351     }
0352     bool newRun() const { return newRun_; }
0353     void setNewRun() { newRun_ = true; }
0354     void resetNewRun() { newRun_ = false; }
0355     bool newLumi() const { return newLumi_; }
0356     void setNewLumi() { newLumi_ = true; }
0357     void resetNewLumi() { newLumi_ = false; }
0358     bool eventCached() const { return eventCached_; }
0359     /// Set / clear the eventCached_ flag that eventCached() reports.
0360     void setEventCached() { eventCached_ = true; }
0361     void resetEventCached() { eventCached_ = false; }
0362 
0363     /// Called by inheriting classes when running multicore when the receiver has told them to
0364     /// skip some events.
0365     void decreaseRemainingEventsBy(int iSkipped);
0366 
0367     /// beginJob is protected (rather than private) because that makes template programming easier
0368     virtual void beginJob();
0369 
0370   private:
0371     bool eventLimitReached() const { return remainingEvents_ == 0; }  // only a count of exactly 0 is a reached limit
0372     bool lumiLimitReached() const {  // true when no lumis remain or the ramp-down time has been exceeded
0373       if (remainingLumis_ == 0) {
0374         return true;
0375       }
0376       if (maxSecondsUntilRampdown_ <= 0) {  // non-positive value: no time-based limit is applied
0377         return false;
0378       }
0379       auto end = std::chrono::steady_clock::now();
0380       auto elapsed = end - processingStart_;
0381       if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {  // compares whole elapsed seconds, strictly greater
0382         return true;
0383       }
0384       return false;
0385     }
0386     bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
0387     virtual ItemType getNextItemType() = 0;  // pure virtual: must be implemented by every concrete source
0388     ItemType nextItemType_();
0389     virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;  // pure virtual
0390     virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;  // pure virtual
0391     virtual void fillProcessBlockHelper_();
0392     virtual bool nextProcessBlock_(ProcessBlockPrincipal&);
0393     virtual void readProcessBlock_(ProcessBlockPrincipal&);
0394     virtual void readRun_(RunPrincipal& runPrincipal);
0395     virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
0396     virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;  // pure virtual
0397     virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
0398     virtual std::shared_ptr<FileBlock> readFile_();
0399     virtual void closeFile_() {}  // default implementation does nothing
0400     virtual bool goToEvent_(EventID const& eventID);
0401     virtual void setRun(RunNumber_t r);
0402     virtual void setLumi(LuminosityBlockNumber_t lb);
0403     virtual void rewind_();
0404     virtual void endJob();
0405     virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
0406 
0407     virtual bool randomAccess_() const;
0408     virtual ProcessingController::ForwardState forwardState_() const;
0409     virtual ProcessingController::ReverseState reverseState_() const;
0410 
0411   private:  // data members
0412     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0413     int maxEvents_;
0414     int remainingEvents_;
0415     int maxLumis_;
0416     int remainingLumis_;
0417     int readCount_;
0418     int maxSecondsUntilRampdown_;  // values <= 0 disable the time check in lumiLimitReached()
0419     std::chrono::time_point<std::chrono::steady_clock> processingStart_;  // reference point for the elapsed-time check in lumiLimitReached()
0420     ProcessingMode processingMode_;
0421     ModuleDescription const moduleDescription_;
0422     edm::propagate_const<std::shared_ptr<ProductRegistry>> productRegistry_;
0423     edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
0424     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0425     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0426     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0427     std::string processGUID_;
0428     Timestamp time_;
0429     mutable bool newRun_;   // mutable: written by the const reset helpers above
0430     mutable bool newLumi_;  // mutable: written by the const reset helpers above
0431     bool eventCached_;
0432     mutable ItemType state_;  // mutable: written by the const reset()
0433     mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;  // mutable: cleared by the const resetRunAuxiliary()
0434     mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;  // mutable: cleared by the const resetLuminosityBlockAuxiliary()
0435     std::string statusFileName_;
0436 
0437     unsigned int numberOfEventsBeforeBigSkip_;
0438   };
0439 }  // namespace edm
0440 
0441 #endif