File indexing completed on 2022-08-04 22:45:02
0001 #ifndef FWCore_Framework_InputSource_h
0002 #define FWCore_Framework_InputSource_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0021 #include "DataFormats/Provenance/interface/RunID.h"
0022 #include "DataFormats/Provenance/interface/Timestamp.h"
0023 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0024 #include "FWCore/Framework/interface/Frameworkfwd.h"
0025 #include "FWCore/Framework/interface/ProcessingController.h"
0026 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0027 #include "FWCore/Utilities/interface/RunIndex.h"
0028 #include "FWCore/Utilities/interface/Signal.h"
0029 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0030 #include "FWCore/Utilities/interface/StreamID.h"
0031
0032 #include <memory>
0033 #include <string>
0034 #include <chrono>
0035 #include <mutex>
0036
0037 namespace edm {
0038 class ActivityRegistry;  // Forward declarations: below, these types appear only in parameter types, reference/pointer types, or as smart-pointer/signal template arguments.
0039 class BranchIDListHelper;
0040 class ConfigurationDescriptions;
0041 class HistoryAppender;
0042 class ParameterSet;
0043 class ParameterSetDescription;
0044 class ProcessContext;
0045 class ProcessHistoryRegistry;
0046 class ProductRegistry;
0047 class StreamContext;
0048 class ModuleCallingContext;
0049 class SharedResourcesAcquirer;
0050 class ThinnedAssociationsHelper;
0051
0052 class InputSource {  // Abstract (pure virtuals in the private section), non-copyable base class for input sources.
0053 public:
0054 enum ItemType { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };  // result type of nextItemType() / state()
0055 // Result type of processingMode(); the enumerators name which levels are covered.
0056 enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };
0057
0058 /// Constructor; defined outside this header.
0059 explicit InputSource(ParameterSet const&, InputSourceDescription const&);
0060
0061 /// Virtual destructor. Declared noexcept(false), i.e. it is permitted to throw.
0062 virtual ~InputSource() noexcept(false);
0063 // Copy construction and copy assignment are disabled.
0064 InputSource(InputSource const&) = delete;
0065 InputSource& operator=(InputSource const&) = delete;
0066 // Static configuration/description hooks; all defined outside this header.
0067 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0068 static const std::string& baseType();
0069 static void fillDescription(ParameterSetDescription& desc);
0070 static void prevalidate(ConfigurationDescriptions&);
0071
0072 /// Returns the type of the next item. NOTE(review): presumably advances the source via getNextItemType() — confirm in the implementation file.
0073 ItemType nextItemType();
0074
0075 /// Reads an event into ep. NOTE(review): presumably the next sequential event — confirm.
0076 void readEvent(EventPrincipal& ep, StreamContext&);
0077
0078 /// Reads the event identified by the given EventID into ep. NOTE(review): the bool presumably reports whether the event was found — confirm.
0079 bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
0080
0081 /// Returns a shared LuminosityBlockAuxiliary. Paired by name with the pure virtual readLuminosityBlockAuxiliary_().
0082 std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
0083
0084 /// Returns a shared RunAuxiliary. Paired by name with the pure virtual readRunAuxiliary_().
0085 std::shared_ptr<RunAuxiliary> readRunAuxiliary();
0086
0087 /// Reads a run into runPrincipal; the caller supplies the HistoryAppender.
0088 void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
0089
0090 /// Reads a run and merges it into rp (per the name; defined outside this header).
0091 void readAndMergeRun(RunPrincipal& rp);
0092
0093 /// Reads a luminosity block into lumiPrincipal; the caller supplies the HistoryAppender.
0094 void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
0095
0096 /// Reads a luminosity block and merges it into lbp (per the name; defined outside this header).
0097 void readAndMergeLumi(LuminosityBlockPrincipal& lbp);
0098
0099 /// Non-virtual entry point paired by name with the virtual fillProcessBlockHelper_().
0100 void fillProcessBlockHelper();
0101
0102 /// Non-virtual entry point paired by name with nextProcessBlock_(). NOTE(review): presumably returns false when there is no further process block — confirm.
0103 bool nextProcessBlock(ProcessBlockPrincipal&);
0104
0105 /// Non-virtual entry point paired by name with the virtual readProcessBlock_().
0106 void readProcessBlock(ProcessBlockPrincipal&);
0107
0108 /// Returns a shared FileBlock. Paired by name with the virtual readFile_().
0109 std::shared_ptr<FileBlock> readFile();
0110
0111 /// Closes a file. cleaningUpAfterException lets the caller signal that the close happens during exception cleanup.
0112 void closeFile(FileBlock*, bool cleaningUpAfterException);
0113
0114 /// Skips events by the given signed offset.
0115 /// NOTE(review): the int parameter admits negative offsets; whether they are supported is decided in the implementation — confirm.
0116 void skipEvents(int offset);
0117 // Positions the source at eventID. NOTE(review): the bool presumably reports success — confirm.
0118 bool goToEvent(EventID const& eventID);
0119
0120 /// Non-virtual entry point paired by name with the virtual rewind_().
0121 void rewind();
0122
0123 /// Sets the run number by forwarding to the virtual setRun().
0124 void setRunNumber(RunNumber_t r) { setRun(r); }
0125
0126 /// Sets the luminosity block number by forwarding to the virtual setLumi().
0127 void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) { setLumi(lb); }
0128
0129 /// Issues reports for the given event and stream (defined outside this header).
0130 void issueReports(EventID const& eventID, StreamID streamID);
0131
0132 /// Virtual hook for product registration; derived sources may override it.
0133 virtual void registerProducts();
0134
0135 /// Read-only access to the product registry (shared pointer to const).
0136 std::shared_ptr<ProductRegistry const> productRegistry() const { return get_underlying_safe(productRegistry_); }
0137
0138 /// Access to the process history registry by reference, in const and non-const flavours.
0139 ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
0140 ProcessHistoryRegistry& processHistoryRegistry() { return *processHistoryRegistry_; }
0141
0142 /// Access to the BranchIDListHelper: the const overload yields a pointer-to-const, the non-const overload a reference to the stored shared_ptr.
0143 std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0144 return get_underlying_safe(branchIDListHelper_);
0145 }
0146 std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0147
0148 /// Access to the ProcessBlockHelper: the const overload yields a pointer-to-const, the non-const overload a reference to the stored shared_ptr.
0149 std::shared_ptr<ProcessBlockHelper const> processBlockHelper() const {
0150 return get_underlying_safe(processBlockHelper_);
0151 }
0152 std::shared_ptr<ProcessBlockHelper>& processBlockHelper() { return get_underlying_safe(processBlockHelper_); }
0153
0154 /// Access to the ThinnedAssociationsHelper: the const overload yields a pointer-to-const, the non-const overload a reference to the stored shared_ptr.
0155 std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0156 return get_underlying_safe(thinnedAssociationsHelper_);
0157 }
0158 std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0159 return get_underlying_safe(thinnedAssociationsHelper_);
0160 }
0161
0162 /// Resets the remaining event and lumi counters to their configured maxima.
0163 void repeat() {
0164 remainingEvents_ = maxEvents_;
0165 remainingLumis_ = maxLumis_;
0166 }
0167
0168 /// Returns a (SharedResourcesAcquirer*, recursive_mutex*) pair. NOTE(review): the pointers are presumably null when nothing is shared with a delayed reader — confirm.
0169 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
0170
0171 /// Replaces the product registry held by this source with iOther.
0172 void switchTo(std::shared_ptr<ProductRegistry> iOther) { productRegistry_ = iOther; }
0173
0174 /// Configured maximum number of events.
0175 /// NOTE(review): a negative value presumably means "unlimited" — confirm against the constructor.
0176 int maxEvents() const { return maxEvents_; }
0177
0178 /// Number of events still allowed. eventLimitReached() fires only when this is exactly 0,
0179 /// so a negative value never trips that check.
0180 int remainingEvents() const { return remainingEvents_; }
0181
0182 /// Configured maximum number of luminosity blocks.
0183 /// NOTE(review): a negative value presumably means "unlimited" — confirm against the constructor.
0184 int maxLuminosityBlocks() const { return maxLumis_; }
0185
0186 /// Number of luminosity blocks still allowed. lumiLimitReached() compares this against exactly 0,
0187 /// so a negative value never trips that comparison.
0188 int remainingLuminosityBlocks() const { return remainingLumis_; }
0189
0190 /// The ModuleDescription stored for this source.
0191 ModuleDescription const& moduleDescription() const { return moduleDescription_; }
0192
0193 /// The ProcessConfiguration obtained from the stored ModuleDescription.
0194 ProcessConfiguration const& processConfiguration() const { return moduleDescription().processConfiguration(); }
0195
0196 /// The stored process GUID string.
0197 std::string const& processGUID() const { return processGUID_; }
0198
0199 /// Non-virtual begin-of-job entry point; paired by name with the protected virtual beginJob().
0200 void doBeginJob();
0201
0202 /// Non-virtual end-of-job entry point; paired by name with the private virtual endJob().
0203 void doEndJob();
0204
0205 /// Virtual begin-of-lumi hook; receives the lumi principal and a ProcessContext pointer.
0206 virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
0207
0208 /// Virtual begin-of-run hook; receives the run principal and a ProcessContext pointer.
0209 virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
0210
0211 /// The timestamp last stored with setTimestamp().
0212 Timestamp const& timestamp() const { return time_; }
0213
0214 /// Returns a reduced ProcessHistoryID (defined outside this header).
0215 /// NOTE(review): presumably that of the current run's input process history —
0216 /// confirm in the implementation file.
0217 ProcessHistoryID const& reducedProcessHistoryID() const;
0218
0219 /// Run number. NOTE(review): presumably taken from the cached RunAuxiliary — confirm.
0220 RunNumber_t run() const;
0221
0222 /// Luminosity block number. NOTE(review): presumably taken from the cached LuminosityBlockAuxiliary — confirm.
0223 LuminosityBlockNumber_t luminosityBlock() const;
0224
0225 /// The stored processing mode (Runs, RunsAndLumis or RunsLumisAndEvents).
0226 ProcessingMode processingMode() const { return processingMode_; }
0227
0228 /// Returns a copy of the shared_ptr to the ActivityRegistry.
0229 std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
0230
0231 /// Returns a copy of the shared_ptr to the cached RunAuxiliary (empty after resetRunAuxiliary()).
0232 std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
0233
0234 /// Returns a copy of the shared_ptr to the cached LuminosityBlockAuxiliary (empty after resetLuminosityBlockAuxiliary()).
0235 std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
0236 // Non-virtual queries paired by name with the private virtuals randomAccess_(), forwardState_() and reverseState_().
0237 bool randomAccess() const;
0238 ProcessingController::ForwardState forwardState() const;
0239 ProcessingController::ReverseState reverseState() const;
0240 // Non-copyable scoped helper bound to a source and a StreamContext; ctor/dtor are defined outside this header. NOTE(review): presumably brackets an event read with pre/post signals — confirm.
0241 class EventSourceSentry {
0242 public:
0243 EventSourceSentry(InputSource const& source, StreamContext& sc);
0244 ~EventSourceSentry();
0245
0246 EventSourceSentry(EventSourceSentry const&) = delete;
0247 EventSourceSentry& operator=(EventSourceSentry const&) = delete;
0248
0249 private:
0250 InputSource const& source_;  // reference member: whatever source it is bound to must outlive the sentry
0251 StreamContext& sc_;
0252 };
0253 // Non-copyable scoped helper holding a source reference and a LuminosityBlockIndex by value; ctor/dtor are defined outside this header.
0254 class LumiSourceSentry {
0255 public:
0256 LumiSourceSentry(InputSource const& source, LuminosityBlockIndex id);
0257 ~LumiSourceSentry();
0258
0259 LumiSourceSentry(LumiSourceSentry const&) = delete;
0260 LumiSourceSentry& operator=(LumiSourceSentry const&) = delete;
0261
0262 private:
0263 InputSource const& source_;
0264 LuminosityBlockIndex index_;
0265 };
0266 // Non-copyable scoped helper holding a source reference and a RunIndex by value; ctor/dtor are defined outside this header.
0267 class RunSourceSentry {
0268 public:
0269 RunSourceSentry(InputSource const& source, RunIndex id);
0270 ~RunSourceSentry();
0271
0272 RunSourceSentry(RunSourceSentry const&) = delete;
0273 RunSourceSentry& operator=(RunSourceSentry const&) = delete;
0274
0275 private:
0276 InputSource const& source_;
0277 RunIndex index_;
0278 };
0279 // Non-copyable scoped helper holding a source reference and a process-name string reference; ctor/dtor are defined outside this header.
0280 class ProcessBlockSourceSentry {
0281 public:
0282 ProcessBlockSourceSentry(InputSource const&, std::string const&);
0283 ~ProcessBlockSourceSentry();
0284
0285 ProcessBlockSourceSentry(ProcessBlockSourceSentry const&) = delete;
0286 ProcessBlockSourceSentry& operator=(ProcessBlockSourceSentry const&) = delete;
0287
0288 private:
0289 InputSource const& source_;
0290 std::string const& processName_;  // reference member: whatever string it is bound to must outlive the sentry
0291 };
0292 // Non-copyable scoped helper holding a reference to a string-taking signal and to a string (lfn); ctor/dtor are defined outside this header.
0293 class FileOpenSentry {
0294 public:
0295 typedef signalslot::Signal<void(std::string const&)> Sig;
0296 explicit FileOpenSentry(InputSource const& source, std::string const& lfn);
0297 ~FileOpenSentry();
0298
0299 FileOpenSentry(FileOpenSentry const&) = delete;
0300 FileOpenSentry& operator=(FileOpenSentry const&) = delete;
0301
0302 private:
0303 Sig& post_;  // NOTE(review): presumably emitted from the destructor — confirm
0304 std::string const& lfn_;  // reference member: whatever string it is bound to must outlive the sentry
0305 };
0306 // Non-copyable scoped helper holding a reference to a string-taking signal and to a string (lfn); ctor/dtor are defined outside this header.
0307 class FileCloseSentry {
0308 public:
0309 typedef signalslot::Signal<void(std::string const&)> Sig;
0310 explicit FileCloseSentry(InputSource const& source, std::string const& lfn);
0311 ~FileCloseSentry();
0312
0313 FileCloseSentry(FileCloseSentry const&) = delete;
0314 FileCloseSentry& operator=(FileCloseSentry const&) = delete;
0315
0316 private:
0317 Sig& post_;  // NOTE(review): presumably emitted from the destructor — confirm
0318 std::string const& lfn_;  // reference member: whatever string it is bound to must outlive the sentry
0319 };
0320 // Public signal members taking (StreamContext, ModuleCallingContext). NOTE(review): presumably emitted before/after an event is read from the source — confirm.
0321 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preEventReadFromSourceSignal_;
0322 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postEventReadFromSourceSignal_;
0323
0324 protected:
0325 virtual void skip(int offset);  // overridable hook; NOTE(review): presumably invoked by skipEvents() — confirm
0326
0327 /// Stores theTime as the value returned by timestamp().
0328 void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
0329 // Mutable access for derived classes to the registries and the cached state.
0330 ProductRegistry& productRegistryUpdate() { return *productRegistry_; }
0331 ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
0332 ItemType state() const { return state_; }
0333 void setRunAuxiliary(RunAuxiliary* rp) {  // the shared_ptr takes ownership of the raw pointer rp
0334 runAuxiliary_.reset(rp);
0335 newRun_ = newLumi_ = true;  // a new run is also flagged as a new lumi
0336 }
0337 void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary* lbp) {  // the shared_ptr takes ownership of the raw pointer lbp
0338 lumiAuxiliary_.reset(lbp);
0339 newLumi_ = true;
0340 }
0341 void resetRunAuxiliary(bool isNewRun = true) const {  // const, but modifies mutable members
0342 runAuxiliary_.reset();
0343 newRun_ = newLumi_ = isNewRun;  // both flags take the same value
0344 }
0345 void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {  // const, but modifies mutable members
0346 lumiAuxiliary_.reset();
0347 newLumi_ = isNewLumi;
0348 }
0349 void reset() const {  // drops both cached auxiliaries (flags end up true) and sets state_ to IsInvalid
0350 resetLuminosityBlockAuxiliary();
0351 resetRunAuxiliary();
0352 state_ = IsInvalid;
0353 }
0354 bool newRun() const { return newRun_; }
0355 void setNewRun() { newRun_ = true; }
0356 void resetNewRun() { newRun_ = false; }
0357 bool newLumi() const { return newLumi_; }
0358 void setNewLumi() { newLumi_ = true; }
0359 void resetNewLumi() { newLumi_ = false; }
0360 bool eventCached() const { return eventCached_; }
0361 // Set / clear the eventCached_ flag.
0362 void setEventCached() { eventCached_ = true; }
0363 void resetEventCached() { eventCached_ = false; }
0364
0365 /// Defined outside this header. NOTE(review): presumably lowers remainingEvents_ by iSkipped
0366 /// when events are skipped — confirm.
0367 void decreaseRemainingEventsBy(int iSkipped);
0368
0369 /// Protected virtual begin-of-job hook for derived sources.
0370 virtual void beginJob();
0371
0372 private:
0373 bool eventLimitReached() const { return remainingEvents_ == 0; }  // exact-zero test: negative counts never match
0374 bool lumiLimitReached() const {  // true when the lumi budget is used up or the rampdown time has elapsed
0375 if (remainingLumis_ == 0) {
0376 return true;
0377 }
0378 if (maxSecondsUntilRampdown_ <= 0) {  // zero or negative disables the time-based limit
0379 return false;
0380 }
0381 auto end = std::chrono::steady_clock::now();
0382 auto elapsed = end - processingStart_;
0383 if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {  // whole seconds, strictly greater than the limit
0384 return true;
0385 }
0386 return false;
0387 }
0388 bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
0389 virtual ItemType getNextItemType() = 0;  // pure virtual: every concrete source must implement it
0390 ItemType nextItemType_();
0391 virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;  // pure virtual
0392 virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;  // pure virtual
0393 virtual void fillProcessBlockHelper_();
0394 virtual bool nextProcessBlock_(ProcessBlockPrincipal&);
0395 virtual void readProcessBlock_(ProcessBlockPrincipal&);
0396 virtual void readRun_(RunPrincipal& runPrincipal);
0397 virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
0398 virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;  // pure virtual
0399 virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
0400 virtual std::shared_ptr<FileBlock> readFile_();
0401 virtual void closeFile_() {}  // default implementation does nothing
0402 virtual bool goToEvent_(EventID const& eventID);
0403 virtual void setRun(RunNumber_t r);  // called by setRunNumber()
0404 virtual void setLumi(LuminosityBlockNumber_t lb);  // called by setLuminosityBlockNumber_t()
0405 virtual void rewind_();
0406 virtual void endJob();
0407 virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
0408
0409 virtual bool randomAccess_() const;
0410 virtual ProcessingController::ForwardState forwardState_() const;
0411 virtual ProcessingController::ReverseState reverseState_() const;
0412
0413 private:
0414 std::shared_ptr<ActivityRegistry> actReg_;
0415 int maxEvents_;
0416 int remainingEvents_;  // reset to maxEvents_ by repeat()
0417 int maxLumis_;
0418 int remainingLumis_;  // reset to maxLumis_ by repeat()
0419 int readCount_;
0420 int maxSecondsUntilRampdown_;  // <= 0 disables the time-based check in lumiLimitReached()
0421 std::chrono::time_point<std::chrono::steady_clock> processingStart_;  // reference point for the elapsed time in lumiLimitReached()
0422 ProcessingMode processingMode_;
0423 ModuleDescription const moduleDescription_;
0424 edm::propagate_const<std::shared_ptr<ProductRegistry>> productRegistry_;
0425 edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
0426 edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0427 edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0428 edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0429 std::string processGUID_;
0430 Timestamp time_;
0431 mutable bool newRun_;  // mutable: written by the const resetRunAuxiliary()
0432 mutable bool newLumi_;  // mutable: written by the const reset*Auxiliary() helpers
0433 bool eventCached_;
0434 mutable ItemType state_;  // mutable: written by the const reset()
0435 mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;  // mutable: cleared by the const resetRunAuxiliary()
0436 mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;  // mutable: cleared by the const resetLuminosityBlockAuxiliary()
0437 std::string statusFileName_;
0438
0439 unsigned int numberOfEventsBeforeBigSkip_;
0440 };
0441 }
0442
0443 #endif