File indexing completed on 2021-12-10 02:50:55
0001 #ifndef FWCore_Framework_InputSource_h
0002 #define FWCore_Framework_InputSource_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0021 #include "DataFormats/Provenance/interface/RunID.h"
0022 #include "DataFormats/Provenance/interface/Timestamp.h"
0023 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0024 #include "FWCore/Framework/interface/Frameworkfwd.h"
0025 #include "FWCore/Framework/interface/ProcessingController.h"
0026 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0027 #include "FWCore/Utilities/interface/RunIndex.h"
0028 #include "FWCore/Utilities/interface/Signal.h"
0029 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0030 #include "FWCore/Utilities/interface/StreamID.h"
0031
0032 #include <memory>
0033 #include <string>
0034 #include <chrono>
0035 #include <mutex>
0036
0037 namespace edm {
0038 class ActivityRegistry;
0039 class BranchIDListHelper;
0040 class ConfigurationDescriptions;
0041 class HistoryAppender;
0042 class ParameterSet;
0043 class ParameterSetDescription;
0044 class ProcessContext;
0045 class ProcessHistoryRegistry;
0046 class ProductRegistry;
0047 class StreamContext;
0048 class ModuleCallingContext;
0049 class SharedResourcesAcquirer;
0050 class ThinnedAssociationsHelper;
0051
0052 class InputSource {  // Abstract interface for input sources: declares pure virtual hooks and is non-copyable.
0053 public:
0054 enum ItemType { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };  // Returned by nextItemType() and state().
0055 // Processing granularity stored in processingMode_ and reported by processingMode().
0056 enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };
0057
0058 // Construct from a ParameterSet and an InputSourceDescription (explicit: no implicit conversion).
0059 explicit InputSource(ParameterSet const&, InputSourceDescription const&);
0060
0061 // Virtual destructor; declared noexcept(false), so an exception may propagate out of it.
0062 virtual ~InputSource() noexcept(false);
0063 // Copy construction and copy assignment are disabled.
0064 InputSource(InputSource const&) = delete;
0065 InputSource& operator=(InputSource const&) = delete;
0066 // Static configuration-description helpers; their definitions are not in this header.
0067 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0068 static const std::string& baseType();
0069 static void fillDescription(ParameterSetDescription& desc);
0070 static void prevalidate(ConfigurationDescriptions&);
0071
0072 // Returns the type of the next item. NOTE(review): presumably driven by the private virtual getNextItemType() - confirm in the .cc.
0073 ItemType nextItemType();
0074
0075 // Reads an event into ep for the given StreamContext.
0076 void readEvent(EventPrincipal& ep, StreamContext&);
0077
0078 // Reads the event with the given EventID into ep. NOTE(review): the bool presumably reports whether it was found - confirm.
0079 bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
0080
0081 // Returns a shared LuminosityBlockAuxiliary; the subclass hook is readLuminosityBlockAuxiliary_().
0082 std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
0083
0084 // Returns a shared RunAuxiliary; the subclass hook is readRunAuxiliary_().
0085 std::shared_ptr<RunAuxiliary> readRunAuxiliary();
0086
0087 // Reads run data into runPrincipal; the caller supplies the HistoryAppender.
0088 void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
0089
0090 // NOTE(review): by its name, reads a run and merges it into the existing rp - confirm in the .cc.
0091 void readAndMergeRun(RunPrincipal& rp);
0092
0093 // Reads luminosity block data into lumiPrincipal; the caller supplies the HistoryAppender.
0094 void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
0095
0096 // NOTE(review): by its name, reads a lumi and merges it into the existing lbp - confirm in the .cc.
0097 void readAndMergeLumi(LuminosityBlockPrincipal& lbp);
0098
0099 // Public counterpart of the virtual fillProcessBlockHelper_().
0100 void fillProcessBlockHelper();
0101
0102 // Public counterpart of the virtual nextProcessBlock_(). NOTE(review): meaning of the bool is not visible here.
0103 bool nextProcessBlock(ProcessBlockPrincipal&);
0104
0105 // Public counterpart of the virtual readProcessBlock_().
0106 void readProcessBlock(ProcessBlockPrincipal&);
0107
0108 // Returns a shared FileBlock; the subclass hook is readFile_().
0109 std::shared_ptr<FileBlock> readFile();
0110
0111 // Closes a file; the flag tells the implementation whether this happens while cleaning up after an exception.
0112 void closeFile(FileBlock*, bool cleaningUpAfterException);
0113
0114
0115 // Skips events by a signed offset; the subclass hook is the protected virtual skip().
0116 void skipEvents(int offset);
0117 // Positions on the given event; the subclass hook is goToEvent_(). NOTE(review): bool presumably means success.
0118 bool goToEvent(EventID const& eventID);
0119
0120 // Public counterpart of the virtual rewind_().
0121 void rewind();
0122
0123 // Forwards the run number to the private virtual setRun().
0124 void setRunNumber(RunNumber_t r) { setRun(r); }
0125
0126 // Forwards the lumi number to the private virtual setLumi(). The "_t" in the method name is part of the public interface.
0127 void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) { setLumi(lb); }
0128
0129 // NOTE(review): presumably emits periodic progress reports for the event/stream - definition not in this header.
0130 void issueReports(EventID const& eventID, StreamID streamID);
0131
0132 // Virtual so that subclasses can customize product registration.
0133 virtual void registerProducts();
0134
0135 // Product registry accessors: the const overload yields a pointer-to-const copy, the non-const one a reference to the stored shared_ptr.
0136 std::shared_ptr<ProductRegistry const> productRegistry() const { return get_underlying_safe(productRegistry_); }
0137 std::shared_ptr<ProductRegistry>& productRegistry() { return get_underlying_safe(productRegistry_); }
0138
0139 // Process history registry accessors; both dereference processHistoryRegistry_ without a null check.
0140 ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
0141 ProcessHistoryRegistry& processHistoryRegistry() { return *processHistoryRegistry_; }
0142
0143 // BranchIDListHelper accessors (const overload: pointer-to-const; non-const: reference to the stored shared_ptr).
0144 std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0145 return get_underlying_safe(branchIDListHelper_);
0146 }
0147 std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0148
0149 // ProcessBlockHelper accessors (same const/non-const split as above).
0150 std::shared_ptr<ProcessBlockHelper const> processBlockHelper() const {
0151 return get_underlying_safe(processBlockHelper_);
0152 }
0153 std::shared_ptr<ProcessBlockHelper>& processBlockHelper() { return get_underlying_safe(processBlockHelper_); }
0154
0155 // ThinnedAssociationsHelper accessors (same const/non-const split as above).
0156 std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0157 return get_underlying_safe(thinnedAssociationsHelper_);
0158 }
0159 std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0160 return get_underlying_safe(thinnedAssociationsHelper_);
0161 }
0162
0163 // Restores the remaining event and lumi counters to their configured maxima.
0164 void repeat() {
0165 remainingEvents_ = maxEvents_;
0166 remainingLumis_ = maxLumis_;
0167 }
0168
0169 // Returns a (SharedResourcesAcquirer*, recursive_mutex*) pair; the subclass hook is resourceSharedWithDelayedReader_().
0170 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
0171
0172 // Configured event limit (maxEvents_).
0173 // NOTE(review): the limit checks below only test for == 0, so a negative value presumably means "no limit" - confirm.
0174 int maxEvents() const { return maxEvents_; }
0175
0176
0177 // Events still allowed before eventLimitReached() becomes true (remainingEvents_).
0178 int remainingEvents() const { return remainingEvents_; }
0179
0180
0181 // Configured luminosity block limit (maxLumis_).
0182 int maxLuminosityBlocks() const { return maxLumis_; }
0183
0184
0185 // Luminosity blocks still allowed before the count part of lumiLimitReached() trips (remainingLumis_).
0186 int remainingLuminosityBlocks() const { return remainingLumis_; }
0187
0188 // Description of this source module.
0189 ModuleDescription const& moduleDescription() const { return moduleDescription_; }
0190
0191 // Process configuration, obtained through moduleDescription().
0192 ProcessConfiguration const& processConfiguration() const { return moduleDescription().processConfiguration(); }
0193
0194 // Returns the stored process GUID string.
0195 std::string const& processGUID() const { return processGUID_; }
0196
0197 // Non-virtual begin-of-job entry point. NOTE(review): presumably invokes the protected virtual beginJob() - confirm.
0198 void doBeginJob();
0199
0200 // Non-virtual end-of-job entry point. NOTE(review): presumably invokes the private virtual endJob() - confirm.
0201 void doEndJob();
0202
0203 // Begin-of-lumi notification; virtual, so subclasses may override. The ProcessContext is passed by pointer.
0204 virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
0205
0206 // Begin-of-run notification; virtual, so subclasses may override. The ProcessContext is passed by pointer.
0207 virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
0208
0209 // Returns the timestamp last stored with setTimestamp() (time_).
0210 Timestamp const& timestamp() const { return time_; }
0211
0212
0213
0214 // The following three accessors are defined out of line; their data source is not visible in this header.
0215 ProcessHistoryID const& reducedProcessHistoryID() const;
0216
0217
0218 RunNumber_t run() const;
0219
0220
0221 LuminosityBlockNumber_t luminosityBlock() const;
0222
0223 // Returns the stored processing mode.
0224 ProcessingMode processingMode() const { return processingMode_; }
0225
0226 // Returns a copy of the shared_ptr to the ActivityRegistry.
0227 std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
0228
0229 // Returns the cached run auxiliary; empty after resetRunAuxiliary().
0230 std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
0231
0232 // Returns the cached lumi auxiliary; empty after resetLuminosityBlockAuxiliary().
0233 std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
0234 // Navigation capability queries; the subclass hooks are randomAccess_(), forwardState_() and reverseState_().
0235 bool randomAccess() const;
0236 ProcessingController::ForwardState forwardState() const;
0237 ProcessingController::ReverseState reverseState() const;
0238 // Scoped helper bound to a source and a StreamContext. NOTE(review): presumably signals pre/post event read in ctor/dtor - confirm.
0239 class EventSourceSentry {
0240 public:
0241 EventSourceSentry(InputSource const& source, StreamContext& sc);
0242 ~EventSourceSentry();
0243
0244 EventSourceSentry(EventSourceSentry const&) = delete;
0245 EventSourceSentry& operator=(EventSourceSentry const&) = delete;
0246
0247 private:
0248 InputSource const& source_;  // held by reference: the source must outlive the sentry
0249 StreamContext& sc_;  // held by reference: the context must outlive the sentry
0250 };
0251 // Scoped helper bound to a source and a LuminosityBlockIndex (the index is stored by value).
0252 class LumiSourceSentry {
0253 public:
0254 LumiSourceSentry(InputSource const& source, LuminosityBlockIndex id);
0255 ~LumiSourceSentry();
0256
0257 LumiSourceSentry(LumiSourceSentry const&) = delete;
0258 LumiSourceSentry& operator=(LumiSourceSentry const&) = delete;
0259
0260 private:
0261 InputSource const& source_;
0262 LuminosityBlockIndex index_;
0263 };
0264 // Scoped helper bound to a source and a RunIndex (the index is stored by value).
0265 class RunSourceSentry {
0266 public:
0267 RunSourceSentry(InputSource const& source, RunIndex id);
0268 ~RunSourceSentry();
0269
0270 RunSourceSentry(RunSourceSentry const&) = delete;
0271 RunSourceSentry& operator=(RunSourceSentry const&) = delete;
0272
0273 private:
0274 InputSource const& source_;
0275 RunIndex index_;
0276 };
0277 // Scoped helper bound to a source and a process name.
0278 class ProcessBlockSourceSentry {
0279 public:
0280 ProcessBlockSourceSentry(InputSource const&, std::string const&);
0281 ~ProcessBlockSourceSentry();
0282
0283 ProcessBlockSourceSentry(ProcessBlockSourceSentry const&) = delete;
0284 ProcessBlockSourceSentry& operator=(ProcessBlockSourceSentry const&) = delete;
0285
0286 private:
0287 InputSource const& source_;
0288 std::string const& processName_;  // reference member: the referenced string must outlive the sentry
0289 };
0290 // Scoped helper for opening a file; keeps a reference to a Signal taking the file name string.
0291 class FileOpenSentry {
0292 public:
0293 typedef signalslot::Signal<void(std::string const&)> Sig;
0294 explicit FileOpenSentry(InputSource const& source, std::string const& lfn);
0295 ~FileOpenSentry();
0296
0297 FileOpenSentry(FileOpenSentry const&) = delete;
0298 FileOpenSentry& operator=(FileOpenSentry const&) = delete;
0299
0300 private:
0301 Sig& post_;  // NOTE(review): by its name, presumably emitted from the destructor - confirm
0302 std::string const& lfn_;  // reference member: the referenced string must outlive the sentry
0303 };
0304 // Scoped helper for closing a file; same layout as FileOpenSentry.
0305 class FileCloseSentry {
0306 public:
0307 typedef signalslot::Signal<void(std::string const&)> Sig;
0308 explicit FileCloseSentry(InputSource const& source, std::string const& lfn);
0309 ~FileCloseSentry();
0310
0311 FileCloseSentry(FileCloseSentry const&) = delete;
0312 FileCloseSentry& operator=(FileCloseSentry const&) = delete;
0313
0314 private:
0315 Sig& post_;  // NOTE(review): by its name, presumably emitted from the destructor - confirm
0316 std::string const& lfn_;  // reference member: the referenced string must outlive the sentry
0317 };
0318 // Public signals taking (StreamContext, ModuleCallingContext), named for before/after reading an event from the source.
0319 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preEventReadFromSourceSignal_;
0320 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postEventReadFromSourceSignal_;
0321
0322 protected:
0323 virtual void skip(int offset);  // subclass hook for skipping events by a signed offset
0324
0325 // Stores the timestamp later returned by timestamp().
0326 void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
0327 // Mutable access for subclasses; both dereference their pointer without a null check.
0328 ProductRegistry& productRegistryUpdate() { return *productRegistry_; }
0329 ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
0330 ItemType state() const { return state_; }  // current cached item type
0331 void setRunAuxiliary(RunAuxiliary* rp) {  // takes ownership of rp via shared_ptr::reset
0332 runAuxiliary_.reset(rp);
0333 newRun_ = newLumi_ = true;  // a new run also flags a new lumi
0334 }
0335 void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary* lbp) {  // takes ownership of lbp via shared_ptr::reset
0336 lumiAuxiliary_.reset(lbp);
0337 newLumi_ = true;
0338 }
0339 void resetRunAuxiliary(bool isNewRun = true) const {  // const, but changes mutable members
0340 runAuxiliary_.reset();
0341 newRun_ = newLumi_ = isNewRun;  // both flags take the same value
0342 }
0343 void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {  // const, but changes mutable members
0344 lumiAuxiliary_.reset();
0345 newLumi_ = isNewLumi;
0346 }
0347 void reset() const {  // drops both cached auxiliaries, leaves newRun_/newLumi_ true, and invalidates state_
0348 resetLuminosityBlockAuxiliary();
0349 resetRunAuxiliary();
0350 state_ = IsInvalid;
0351 }
0352 bool newRun() const { return newRun_; }
0353 void setNewRun() { newRun_ = true; }
0354 void resetNewRun() { newRun_ = false; }
0355 bool newLumi() const { return newLumi_; }
0356 void setNewLumi() { newLumi_ = true; }
0357 void resetNewLumi() { newLumi_ = false; }
0358 bool eventCached() const { return eventCached_; }
0359 // Set and clear the eventCached_ flag.
0360 void setEventCached() { eventCached_ = true; }
0361 void resetEventCached() { eventCached_ = false; }
0362
0363
0364 // NOTE(review): presumably lowers remainingEvents_ by iSkipped - definition not in this header.
0365 void decreaseRemainingEventsBy(int iSkipped);
0366
0367 // Subclass begin-of-job hook.
0368 virtual void beginJob();
0369
0370 private:
0371 bool eventLimitReached() const { return remainingEvents_ == 0; }  // true only at exactly zero
0372 bool lumiLimitReached() const {  // true when the lumi count is exhausted or the time budget is exceeded
0373 if (remainingLumis_ == 0) {
0374 return true;
0375 }
0376 if (maxSecondsUntilRampdown_ <= 0) {  // a non-positive value disables the time-based check
0377 return false;
0378 }
0379 auto end = std::chrono::steady_clock::now();
0380 auto elapsed = end - processingStart_;
0381 if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {  // whole seconds, strictly greater
0382 return true;
0383 }
0384 return false;
0385 }
0386 bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
0387 virtual ItemType getNextItemType() = 0;  // pure virtual: every concrete source must provide it
0388 ItemType nextItemType_();
0389 virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;  // pure virtual
0390 virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;  // pure virtual
0391 virtual void fillProcessBlockHelper_();
0392 virtual bool nextProcessBlock_(ProcessBlockPrincipal&);
0393 virtual void readProcessBlock_(ProcessBlockPrincipal&);
0394 virtual void readRun_(RunPrincipal& runPrincipal);
0395 virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
0396 virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;  // pure virtual
0397 virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
0398 virtual std::shared_ptr<FileBlock> readFile_();
0399 virtual void closeFile_() {}  // default implementation does nothing
0400 virtual bool goToEvent_(EventID const& eventID);
0401 virtual void setRun(RunNumber_t r);
0402 virtual void setLumi(LuminosityBlockNumber_t lb);
0403 virtual void rewind_();
0404 virtual void endJob();
0405 virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
0406
0407 virtual bool randomAccess_() const;
0408 virtual ProcessingController::ForwardState forwardState_() const;
0409 virtual ProcessingController::ReverseState reverseState_() const;
0410
0411 private:
0412 std::shared_ptr<ActivityRegistry> actReg_;
0413 int maxEvents_;  // value restored into remainingEvents_ by repeat()
0414 int remainingEvents_;  // eventLimitReached() is true when this is exactly 0
0415 int maxLumis_;  // value restored into remainingLumis_ by repeat()
0416 int remainingLumis_;  // lumiLimitReached() is true when this is exactly 0
0417 int readCount_;
0418 int maxSecondsUntilRampdown_;  // time budget in seconds; <= 0 disables the time check in lumiLimitReached()
0419 std::chrono::time_point<std::chrono::steady_clock> processingStart_;  // reference point for the elapsed-time check
0420 ProcessingMode processingMode_;
0421 ModuleDescription const moduleDescription_;
0422 edm::propagate_const<std::shared_ptr<ProductRegistry>> productRegistry_;
0423 edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
0424 edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0425 edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0426 edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0427 std::string processGUID_;
0428 Timestamp time_;
0429 mutable bool newRun_;  // mutable: changed by the const reset helpers
0430 mutable bool newLumi_;  // mutable: changed by the const reset helpers
0431 bool eventCached_;
0432 mutable ItemType state_;  // mutable: set to IsInvalid by the const reset()
0433 mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;  // mutable: cleared by the const resetRunAuxiliary()
0434 mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;  // mutable: cleared by the const resetLuminosityBlockAuxiliary()
0435 std::string statusFileName_;
0436
0437 unsigned int numberOfEventsBeforeBigSkip_;
0438 };
0439 }
0440
0441 #endif