Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:02:16

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "FWCore/Framework/interface/InputSource.h"
0004 
0005 #include "DataFormats/Provenance/interface/ProcessHistory.h"
0006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0007 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0008 #include "FWCore/Framework/interface/EventPrincipal.h"
0009 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0010 #include "FWCore/Framework/interface/FileBlock.h"
0011 #include "FWCore/Framework/interface/InputSourceDescription.h"
0012 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0013 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0014 #include "FWCore/Framework/interface/RunPrincipal.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0018 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0019 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0020 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0021 #include "FWCore/Utilities/interface/processGUID.h"
0022 #include "FWCore/Utilities/interface/TimeOfDay.h"
0023 
0024 #include <cassert>
0025 #include <fstream>
0026 #include <iomanip>
0027 
0028 namespace edm {
0029 
0030   namespace {
0031     std::string const& suffix(int count) {
0032       static std::string const st("st");
0033       static std::string const nd("nd");
0034       static std::string const rd("rd");
0035       static std::string const th("th");
0036       // *0, *4 - *9 use "th".
0037       int lastDigit = count % 10;
0038       if (lastDigit >= 4 || lastDigit == 0)
0039         return th;
0040       // *11, *12, or *13 use "th".
0041       if (count % 100 - lastDigit == 10)
0042         return th;
0043       return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
0044     }
0045   }  // namespace
0046 
0047   InputSource::InputSource(ParameterSet const& pset, InputSourceDescription const& desc)
0048       : actReg_(desc.actReg_),
0049         maxEvents_(desc.maxEvents_),
0050         remainingEvents_(maxEvents_),
0051         maxLumis_(desc.maxLumis_),
0052         remainingLumis_(maxLumis_),
0053         readCount_(0),
0054         maxSecondsUntilRampdown_(desc.maxSecondsUntilRampdown_),
0055         processingMode_(RunsLumisAndEvents),
0056         moduleDescription_(desc.moduleDescription_),
0057         productRegistry_(desc.productRegistry_),
0058         processHistoryRegistry_(new ProcessHistoryRegistry),
0059         branchIDListHelper_(desc.branchIDListHelper_),
0060         processBlockHelper_(desc.processBlockHelper_),
0061         thinnedAssociationsHelper_(desc.thinnedAssociationsHelper_),
0062         processGUID_(edm::processGUID().toBinary()),
0063         time_(),
0064         newRun_(true),
0065         newLumi_(true),
0066         eventCached_(false),
0067         state_(IsInvalid),
0068         runAuxiliary_(),
0069         lumiAuxiliary_(),
0070         statusFileName_(),
0071         numberOfEventsBeforeBigSkip_(0) {
0072     if (pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
0073       std::ostringstream statusfilename;
0074       statusfilename << "source_" << getpid();
0075       statusFileName_ = statusfilename.str();
0076     }
0077     if (maxSecondsUntilRampdown_ > 0) {
0078       processingStart_ = std::chrono::steady_clock::now();
0079     }
0080 
0081     std::string const defaultMode("RunsLumisAndEvents");
0082     std::string const runMode("Runs");
0083     std::string const runLumiMode("RunsAndLumis");
0084 
0085     // The default value provided as the second argument to the getUntrackedParameter function call
0086     // is not used when the ParameterSet has been validated and the parameters are not optional
0087     // in the description.  As soon as all primary input sources and all modules with a secondary
0088     // input sources have defined descriptions, the defaults in the getUntrackedParameterSet function
0089     // calls can and should be deleted from the code.
0090     std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
0091     if (processingMode == runMode) {
0092       processingMode_ = Runs;
0093     } else if (processingMode == runLumiMode) {
0094       processingMode_ = RunsAndLumis;
0095     } else if (processingMode != defaultMode) {
0096       throw Exception(errors::Configuration)
0097           << "InputSource::InputSource()\n"
0098           << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
0099           << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
0100     }
0101   }
0102 
0103   InputSource::~InputSource() noexcept(false) {}
0104 
0105   void InputSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0106     ParameterSetDescription desc;
0107     desc.setUnknown();
0108     descriptions.addDefault(desc);
0109   }
0110 
0111   void InputSource::prevalidate(ConfigurationDescriptions&) {}
0112 
0113   static std::string const kBaseType("Source");
0114 
0115   std::string const& InputSource::baseType() { return kBaseType; }
0116 
0117   void InputSource::fillDescription(ParameterSetDescription& desc) {
0118     std::string defaultString("RunsLumisAndEvents");
0119     desc.addUntracked<std::string>("processingMode", defaultString)
0120         ->setComment(
0121             "'RunsLumisAndEvents': process runs, lumis, and events.\n"
0122             "'RunsAndLumis':       process runs and lumis (not events).\n"
0123             "'Runs':               process runs (not lumis or events).");
0124     desc.addUntracked<bool>("writeStatusFile", false)
0125         ->setComment("Write a status file. Intended for use by workflow management.");
0126   }
0127 
0128   // This next function is to guarantee that "runs only" mode does not return events or lumis,
0129   // and that "runs and lumis only" mode does not return events.
0130   // For input sources that are not random access (e.g. you need to read through the events
0131   // to get to the lumis and runs), this is all that is involved to implement these modes.
0132   // For input sources where events or lumis can be skipped, getNextItemType() should
0133   // implement the skipping internally, so that the performance gain is realized.
0134   // If this is done for a source, the 'if' blocks in this function will never be entered
0135   // for that source.
0136   InputSource::ItemType InputSource::nextItemType_() {
0137     ItemType itemType = callWithTryCatchAndPrint<ItemType>([this]() { return getNextItemType(); },
0138                                                            "Calling InputSource::getNextItemType");
0139 
0140     if (itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
0141       skipEvents(1);
0142       return nextItemType_();
0143     }
0144     if (itemType == IsLumi && processingMode() == Runs) {
0145       // QQQ skipLuminosityBlock_();
0146       return nextItemType_();
0147     }
0148     return itemType;
0149   }
0150 
0151   InputSource::ItemType InputSource::nextItemType() {
0152     ItemType oldState = state_;
0153     if (eventLimitReached()) {
0154       // If the maximum event limit has been reached, stop.
0155       state_ = IsStop;
0156     } else if (lumiLimitReached()) {
0157       // If the maximum lumi limit has been reached, stop
0158       // when reaching a new file, run, or lumi.
0159       if (oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
0160         state_ = IsStop;
0161       } else {
0162         ItemType newState = nextItemType_();
0163         if (newState == IsEvent) {
0164           assert(processingMode() == RunsLumisAndEvents);
0165           state_ = IsEvent;
0166         } else {
0167           state_ = IsStop;
0168         }
0169       }
0170     } else {
0171       ItemType newState = nextItemType_();
0172       if (newState == IsStop) {
0173         state_ = IsStop;
0174       } else if (newState == IsSynchronize) {
0175         state_ = IsSynchronize;
0176       } else if (newState == IsFile || oldState == IsInvalid) {
0177         state_ = IsFile;
0178       } else if (newState == IsRun || oldState == IsFile) {
0179         runAuxiliary_ = readRunAuxiliary();
0180         state_ = IsRun;
0181       } else if (newState == IsLumi || oldState == IsRun) {
0182         assert(processingMode() != Runs);
0183         lumiAuxiliary_ = readLuminosityBlockAuxiliary();
0184         state_ = IsLumi;
0185       } else {
0186         assert(processingMode() == RunsLumisAndEvents);
0187         state_ = IsEvent;
0188       }
0189     }
0190     if (state_ == IsStop) {
0191       lumiAuxiliary_.reset();
0192       runAuxiliary_.reset();
0193     }
0194     return state_;
0195   }
0196 
0197   std::shared_ptr<LuminosityBlockAuxiliary> InputSource::readLuminosityBlockAuxiliary() {
0198     return callWithTryCatchAndPrint<std::shared_ptr<LuminosityBlockAuxiliary> >(
0199         [this]() { return readLuminosityBlockAuxiliary_(); }, "Calling InputSource::readLuminosityBlockAuxiliary_");
0200   }
0201 
0202   std::shared_ptr<RunAuxiliary> InputSource::readRunAuxiliary() {
0203     return callWithTryCatchAndPrint<std::shared_ptr<RunAuxiliary> >([this]() { return readRunAuxiliary_(); },
0204                                                                     "Calling InputSource::readRunAuxiliary_");
0205   }
0206 
0207   void InputSource::doBeginJob() { this->beginJob(); }
0208 
0209   void InputSource::doEndJob() { endJob(); }
0210 
0211   std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> InputSource::resourceSharedWithDelayedReader() {
0212     return resourceSharedWithDelayedReader_();
0213   }
0214 
0215   std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> InputSource::resourceSharedWithDelayedReader_() {
0216     return std::pair<SharedResourcesAcquirer*, std::recursive_mutex*>(nullptr, nullptr);
0217   }
0218 
0219   void InputSource::registerProducts() {}
0220 
0221   // Return a dummy file block.
0222   std::shared_ptr<FileBlock> InputSource::readFile() {
0223     assert(state_ == IsFile);
0224     assert(!limitReached());
0225     return callWithTryCatchAndPrint<std::shared_ptr<FileBlock> >([this]() { return readFile_(); },
0226                                                                  "Calling InputSource::readFile_");
0227   }
0228 
0229   void InputSource::closeFile(FileBlock* fb, bool cleaningUpAfterException) {
0230     if (fb != nullptr)
0231       fb->close();
0232     callWithTryCatchAndPrint<void>(
0233         [this]() { closeFile_(); }, "Calling InputSource::closeFile_", cleaningUpAfterException);
0234     return;
0235   }
0236 
0237   // Return a dummy file block.
0238   // This function must be overridden for any input source that reads a file
0239   // containing Products.
0240   std::shared_ptr<FileBlock> InputSource::readFile_() { return std::make_shared<FileBlock>(); }
0241 
0242   void InputSource::readRun(RunPrincipal& runPrincipal, HistoryAppender&) {
0243     RunSourceSentry sentry(*this, runPrincipal.index());
0244     callWithTryCatchAndPrint<void>([this, &runPrincipal]() { readRun_(runPrincipal); },
0245                                    "Calling InputSource::readRun_");
0246   }
0247 
0248   void InputSource::readAndMergeRun(RunPrincipal& rp) {
0249     RunSourceSentry sentry(*this, rp.index());
0250     callWithTryCatchAndPrint<void>([this, &rp]() { readRun_(rp); }, "Calling InputSource::readRun_");
0251   }
0252 
0253   void InputSource::readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender&) {
0254     LumiSourceSentry sentry(*this, lumiPrincipal.index());
0255     callWithTryCatchAndPrint<void>([this, &lumiPrincipal]() { readLuminosityBlock_(lumiPrincipal); },
0256                                    "Calling InputSource::readLuminosityBlock_");
0257     if (remainingLumis_ > 0) {
0258       --remainingLumis_;
0259     }
0260   }
0261 
0262   void InputSource::readAndMergeLumi(LuminosityBlockPrincipal& lbp) {
0263     LumiSourceSentry sentry(*this, lbp.index());
0264     callWithTryCatchAndPrint<void>([this, &lbp]() { readLuminosityBlock_(lbp); },
0265                                    "Calling InputSource::readLuminosityBlock_");
0266     if (remainingLumis_ > 0) {
0267       --remainingLumis_;
0268     }
0269   }
0270 
0271   void InputSource::fillProcessBlockHelper() { fillProcessBlockHelper_(); }
0272 
0273   bool InputSource::nextProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
0274     return nextProcessBlock_(processBlockPrincipal);
0275   }
0276 
0277   void InputSource::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
0278     ProcessBlockSourceSentry sentry(*this, processBlockPrincipal.processName());
0279     callWithTryCatchAndPrint<void>([this, &processBlockPrincipal]() { readProcessBlock_(processBlockPrincipal); },
0280                                    "Calling InputSource::readProcessBlock_");
0281   }
0282 
0283   void InputSource::fillProcessBlockHelper_() {}
0284 
0285   bool InputSource::nextProcessBlock_(ProcessBlockPrincipal&) { return false; }
0286 
0287   void InputSource::readProcessBlock_(ProcessBlockPrincipal&) {}
0288 
0289   void InputSource::readRun_(RunPrincipal& runPrincipal) {
0290     // Note: For the moment, we do not support saving and restoring the state of the
0291     // random number generator if random numbers are generated during processing of runs
0292     // (e.g. beginRun(), endRun())
0293     runPrincipal.fillRunPrincipal(processHistoryRegistry());
0294   }
0295 
0296   void InputSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0297     auto history = processHistoryRegistry().getMapped(lumiPrincipal.aux().processHistoryID());
0298     lumiPrincipal.fillLuminosityBlockPrincipal(history);
0299   }
0300 
0301   void InputSource::readEvent(EventPrincipal& ep, StreamContext& streamContext) {
0302     assert(state_ == IsEvent);
0303     assert(!eventLimitReached());
0304     {
0305       // block scope, in order to issue the PostSourceEvent signal before calling postRead and issueReports
0306       EventSourceSentry sentry(*this, streamContext);
0307 
0308       callWithTryCatchAndPrint<void>([this, &ep]() { readEvent_(ep); }, "Calling InputSource::readEvent_");
0309     }
0310 
0311     if (remainingEvents_ > 0)
0312       --remainingEvents_;
0313     ++readCount_;
0314     setTimestamp(ep.time());
0315     issueReports(ep.id(), ep.streamID());
0316   }
0317 
0318   bool InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext& streamContext) {
0319     bool result = false;
0320 
0321     if (not limitReached()) {
0322       // the Pre/PostSourceEvent signals should be generated only if the event is actually found.
0323       // this should be taken care of by an EventSourceSentry in the implementaion of readIt()
0324 
0325       //result = callWithTryCatchAndPrint<bool>( [this,&eventID,&ep](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
0326       result = readIt(eventID, ep, streamContext);
0327 
0328       if (result) {
0329         if (remainingEvents_ > 0)
0330           --remainingEvents_;
0331         ++readCount_;
0332         issueReports(ep.id(), ep.streamID());
0333       }
0334     }
0335     return result;
0336   }
0337 
0338   void InputSource::skipEvents(int offset) {
0339     callWithTryCatchAndPrint<void>([this, &offset]() { skip(offset); }, "Calling InputSource::skip");
0340   }
0341 
0342   bool InputSource::goToEvent(EventID const& eventID) {
0343     return callWithTryCatchAndPrint<bool>([this, &eventID]() { return goToEvent_(eventID); },
0344                                           "Calling InputSource::goToEvent_");
0345   }
0346 
0347   void InputSource::rewind() {
0348     state_ = IsInvalid;
0349     remainingEvents_ = maxEvents_;
0350     setNewRun();
0351     setNewLumi();
0352     resetEventCached();
0353     callWithTryCatchAndPrint<void>([this]() { rewind_(); }, "Calling InputSource::rewind_");
0354   }
0355 
0356   void InputSource::issueReports(EventID const& eventID, StreamID streamID) {
0357     if (isFwkInfoEnabled()) {
0358       LogFwkVerbatim("FwkReport") << "Begin processing the " << readCount_ << suffix(readCount_) << " record. Run "
0359                                   << eventID.run() << ", Event " << eventID.event() << ", LumiSection "
0360                                   << eventID.luminosityBlock() << " on stream " << streamID.value() << " at "
0361                                   << std::setprecision(3) << TimeOfDay();
0362     }
0363     if (!statusFileName_.empty()) {
0364       std::ofstream statusFile(statusFileName_.c_str());
0365       statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
0366       statusFile.close();
0367     }
0368 
0369     // At some point we may want to initiate checkpointing here
0370   }
0371 
0372   bool InputSource::readIt(EventID const&, EventPrincipal&, StreamContext&) {
0373     throw Exception(errors::LogicError) << "InputSource::readIt()\n"
0374                                         << "Random access is not implemented for this type of Input Source\n"
0375                                         << "Contact a Framework Developer\n";
0376   }
0377 
0378   void InputSource::setRun(RunNumber_t) {
0379     throw Exception(errors::LogicError) << "InputSource::setRun()\n"
0380                                         << "Run number cannot be modified for this type of Input Source\n"
0381                                         << "Contact a Framework Developer\n";
0382   }
0383 
0384   void InputSource::setLumi(LuminosityBlockNumber_t) {
0385     throw Exception(errors::LogicError) << "InputSource::setLumi()\n"
0386                                         << "Luminosity Block ID cannot be modified for this type of Input Source\n"
0387                                         << "Contact a Framework Developer\n";
0388   }
0389 
0390   void InputSource::skip(int) {
0391     throw Exception(errors::LogicError) << "InputSource::skip()\n"
0392                                         << "Random access are not implemented for this type of Input Source\n"
0393                                         << "Contact a Framework Developer\n";
0394   }
0395 
0396   bool InputSource::goToEvent_(EventID const&) {
0397     throw Exception(errors::LogicError) << "InputSource::goToEvent_()\n"
0398                                         << "Random access is not implemented for this type of Input Source\n"
0399                                         << "Contact a Framework Developer\n";
0400     return true;
0401   }
0402 
0403   void InputSource::rewind_() {
0404     throw Exception(errors::LogicError) << "InputSource::rewind()\n"
0405                                         << "Random access are not implemented for this type of Input Source\n"
0406                                         << "Contact a Framework Developer\n";
0407   }
0408 
0409   void InputSource::decreaseRemainingEventsBy(int iSkipped) {
0410     if (-1 == remainingEvents_) {
0411       return;
0412     }
0413     if (iSkipped < remainingEvents_) {
0414       remainingEvents_ -= iSkipped;
0415     } else {
0416       remainingEvents_ = 0;
0417     }
0418   }
0419 
0420   void InputSource::doBeginRun(RunPrincipal& rp, ProcessContext const*) {}
0421 
0422   void InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*) {}
0423 
0424   bool InputSource::randomAccess() const {
0425     return callWithTryCatchAndPrint<bool>([this]() { return randomAccess_(); }, "Calling InputSource::randomAccess_");
0426   }
0427 
0428   ProcessingController::ForwardState InputSource::forwardState() const {
0429     return callWithTryCatchAndPrint<ProcessingController::ForwardState>([this]() { return forwardState_(); },
0430                                                                         "Calling InputSource::forwardState_");
0431   }
0432 
0433   ProcessingController::ReverseState InputSource::reverseState() const {
0434     return callWithTryCatchAndPrint<ProcessingController::ReverseState>([this]() { return reverseState_(); },
0435                                                                         "Calling InputSource::reverseState__");
0436   }
0437 
0438   void InputSource::beginJob() {}
0439 
0440   void InputSource::endJob() {}
0441 
0442   bool InputSource::randomAccess_() const { return false; }
0443 
0444   ProcessingController::ForwardState InputSource::forwardState_() const {
0445     return ProcessingController::kUnknownForward;
0446   }
0447 
0448   ProcessingController::ReverseState InputSource::reverseState_() const {
0449     return ProcessingController::kUnknownReverse;
0450   }
0451 
0452   ProcessHistoryID const& InputSource::reducedProcessHistoryID() const {
0453     assert(runAuxiliary());
0454     return processHistoryRegistry_->reducedProcessHistoryID(runAuxiliary()->processHistoryID());
0455   }
0456 
0457   RunNumber_t InputSource::run() const {
0458     assert(runAuxiliary());
0459     return runAuxiliary()->run();
0460   }
0461 
0462   LuminosityBlockNumber_t InputSource::luminosityBlock() const {
0463     assert(luminosityBlockAuxiliary());
0464     return luminosityBlockAuxiliary()->luminosityBlock();
0465   }
0466 
0467   InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source, StreamContext& sc)
0468       : source_(source), sc_(sc) {
0469     source.actReg()->preSourceSignal_(sc_.streamID());
0470   }
0471 
0472   InputSource::EventSourceSentry::~EventSourceSentry() { source_.actReg()->postSourceSignal_(sc_.streamID()); }
0473 
0474   InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source, LuminosityBlockIndex index)
0475       : source_(source), index_(index) {
0476     source_.actReg()->preSourceLumiSignal_(index_);
0477   }
0478 
0479   InputSource::LumiSourceSentry::~LumiSourceSentry() { source_.actReg()->postSourceLumiSignal_(index_); }
0480 
0481   InputSource::RunSourceSentry::RunSourceSentry(InputSource const& source, RunIndex index)
0482       : source_(source), index_(index) {
0483     source_.actReg()->preSourceRunSignal_(index_);
0484   }
0485 
0486   InputSource::RunSourceSentry::~RunSourceSentry() { source_.actReg()->postSourceRunSignal_(index_); }
0487 
0488   InputSource::ProcessBlockSourceSentry::ProcessBlockSourceSentry(InputSource const& source,
0489                                                                   std::string const& processName)
0490       : source_(source), processName_(processName) {
0491     source_.actReg()->preSourceProcessBlockSignal_();
0492   }
0493 
0494   InputSource::ProcessBlockSourceSentry::~ProcessBlockSourceSentry() {
0495     source_.actReg()->postSourceProcessBlockSignal_(processName_);
0496   }
0497 
0498   InputSource::FileOpenSentry::FileOpenSentry(InputSource const& source, std::string const& lfn)
0499       : post_(source.actReg()->postOpenFileSignal_), lfn_(lfn) {
0500     source.actReg()->preOpenFileSignal_(lfn);
0501   }
0502 
0503   InputSource::FileOpenSentry::~FileOpenSentry() { post_(lfn_); }
0504 
0505   InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source, std::string const& lfn)
0506       : post_(source.actReg()->postCloseFileSignal_), lfn_(lfn) {
0507     source.actReg()->preCloseFileSignal_(lfn);
0508   }
0509 
0510   InputSource::FileCloseSentry::~FileCloseSentry() { post_(lfn_); }
0511 }  // namespace edm