File indexing completed on 2023-03-17 11:02:16
0001
0002
0003 #include "FWCore/Framework/interface/InputSource.h"
0004
0005 #include "DataFormats/Provenance/interface/ProcessHistory.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0008 #include "FWCore/Framework/interface/EventPrincipal.h"
0009 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0010 #include "FWCore/Framework/interface/FileBlock.h"
0011 #include "FWCore/Framework/interface/InputSourceDescription.h"
0012 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0013 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0014 #include "FWCore/Framework/interface/RunPrincipal.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0018 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0019 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0020 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0021 #include "FWCore/Utilities/interface/processGUID.h"
0022 #include "FWCore/Utilities/interface/TimeOfDay.h"
0023
0024 #include <cassert>
0025 #include <fstream>
0026 #include <iomanip>
0027
0028 namespace edm {
0029
namespace {
  // Picks the English ordinal suffix printed after count ("1st", "2nd", "3rd", "4th", ...).
  // Values whose tens digit is 1 (11, 12, 13, 111, ...) get "th".
  // The returned reference designates a function-local static string.
  std::string const& suffix(int count) {
    static std::string const kTh("th");
    static std::string const kSt("st");
    static std::string const kNd("nd");
    static std::string const kRd("rd");

    int const ones = count % 10;
    bool const plainTh = (ones == 0 || ones >= 4);
    // count % 100 - ones is the tens digit times ten; the value 10 marks the "teens".
    bool const teen = (count % 100 - ones == 10);
    if (plainTh || teen) {
      return kTh;
    }

    switch (ones) {
      case 1:
        return kSt;
      case 2:
        return kNd;
      default:
        return kRd;
    }
  }
}  // namespace
0046
0047 InputSource::InputSource(ParameterSet const& pset, InputSourceDescription const& desc)
0048 : actReg_(desc.actReg_),
0049 maxEvents_(desc.maxEvents_),
0050 remainingEvents_(maxEvents_),
0051 maxLumis_(desc.maxLumis_),
0052 remainingLumis_(maxLumis_),
0053 readCount_(0),
0054 maxSecondsUntilRampdown_(desc.maxSecondsUntilRampdown_),
0055 processingMode_(RunsLumisAndEvents),
0056 moduleDescription_(desc.moduleDescription_),
0057 productRegistry_(desc.productRegistry_),
0058 processHistoryRegistry_(new ProcessHistoryRegistry),
0059 branchIDListHelper_(desc.branchIDListHelper_),
0060 processBlockHelper_(desc.processBlockHelper_),
0061 thinnedAssociationsHelper_(desc.thinnedAssociationsHelper_),
0062 processGUID_(edm::processGUID().toBinary()),
0063 time_(),
0064 newRun_(true),
0065 newLumi_(true),
0066 eventCached_(false),
0067 state_(IsInvalid),
0068 runAuxiliary_(),
0069 lumiAuxiliary_(),
0070 statusFileName_(),
0071 numberOfEventsBeforeBigSkip_(0) {
0072 if (pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
0073 std::ostringstream statusfilename;
0074 statusfilename << "source_" << getpid();
0075 statusFileName_ = statusfilename.str();
0076 }
0077 if (maxSecondsUntilRampdown_ > 0) {
0078 processingStart_ = std::chrono::steady_clock::now();
0079 }
0080
0081 std::string const defaultMode("RunsLumisAndEvents");
0082 std::string const runMode("Runs");
0083 std::string const runLumiMode("RunsAndLumis");
0084
0085
0086
0087
0088
0089
0090 std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
0091 if (processingMode == runMode) {
0092 processingMode_ = Runs;
0093 } else if (processingMode == runLumiMode) {
0094 processingMode_ = RunsAndLumis;
0095 } else if (processingMode != defaultMode) {
0096 throw Exception(errors::Configuration)
0097 << "InputSource::InputSource()\n"
0098 << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
0099 << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
0100 }
0101 }
0102
0103 InputSource::~InputSource() noexcept(false) {}
0104
0105 void InputSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0106 ParameterSetDescription desc;
0107 desc.setUnknown();
0108 descriptions.addDefault(desc);
0109 }
0110
0111 void InputSource::prevalidate(ConfigurationDescriptions&) {}
0112
0113 static std::string const kBaseType("Source");
0114
0115 std::string const& InputSource::baseType() { return kBaseType; }
0116
0117 void InputSource::fillDescription(ParameterSetDescription& desc) {
0118 std::string defaultString("RunsLumisAndEvents");
0119 desc.addUntracked<std::string>("processingMode", defaultString)
0120 ->setComment(
0121 "'RunsLumisAndEvents': process runs, lumis, and events.\n"
0122 "'RunsAndLumis': process runs and lumis (not events).\n"
0123 "'Runs': process runs (not lumis or events).");
0124 desc.addUntracked<bool>("writeStatusFile", false)
0125 ->setComment("Write a status file. Intended for use by workflow management.");
0126 }
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136 InputSource::ItemType InputSource::nextItemType_() {
0137 ItemType itemType = callWithTryCatchAndPrint<ItemType>([this]() { return getNextItemType(); },
0138 "Calling InputSource::getNextItemType");
0139
0140 if (itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
0141 skipEvents(1);
0142 return nextItemType_();
0143 }
0144 if (itemType == IsLumi && processingMode() == Runs) {
0145
0146 return nextItemType_();
0147 }
0148 return itemType;
0149 }
0150
0151 InputSource::ItemType InputSource::nextItemType() {
0152 ItemType oldState = state_;
0153 if (eventLimitReached()) {
0154
0155 state_ = IsStop;
0156 } else if (lumiLimitReached()) {
0157
0158
0159 if (oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
0160 state_ = IsStop;
0161 } else {
0162 ItemType newState = nextItemType_();
0163 if (newState == IsEvent) {
0164 assert(processingMode() == RunsLumisAndEvents);
0165 state_ = IsEvent;
0166 } else {
0167 state_ = IsStop;
0168 }
0169 }
0170 } else {
0171 ItemType newState = nextItemType_();
0172 if (newState == IsStop) {
0173 state_ = IsStop;
0174 } else if (newState == IsSynchronize) {
0175 state_ = IsSynchronize;
0176 } else if (newState == IsFile || oldState == IsInvalid) {
0177 state_ = IsFile;
0178 } else if (newState == IsRun || oldState == IsFile) {
0179 runAuxiliary_ = readRunAuxiliary();
0180 state_ = IsRun;
0181 } else if (newState == IsLumi || oldState == IsRun) {
0182 assert(processingMode() != Runs);
0183 lumiAuxiliary_ = readLuminosityBlockAuxiliary();
0184 state_ = IsLumi;
0185 } else {
0186 assert(processingMode() == RunsLumisAndEvents);
0187 state_ = IsEvent;
0188 }
0189 }
0190 if (state_ == IsStop) {
0191 lumiAuxiliary_.reset();
0192 runAuxiliary_.reset();
0193 }
0194 return state_;
0195 }
0196
0197 std::shared_ptr<LuminosityBlockAuxiliary> InputSource::readLuminosityBlockAuxiliary() {
0198 return callWithTryCatchAndPrint<std::shared_ptr<LuminosityBlockAuxiliary> >(
0199 [this]() { return readLuminosityBlockAuxiliary_(); }, "Calling InputSource::readLuminosityBlockAuxiliary_");
0200 }
0201
0202 std::shared_ptr<RunAuxiliary> InputSource::readRunAuxiliary() {
0203 return callWithTryCatchAndPrint<std::shared_ptr<RunAuxiliary> >([this]() { return readRunAuxiliary_(); },
0204 "Calling InputSource::readRunAuxiliary_");
0205 }
0206
0207 void InputSource::doBeginJob() { this->beginJob(); }
0208
0209 void InputSource::doEndJob() { endJob(); }
0210
0211 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> InputSource::resourceSharedWithDelayedReader() {
0212 return resourceSharedWithDelayedReader_();
0213 }
0214
0215 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> InputSource::resourceSharedWithDelayedReader_() {
0216 return std::pair<SharedResourcesAcquirer*, std::recursive_mutex*>(nullptr, nullptr);
0217 }
0218
0219 void InputSource::registerProducts() {}
0220
0221
0222 std::shared_ptr<FileBlock> InputSource::readFile() {
0223 assert(state_ == IsFile);
0224 assert(!limitReached());
0225 return callWithTryCatchAndPrint<std::shared_ptr<FileBlock> >([this]() { return readFile_(); },
0226 "Calling InputSource::readFile_");
0227 }
0228
0229 void InputSource::closeFile(FileBlock* fb, bool cleaningUpAfterException) {
0230 if (fb != nullptr)
0231 fb->close();
0232 callWithTryCatchAndPrint<void>(
0233 [this]() { closeFile_(); }, "Calling InputSource::closeFile_", cleaningUpAfterException);
0234 return;
0235 }
0236
0237
0238
0239
0240 std::shared_ptr<FileBlock> InputSource::readFile_() { return std::make_shared<FileBlock>(); }
0241
0242 void InputSource::readRun(RunPrincipal& runPrincipal, HistoryAppender&) {
0243 RunSourceSentry sentry(*this, runPrincipal.index());
0244 callWithTryCatchAndPrint<void>([this, &runPrincipal]() { readRun_(runPrincipal); },
0245 "Calling InputSource::readRun_");
0246 }
0247
0248 void InputSource::readAndMergeRun(RunPrincipal& rp) {
0249 RunSourceSentry sentry(*this, rp.index());
0250 callWithTryCatchAndPrint<void>([this, &rp]() { readRun_(rp); }, "Calling InputSource::readRun_");
0251 }
0252
0253 void InputSource::readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender&) {
0254 LumiSourceSentry sentry(*this, lumiPrincipal.index());
0255 callWithTryCatchAndPrint<void>([this, &lumiPrincipal]() { readLuminosityBlock_(lumiPrincipal); },
0256 "Calling InputSource::readLuminosityBlock_");
0257 if (remainingLumis_ > 0) {
0258 --remainingLumis_;
0259 }
0260 }
0261
0262 void InputSource::readAndMergeLumi(LuminosityBlockPrincipal& lbp) {
0263 LumiSourceSentry sentry(*this, lbp.index());
0264 callWithTryCatchAndPrint<void>([this, &lbp]() { readLuminosityBlock_(lbp); },
0265 "Calling InputSource::readLuminosityBlock_");
0266 if (remainingLumis_ > 0) {
0267 --remainingLumis_;
0268 }
0269 }
0270
0271 void InputSource::fillProcessBlockHelper() { fillProcessBlockHelper_(); }
0272
0273 bool InputSource::nextProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
0274 return nextProcessBlock_(processBlockPrincipal);
0275 }
0276
0277 void InputSource::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
0278 ProcessBlockSourceSentry sentry(*this, processBlockPrincipal.processName());
0279 callWithTryCatchAndPrint<void>([this, &processBlockPrincipal]() { readProcessBlock_(processBlockPrincipal); },
0280 "Calling InputSource::readProcessBlock_");
0281 }
0282
0283 void InputSource::fillProcessBlockHelper_() {}
0284
0285 bool InputSource::nextProcessBlock_(ProcessBlockPrincipal&) { return false; }
0286
0287 void InputSource::readProcessBlock_(ProcessBlockPrincipal&) {}
0288
0289 void InputSource::readRun_(RunPrincipal& runPrincipal) {
0290
0291
0292
0293 runPrincipal.fillRunPrincipal(processHistoryRegistry());
0294 }
0295
0296 void InputSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0297 auto history = processHistoryRegistry().getMapped(lumiPrincipal.aux().processHistoryID());
0298 lumiPrincipal.fillLuminosityBlockPrincipal(history);
0299 }
0300
0301 void InputSource::readEvent(EventPrincipal& ep, StreamContext& streamContext) {
0302 assert(state_ == IsEvent);
0303 assert(!eventLimitReached());
0304 {
0305
0306 EventSourceSentry sentry(*this, streamContext);
0307
0308 callWithTryCatchAndPrint<void>([this, &ep]() { readEvent_(ep); }, "Calling InputSource::readEvent_");
0309 }
0310
0311 if (remainingEvents_ > 0)
0312 --remainingEvents_;
0313 ++readCount_;
0314 setTimestamp(ep.time());
0315 issueReports(ep.id(), ep.streamID());
0316 }
0317
0318 bool InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext& streamContext) {
0319 bool result = false;
0320
0321 if (not limitReached()) {
0322
0323
0324
0325
0326 result = readIt(eventID, ep, streamContext);
0327
0328 if (result) {
0329 if (remainingEvents_ > 0)
0330 --remainingEvents_;
0331 ++readCount_;
0332 issueReports(ep.id(), ep.streamID());
0333 }
0334 }
0335 return result;
0336 }
0337
0338 void InputSource::skipEvents(int offset) {
0339 callWithTryCatchAndPrint<void>([this, &offset]() { skip(offset); }, "Calling InputSource::skip");
0340 }
0341
0342 bool InputSource::goToEvent(EventID const& eventID) {
0343 return callWithTryCatchAndPrint<bool>([this, &eventID]() { return goToEvent_(eventID); },
0344 "Calling InputSource::goToEvent_");
0345 }
0346
0347 void InputSource::rewind() {
0348 state_ = IsInvalid;
0349 remainingEvents_ = maxEvents_;
0350 setNewRun();
0351 setNewLumi();
0352 resetEventCached();
0353 callWithTryCatchAndPrint<void>([this]() { rewind_(); }, "Calling InputSource::rewind_");
0354 }
0355
0356 void InputSource::issueReports(EventID const& eventID, StreamID streamID) {
0357 if (isFwkInfoEnabled()) {
0358 LogFwkVerbatim("FwkReport") << "Begin processing the " << readCount_ << suffix(readCount_) << " record. Run "
0359 << eventID.run() << ", Event " << eventID.event() << ", LumiSection "
0360 << eventID.luminosityBlock() << " on stream " << streamID.value() << " at "
0361 << std::setprecision(3) << TimeOfDay();
0362 }
0363 if (!statusFileName_.empty()) {
0364 std::ofstream statusFile(statusFileName_.c_str());
0365 statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
0366 statusFile.close();
0367 }
0368
0369
0370 }
0371
0372 bool InputSource::readIt(EventID const&, EventPrincipal&, StreamContext&) {
0373 throw Exception(errors::LogicError) << "InputSource::readIt()\n"
0374 << "Random access is not implemented for this type of Input Source\n"
0375 << "Contact a Framework Developer\n";
0376 }
0377
0378 void InputSource::setRun(RunNumber_t) {
0379 throw Exception(errors::LogicError) << "InputSource::setRun()\n"
0380 << "Run number cannot be modified for this type of Input Source\n"
0381 << "Contact a Framework Developer\n";
0382 }
0383
0384 void InputSource::setLumi(LuminosityBlockNumber_t) {
0385 throw Exception(errors::LogicError) << "InputSource::setLumi()\n"
0386 << "Luminosity Block ID cannot be modified for this type of Input Source\n"
0387 << "Contact a Framework Developer\n";
0388 }
0389
0390 void InputSource::skip(int) {
0391 throw Exception(errors::LogicError) << "InputSource::skip()\n"
0392 << "Random access are not implemented for this type of Input Source\n"
0393 << "Contact a Framework Developer\n";
0394 }
0395
0396 bool InputSource::goToEvent_(EventID const&) {
0397 throw Exception(errors::LogicError) << "InputSource::goToEvent_()\n"
0398 << "Random access is not implemented for this type of Input Source\n"
0399 << "Contact a Framework Developer\n";
0400 return true;
0401 }
0402
0403 void InputSource::rewind_() {
0404 throw Exception(errors::LogicError) << "InputSource::rewind()\n"
0405 << "Random access are not implemented for this type of Input Source\n"
0406 << "Contact a Framework Developer\n";
0407 }
0408
0409 void InputSource::decreaseRemainingEventsBy(int iSkipped) {
0410 if (-1 == remainingEvents_) {
0411 return;
0412 }
0413 if (iSkipped < remainingEvents_) {
0414 remainingEvents_ -= iSkipped;
0415 } else {
0416 remainingEvents_ = 0;
0417 }
0418 }
0419
0420 void InputSource::doBeginRun(RunPrincipal& rp, ProcessContext const*) {}
0421
0422 void InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*) {}
0423
0424 bool InputSource::randomAccess() const {
0425 return callWithTryCatchAndPrint<bool>([this]() { return randomAccess_(); }, "Calling InputSource::randomAccess_");
0426 }
0427
0428 ProcessingController::ForwardState InputSource::forwardState() const {
0429 return callWithTryCatchAndPrint<ProcessingController::ForwardState>([this]() { return forwardState_(); },
0430 "Calling InputSource::forwardState_");
0431 }
0432
0433 ProcessingController::ReverseState InputSource::reverseState() const {
0434 return callWithTryCatchAndPrint<ProcessingController::ReverseState>([this]() { return reverseState_(); },
0435 "Calling InputSource::reverseState__");
0436 }
0437
0438 void InputSource::beginJob() {}
0439
0440 void InputSource::endJob() {}
0441
0442 bool InputSource::randomAccess_() const { return false; }
0443
0444 ProcessingController::ForwardState InputSource::forwardState_() const {
0445 return ProcessingController::kUnknownForward;
0446 }
0447
0448 ProcessingController::ReverseState InputSource::reverseState_() const {
0449 return ProcessingController::kUnknownReverse;
0450 }
0451
0452 ProcessHistoryID const& InputSource::reducedProcessHistoryID() const {
0453 assert(runAuxiliary());
0454 return processHistoryRegistry_->reducedProcessHistoryID(runAuxiliary()->processHistoryID());
0455 }
0456
0457 RunNumber_t InputSource::run() const {
0458 assert(runAuxiliary());
0459 return runAuxiliary()->run();
0460 }
0461
0462 LuminosityBlockNumber_t InputSource::luminosityBlock() const {
0463 assert(luminosityBlockAuxiliary());
0464 return luminosityBlockAuxiliary()->luminosityBlock();
0465 }
0466
0467 InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source, StreamContext& sc)
0468 : source_(source), sc_(sc) {
0469 source.actReg()->preSourceSignal_(sc_.streamID());
0470 }
0471
0472 InputSource::EventSourceSentry::~EventSourceSentry() { source_.actReg()->postSourceSignal_(sc_.streamID()); }
0473
0474 InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source, LuminosityBlockIndex index)
0475 : source_(source), index_(index) {
0476 source_.actReg()->preSourceLumiSignal_(index_);
0477 }
0478
0479 InputSource::LumiSourceSentry::~LumiSourceSentry() { source_.actReg()->postSourceLumiSignal_(index_); }
0480
0481 InputSource::RunSourceSentry::RunSourceSentry(InputSource const& source, RunIndex index)
0482 : source_(source), index_(index) {
0483 source_.actReg()->preSourceRunSignal_(index_);
0484 }
0485
0486 InputSource::RunSourceSentry::~RunSourceSentry() { source_.actReg()->postSourceRunSignal_(index_); }
0487
0488 InputSource::ProcessBlockSourceSentry::ProcessBlockSourceSentry(InputSource const& source,
0489 std::string const& processName)
0490 : source_(source), processName_(processName) {
0491 source_.actReg()->preSourceProcessBlockSignal_();
0492 }
0493
0494 InputSource::ProcessBlockSourceSentry::~ProcessBlockSourceSentry() {
0495 source_.actReg()->postSourceProcessBlockSignal_(processName_);
0496 }
0497
0498 InputSource::FileOpenSentry::FileOpenSentry(InputSource const& source, std::string const& lfn)
0499 : post_(source.actReg()->postOpenFileSignal_), lfn_(lfn) {
0500 source.actReg()->preOpenFileSignal_(lfn);
0501 }
0502
0503 InputSource::FileOpenSentry::~FileOpenSentry() { post_(lfn_); }
0504
0505 InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source, std::string const& lfn)
0506 : post_(source.actReg()->postCloseFileSignal_), lfn_(lfn) {
0507 source.actReg()->preCloseFileSignal_(lfn);
0508 }
0509
0510 InputSource::FileCloseSentry::~FileCloseSentry() { post_(lfn_); }
0511 }