File indexing completed on 2025-03-13 02:31:49
0001 #ifndef FWCore_Framework_InputSource_h
0002 #define FWCore_Framework_InputSource_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0021 #include "DataFormats/Provenance/interface/RunID.h"
0022 #include "DataFormats/Provenance/interface/Timestamp.h"
0023 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0024 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0025 #include "FWCore/Framework/interface/Frameworkfwd.h"
0026 #include "FWCore/Framework/interface/ProcessingController.h"
0027 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0028 #include "FWCore/Utilities/interface/RunIndex.h"
0029 #include "FWCore/Utilities/interface/Signal.h"
0030 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0031 #include "FWCore/Utilities/interface/StreamID.h"
0032
0033 #include <memory>
0034 #include <string>
0035 #include <chrono>
0036 #include <mutex>
0037
0038 namespace edm {
0039 class ActivityRegistry;
0040 class BranchIDListHelper;
0041 class ConfigurationDescriptions;
0042 class HistoryAppender;
0043 class ParameterSet;
0044 class ParameterSetDescription;
0045 class ProcessContext;
0046 class ProcessHistoryRegistry;
0047 class StreamContext;
0048 class ModuleCallingContext;
0049 class SharedResourcesAcquirer;
0050 class ThinnedAssociationsHelper;
0051
0052 class InputSource {
0053 public:
0054 enum class ItemType : char { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };
0055 enum class ItemPosition : char { Invalid, LastItemToBeMerged, NotLastItemToBeMerged };
0056
0057 class ItemTypeInfo {
0058 public:
0059 constexpr ItemTypeInfo(ItemType type = ItemType::IsInvalid, ItemPosition position = ItemPosition::Invalid)
0060 : type_(type), position_(position) {}
0061 ItemType itemType() const { return type_; }
0062 ItemPosition itemPosition() const { return position_; }
0063
0064
0065
0066
0067
0068
0069 operator ItemType() const { return type_; }
0070
0071 private:
0072 ItemType type_;
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082 ItemPosition position_;
0083 };
0084
0085 enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };
0086
0087
0088 explicit InputSource(ParameterSet const&, InputSourceDescription const&);
0089
0090
0091 virtual ~InputSource() noexcept(false);
0092
0093 InputSource(InputSource const&) = delete;
0094 InputSource& operator=(InputSource const&) = delete;
0095
0096 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0097 static const std::string& baseType();
0098 static void fillDescription(ParameterSetDescription& desc);
0099 static void prevalidate(ConfigurationDescriptions&);
0100
0101
0102 ItemTypeInfo nextItemType();
0103
0104
0105 void readEvent(EventPrincipal& ep, StreamContext&);
0106
0107
0108 bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
0109
0110
0111 std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
0112
0113
0114 std::shared_ptr<RunAuxiliary> readRunAuxiliary();
0115
0116
0117 void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
0118
0119
0120 void readAndMergeRun(RunPrincipal& rp);
0121
0122
0123 void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
0124
0125
0126 void readAndMergeLumi(LuminosityBlockPrincipal& lbp);
0127
0128
0129 void fillProcessBlockHelper();
0130
0131
0132 bool nextProcessBlock(ProcessBlockPrincipal&);
0133
0134
0135 void readProcessBlock(ProcessBlockPrincipal&);
0136
0137
0138 std::shared_ptr<FileBlock> readFile();
0139
0140
0141 void closeFile(FileBlock*, bool cleaningUpAfterException);
0142
0143
0144
0145 void skipEvents(int offset);
0146
0147 bool goToEvent(EventID const& eventID);
0148
0149
0150 void rewind();
0151
0152
0153 void setRunNumber(RunNumber_t r) { setRun(r); }
0154
0155
0156 void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) { setLumi(lb); }
0157
0158
0159 void issueReports(EventID const& eventID, StreamID streamID);
0160
0161
0162 virtual void registerProducts();
0163
0164
0165 ProductRegistry const& productRegistry() const { return productRegistry_; }
0166
0167
0168 ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
0169 ProcessHistoryRegistry& processHistoryRegistry() { return *processHistoryRegistry_; }
0170
0171
0172 std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0173 return get_underlying_safe(branchIDListHelper_);
0174 }
0175 std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0176
0177
0178 std::shared_ptr<ProcessBlockHelper const> processBlockHelper() const {
0179 return get_underlying_safe(processBlockHelper_);
0180 }
0181 std::shared_ptr<ProcessBlockHelper>& processBlockHelper() { return get_underlying_safe(processBlockHelper_); }
0182
0183
0184 std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0185 return get_underlying_safe(thinnedAssociationsHelper_);
0186 }
0187 std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0188 return get_underlying_safe(thinnedAssociationsHelper_);
0189 }
0190
0191
0192 void repeat() {
0193 remainingEvents_ = maxEvents_;
0194 remainingLumis_ = maxLumis_;
0195 }
0196
0197
0198 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
0199
0200
0201
0202 int maxEvents() const { return maxEvents_; }
0203
0204
0205
0206 int remainingEvents() const { return remainingEvents_; }
0207
0208
0209
0210 int maxLuminosityBlocks() const { return maxLumis_; }
0211
0212
0213
0214 int remainingLuminosityBlocks() const { return remainingLumis_; }
0215
0216
0217 ModuleDescription const& moduleDescription() const { return moduleDescription_; }
0218
0219
0220 ProcessConfiguration const& processConfiguration() const { return moduleDescription().processConfiguration(); }
0221
0222
0223 std::string const& processGUID() const { return processGUID_; }
0224
0225
0226 void doBeginJob(edm::ProductRegistry const&);
0227
0228
0229 void doEndJob();
0230
0231
0232 virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
0233
0234
0235 virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
0236
0237
0238 Timestamp const& timestamp() const { return time_; }
0239
0240
0241
0242
0243 ProcessHistoryID const& reducedProcessHistoryID() const;
0244
0245
0246 RunNumber_t run() const;
0247
0248
0249 LuminosityBlockNumber_t luminosityBlock() const;
0250
0251
0252 ProcessingMode processingMode() const { return processingMode_; }
0253
0254
0255 std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
0256
0257
0258 std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
0259
0260
0261 std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
0262
0263 bool randomAccess() const;
0264 ProcessingController::ForwardState forwardState() const;
0265 ProcessingController::ReverseState reverseState() const;
0266
0267 class EventSourceSentry {
0268 public:
0269 EventSourceSentry(InputSource const& source, StreamContext& sc);
0270 ~EventSourceSentry();
0271
0272 EventSourceSentry(EventSourceSentry const&) = delete;
0273 EventSourceSentry& operator=(EventSourceSentry const&) = delete;
0274
0275 private:
0276 InputSource const& source_;
0277 StreamContext& sc_;
0278 };
0279
0280 class LumiSourceSentry {
0281 public:
0282 LumiSourceSentry(InputSource const& source, LuminosityBlockIndex id);
0283 ~LumiSourceSentry();
0284
0285 LumiSourceSentry(LumiSourceSentry const&) = delete;
0286 LumiSourceSentry& operator=(LumiSourceSentry const&) = delete;
0287
0288 private:
0289 InputSource const& source_;
0290 LuminosityBlockIndex index_;
0291 };
0292
0293 class RunSourceSentry {
0294 public:
0295 RunSourceSentry(InputSource const& source, RunIndex id);
0296 ~RunSourceSentry();
0297
0298 RunSourceSentry(RunSourceSentry const&) = delete;
0299 RunSourceSentry& operator=(RunSourceSentry const&) = delete;
0300
0301 private:
0302 InputSource const& source_;
0303 RunIndex index_;
0304 };
0305
0306 class ProcessBlockSourceSentry {
0307 public:
0308 ProcessBlockSourceSentry(InputSource const&, std::string const&);
0309 ~ProcessBlockSourceSentry();
0310
0311 ProcessBlockSourceSentry(ProcessBlockSourceSentry const&) = delete;
0312 ProcessBlockSourceSentry& operator=(ProcessBlockSourceSentry const&) = delete;
0313
0314 private:
0315 InputSource const& source_;
0316 std::string const& processName_;
0317 };
0318
0319 class FileOpenSentry {
0320 public:
0321 typedef signalslot::Signal<void(std::string const&)> Sig;
0322 explicit FileOpenSentry(InputSource const& source, std::string const& lfn);
0323 ~FileOpenSentry();
0324
0325 FileOpenSentry(FileOpenSentry const&) = delete;
0326 FileOpenSentry& operator=(FileOpenSentry const&) = delete;
0327
0328 private:
0329 Sig& post_;
0330 std::string const& lfn_;
0331 };
0332
0333 class FileCloseSentry {
0334 public:
0335 typedef signalslot::Signal<void(std::string const&)> Sig;
0336 explicit FileCloseSentry(InputSource const& source, std::string const& lfn);
0337 ~FileCloseSentry();
0338
0339 FileCloseSentry(FileCloseSentry const&) = delete;
0340 FileCloseSentry& operator=(FileCloseSentry const&) = delete;
0341
0342 private:
0343 Sig& post_;
0344 std::string const& lfn_;
0345 };
0346
0347 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preEventReadFromSourceSignal_;
0348 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postEventReadFromSourceSignal_;
0349
0350 protected:
0351 virtual void skip(int offset);
0352
0353
0354 void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
0355
0356 ProductRegistry& productRegistryUpdate() { return productRegistry_; }
0357 ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
0358 ItemTypeInfo state() const { return state_; }
0359 void setRunAuxiliary(RunAuxiliary* rp) {
0360 runAuxiliary_.reset(rp);
0361 newRun_ = newLumi_ = true;
0362 }
0363 void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary* lbp) {
0364 lumiAuxiliary_.reset(lbp);
0365 newLumi_ = true;
0366 }
0367 void resetRunAuxiliary(bool isNewRun = true) const {
0368 runAuxiliary_.reset();
0369 newRun_ = newLumi_ = isNewRun;
0370 }
0371 void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {
0372 lumiAuxiliary_.reset();
0373 newLumi_ = isNewLumi;
0374 }
0375 void reset() const {
0376 resetLuminosityBlockAuxiliary();
0377 resetRunAuxiliary();
0378 state_ = ItemTypeInfo();
0379 }
0380 bool newRun() const { return newRun_; }
0381 void setNewRun() { newRun_ = true; }
0382 void resetNewRun() { newRun_ = false; }
0383 bool newLumi() const { return newLumi_; }
0384 void setNewLumi() { newLumi_ = true; }
0385 void resetNewLumi() { newLumi_ = false; }
0386 bool eventCached() const { return eventCached_; }
0387
0388 void setEventCached() { eventCached_ = true; }
0389 void resetEventCached() { eventCached_ = false; }
0390
0391
0392
0393 void decreaseRemainingEventsBy(int iSkipped);
0394
0395
0396 virtual void beginJob(edm::ProductRegistry const&);
0397
0398 private:
0399 bool eventLimitReached() const { return remainingEvents_ == 0; }
0400 bool lumiLimitReached() const {
0401 if (remainingLumis_ == 0) {
0402 return true;
0403 }
0404 if (maxSecondsUntilRampdown_ <= 0) {
0405 return false;
0406 }
0407 auto end = std::chrono::steady_clock::now();
0408 auto elapsed = end - processingStart_;
0409 if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {
0410 return true;
0411 }
0412 return false;
0413 }
0414 bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
0415 virtual ItemTypeInfo getNextItemType() = 0;
0416 ItemTypeInfo nextItemType_();
0417 virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;
0418 virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;
0419 virtual void fillProcessBlockHelper_();
0420 virtual bool nextProcessBlock_(ProcessBlockPrincipal&);
0421 virtual void readProcessBlock_(ProcessBlockPrincipal&);
0422 virtual void readRun_(RunPrincipal& runPrincipal);
0423 virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
0424 virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;
0425 virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
0426 virtual std::shared_ptr<FileBlock> readFile_();
0427 virtual void closeFile_() {}
0428 virtual bool goToEvent_(EventID const& eventID);
0429 virtual void setRun(RunNumber_t r);
0430 virtual void setLumi(LuminosityBlockNumber_t lb);
0431 virtual void rewind_();
0432 virtual void endJob();
0433 virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
0434
0435 virtual bool randomAccess_() const;
0436 virtual ProcessingController::ForwardState forwardState_() const;
0437 virtual ProcessingController::ReverseState reverseState_() const;
0438
0439 private:
0440 std::shared_ptr<ActivityRegistry> actReg_;
0441 int maxEvents_;
0442 int remainingEvents_;
0443 int maxLumis_;
0444 int remainingLumis_;
0445 int readCount_;
0446 int maxSecondsUntilRampdown_;
0447 std::chrono::time_point<std::chrono::steady_clock> processingStart_;
0448 ProcessingMode processingMode_;
0449 ModuleDescription const moduleDescription_;
0450 ProductRegistry productRegistry_;
0451 edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
0452 edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0453 edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0454 edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0455 std::string processGUID_;
0456 Timestamp time_;
0457 mutable bool newRun_;
0458 mutable bool newLumi_;
0459 bool eventCached_;
0460 mutable ItemTypeInfo state_;
0461 mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;
0462 mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;
0463 std::string statusFileName_;
0464
0465 unsigned int numberOfEventsBeforeBigSkip_;
0466 };
0467 }
0468
0469 #endif