File indexing completed on 2024-04-06 12:12:03
0001 #ifndef FWCore_Framework_InputSource_h
0002 #define FWCore_Framework_InputSource_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0021 #include "DataFormats/Provenance/interface/RunID.h"
0022 #include "DataFormats/Provenance/interface/Timestamp.h"
0023 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0024 #include "FWCore/Framework/interface/Frameworkfwd.h"
0025 #include "FWCore/Framework/interface/ProcessingController.h"
0026 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0027 #include "FWCore/Utilities/interface/RunIndex.h"
0028 #include "FWCore/Utilities/interface/Signal.h"
0029 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0030 #include "FWCore/Utilities/interface/StreamID.h"
0031
0032 #include <memory>
0033 #include <string>
0034 #include <chrono>
0035 #include <mutex>
0036
0037 namespace edm {
0038 class ActivityRegistry;
0039 class BranchIDListHelper;
0040 class ConfigurationDescriptions;
0041 class HistoryAppender;
0042 class ParameterSet;
0043 class ParameterSetDescription;
0044 class ProcessContext;
0045 class ProcessHistoryRegistry;
0046 class ProductRegistry;
0047 class StreamContext;
0048 class ModuleCallingContext;
0049 class SharedResourcesAcquirer;
0050 class ThinnedAssociationsHelper;
0051
0052 class InputSource {
0053 public:
0054 enum class ItemType : char { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };
0055 enum class ItemPosition : char { Invalid, LastItemToBeMerged, NotLastItemToBeMerged };
0056
0057 class ItemTypeInfo {
0058 public:
0059 constexpr ItemTypeInfo(ItemType type = ItemType::IsInvalid, ItemPosition position = ItemPosition::Invalid)
0060 : type_(type), position_(position) {}
0061 ItemType itemType() const { return type_; }
0062 ItemPosition itemPosition() const { return position_; }
0063
0064
0065
0066
0067
0068
0069 operator ItemType() const { return type_; }
0070
0071 private:
0072 ItemType type_;
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082 ItemPosition position_;
0083 };
0084
0085 enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };
0086
0087
0088 explicit InputSource(ParameterSet const&, InputSourceDescription const&);
0089
0090
0091 virtual ~InputSource() noexcept(false);
0092
0093 InputSource(InputSource const&) = delete;
0094 InputSource& operator=(InputSource const&) = delete;
0095
0096 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0097 static const std::string& baseType();
0098 static void fillDescription(ParameterSetDescription& desc);
0099 static void prevalidate(ConfigurationDescriptions&);
0100
0101
0102 ItemTypeInfo nextItemType();
0103
0104
0105 void readEvent(EventPrincipal& ep, StreamContext&);
0106
0107
0108 bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
0109
0110
0111 std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
0112
0113
0114 std::shared_ptr<RunAuxiliary> readRunAuxiliary();
0115
0116
0117 void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
0118
0119
0120 void readAndMergeRun(RunPrincipal& rp);
0121
0122
0123 void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
0124
0125
0126 void readAndMergeLumi(LuminosityBlockPrincipal& lbp);
0127
0128
0129 void fillProcessBlockHelper();
0130
0131
0132 bool nextProcessBlock(ProcessBlockPrincipal&);
0133
0134
0135 void readProcessBlock(ProcessBlockPrincipal&);
0136
0137
0138 std::shared_ptr<FileBlock> readFile();
0139
0140
0141 void closeFile(FileBlock*, bool cleaningUpAfterException);
0142
0143
0144
0145 void skipEvents(int offset);
0146
0147 bool goToEvent(EventID const& eventID);
0148
0149
0150 void rewind();
0151
0152
0153 void setRunNumber(RunNumber_t r) { setRun(r); }
0154
0155
0156 void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) { setLumi(lb); }
0157
0158
0159 void issueReports(EventID const& eventID, StreamID streamID);
0160
0161
0162 virtual void registerProducts();
0163
0164
0165 std::shared_ptr<ProductRegistry const> productRegistry() const { return get_underlying_safe(productRegistry_); }
0166
0167
0168 ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
0169 ProcessHistoryRegistry& processHistoryRegistry() { return *processHistoryRegistry_; }
0170
0171
0172 std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0173 return get_underlying_safe(branchIDListHelper_);
0174 }
0175 std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0176
0177
0178 std::shared_ptr<ProcessBlockHelper const> processBlockHelper() const {
0179 return get_underlying_safe(processBlockHelper_);
0180 }
0181 std::shared_ptr<ProcessBlockHelper>& processBlockHelper() { return get_underlying_safe(processBlockHelper_); }
0182
0183
0184 std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0185 return get_underlying_safe(thinnedAssociationsHelper_);
0186 }
0187 std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0188 return get_underlying_safe(thinnedAssociationsHelper_);
0189 }
0190
0191
0192 void repeat() {
0193 remainingEvents_ = maxEvents_;
0194 remainingLumis_ = maxLumis_;
0195 }
0196
0197
0198 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
0199
0200
0201 void switchTo(std::shared_ptr<ProductRegistry> iOther) { productRegistry_ = iOther; }
0202
0203
0204
0205 int maxEvents() const { return maxEvents_; }
0206
0207
0208
0209 int remainingEvents() const { return remainingEvents_; }
0210
0211
0212
0213 int maxLuminosityBlocks() const { return maxLumis_; }
0214
0215
0216
0217 int remainingLuminosityBlocks() const { return remainingLumis_; }
0218
0219
0220 ModuleDescription const& moduleDescription() const { return moduleDescription_; }
0221
0222
0223 ProcessConfiguration const& processConfiguration() const { return moduleDescription().processConfiguration(); }
0224
0225
0226 std::string const& processGUID() const { return processGUID_; }
0227
0228
0229 void doBeginJob();
0230
0231
0232 void doEndJob();
0233
0234
0235 virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
0236
0237
0238 virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
0239
0240
0241 Timestamp const& timestamp() const { return time_; }
0242
0243
0244
0245
0246 ProcessHistoryID const& reducedProcessHistoryID() const;
0247
0248
0249 RunNumber_t run() const;
0250
0251
0252 LuminosityBlockNumber_t luminosityBlock() const;
0253
0254
0255 ProcessingMode processingMode() const { return processingMode_; }
0256
0257
0258 std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
0259
0260
0261 std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
0262
0263
0264 std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
0265
0266 bool randomAccess() const;
0267 ProcessingController::ForwardState forwardState() const;
0268 ProcessingController::ReverseState reverseState() const;
0269
0270 class EventSourceSentry {
0271 public:
0272 EventSourceSentry(InputSource const& source, StreamContext& sc);
0273 ~EventSourceSentry();
0274
0275 EventSourceSentry(EventSourceSentry const&) = delete;
0276 EventSourceSentry& operator=(EventSourceSentry const&) = delete;
0277
0278 private:
0279 InputSource const& source_;
0280 StreamContext& sc_;
0281 };
0282
0283 class LumiSourceSentry {
0284 public:
0285 LumiSourceSentry(InputSource const& source, LuminosityBlockIndex id);
0286 ~LumiSourceSentry();
0287
0288 LumiSourceSentry(LumiSourceSentry const&) = delete;
0289 LumiSourceSentry& operator=(LumiSourceSentry const&) = delete;
0290
0291 private:
0292 InputSource const& source_;
0293 LuminosityBlockIndex index_;
0294 };
0295
0296 class RunSourceSentry {
0297 public:
0298 RunSourceSentry(InputSource const& source, RunIndex id);
0299 ~RunSourceSentry();
0300
0301 RunSourceSentry(RunSourceSentry const&) = delete;
0302 RunSourceSentry& operator=(RunSourceSentry const&) = delete;
0303
0304 private:
0305 InputSource const& source_;
0306 RunIndex index_;
0307 };
0308
0309 class ProcessBlockSourceSentry {
0310 public:
0311 ProcessBlockSourceSentry(InputSource const&, std::string const&);
0312 ~ProcessBlockSourceSentry();
0313
0314 ProcessBlockSourceSentry(ProcessBlockSourceSentry const&) = delete;
0315 ProcessBlockSourceSentry& operator=(ProcessBlockSourceSentry const&) = delete;
0316
0317 private:
0318 InputSource const& source_;
0319 std::string const& processName_;
0320 };
0321
0322 class FileOpenSentry {
0323 public:
0324 typedef signalslot::Signal<void(std::string const&)> Sig;
0325 explicit FileOpenSentry(InputSource const& source, std::string const& lfn);
0326 ~FileOpenSentry();
0327
0328 FileOpenSentry(FileOpenSentry const&) = delete;
0329 FileOpenSentry& operator=(FileOpenSentry const&) = delete;
0330
0331 private:
0332 Sig& post_;
0333 std::string const& lfn_;
0334 };
0335
0336 class FileCloseSentry {
0337 public:
0338 typedef signalslot::Signal<void(std::string const&)> Sig;
0339 explicit FileCloseSentry(InputSource const& source, std::string const& lfn);
0340 ~FileCloseSentry();
0341
0342 FileCloseSentry(FileCloseSentry const&) = delete;
0343 FileCloseSentry& operator=(FileCloseSentry const&) = delete;
0344
0345 private:
0346 Sig& post_;
0347 std::string const& lfn_;
0348 };
0349
0350 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preEventReadFromSourceSignal_;
0351 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postEventReadFromSourceSignal_;
0352
0353 protected:
0354 virtual void skip(int offset);
0355
0356
0357 void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
0358
0359 ProductRegistry& productRegistryUpdate() { return *productRegistry_; }
0360 ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
0361 ItemTypeInfo state() const { return state_; }
0362 void setRunAuxiliary(RunAuxiliary* rp) {
0363 runAuxiliary_.reset(rp);
0364 newRun_ = newLumi_ = true;
0365 }
0366 void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary* lbp) {
0367 lumiAuxiliary_.reset(lbp);
0368 newLumi_ = true;
0369 }
0370 void resetRunAuxiliary(bool isNewRun = true) const {
0371 runAuxiliary_.reset();
0372 newRun_ = newLumi_ = isNewRun;
0373 }
0374 void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {
0375 lumiAuxiliary_.reset();
0376 newLumi_ = isNewLumi;
0377 }
0378 void reset() const {
0379 resetLuminosityBlockAuxiliary();
0380 resetRunAuxiliary();
0381 state_ = ItemTypeInfo();
0382 }
0383 bool newRun() const { return newRun_; }
0384 void setNewRun() { newRun_ = true; }
0385 void resetNewRun() { newRun_ = false; }
0386 bool newLumi() const { return newLumi_; }
0387 void setNewLumi() { newLumi_ = true; }
0388 void resetNewLumi() { newLumi_ = false; }
0389 bool eventCached() const { return eventCached_; }
0390
0391 void setEventCached() { eventCached_ = true; }
0392 void resetEventCached() { eventCached_ = false; }
0393
0394
0395
0396 void decreaseRemainingEventsBy(int iSkipped);
0397
0398
0399 virtual void beginJob();
0400
0401 private:
0402 bool eventLimitReached() const { return remainingEvents_ == 0; }
0403 bool lumiLimitReached() const {
0404 if (remainingLumis_ == 0) {
0405 return true;
0406 }
0407 if (maxSecondsUntilRampdown_ <= 0) {
0408 return false;
0409 }
0410 auto end = std::chrono::steady_clock::now();
0411 auto elapsed = end - processingStart_;
0412 if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {
0413 return true;
0414 }
0415 return false;
0416 }
0417 bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
0418 virtual ItemTypeInfo getNextItemType() = 0;
0419 ItemTypeInfo nextItemType_();
0420 virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;
0421 virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;
0422 virtual void fillProcessBlockHelper_();
0423 virtual bool nextProcessBlock_(ProcessBlockPrincipal&);
0424 virtual void readProcessBlock_(ProcessBlockPrincipal&);
0425 virtual void readRun_(RunPrincipal& runPrincipal);
0426 virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
0427 virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;
0428 virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
0429 virtual std::shared_ptr<FileBlock> readFile_();
0430 virtual void closeFile_() {}
0431 virtual bool goToEvent_(EventID const& eventID);
0432 virtual void setRun(RunNumber_t r);
0433 virtual void setLumi(LuminosityBlockNumber_t lb);
0434 virtual void rewind_();
0435 virtual void endJob();
0436 virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
0437
0438 virtual bool randomAccess_() const;
0439 virtual ProcessingController::ForwardState forwardState_() const;
0440 virtual ProcessingController::ReverseState reverseState_() const;
0441
0442 private:
0443 std::shared_ptr<ActivityRegistry> actReg_;
0444 int maxEvents_;
0445 int remainingEvents_;
0446 int maxLumis_;
0447 int remainingLumis_;
0448 int readCount_;
0449 int maxSecondsUntilRampdown_;
0450 std::chrono::time_point<std::chrono::steady_clock> processingStart_;
0451 ProcessingMode processingMode_;
0452 ModuleDescription const moduleDescription_;
0453 edm::propagate_const<std::shared_ptr<ProductRegistry>> productRegistry_;
0454 edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
0455 edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0456 edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0457 edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0458 std::string processGUID_;
0459 Timestamp time_;
0460 mutable bool newRun_;
0461 mutable bool newLumi_;
0462 bool eventCached_;
0463 mutable ItemTypeInfo state_;
0464 mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;
0465 mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;
0466 std::string statusFileName_;
0467
0468 unsigned int numberOfEventsBeforeBigSkip_;
0469 };
0470 }
0471
0472 #endif