Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-23 02:05:09

0001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
0002 
0003 #include "IOPool/Streamer/interface/EventMessage.h"
0004 #include "IOPool/Streamer/interface/InitMessage.h"
0005 #include "IOPool/Streamer/interface/ClassFiller.h"
0006 
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/FileBlock.h"
0009 #include "DataFormats/Provenance/interface/ProductDescription.h"
0010 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0011 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0012 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0013 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0015 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0016 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0017 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0018 
0019 #include "zlib.h"
0020 #include "lzma.h"
0021 #include "zstd.h"
0022 
0023 #include "DataFormats/Common/interface/RefCoreStreamer.h"
0024 #include "FWCore/Utilities/interface/WrappedClassName.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0027 #include "FWCore/ParameterSet/interface/Registry.h"
0028 #include "FWCore/Utilities/interface/EDMException.h"
0029 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0030 #include "FWCore/Reflection/interface/DictionaryTools.h"
0031 
0032 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0034 #include "FWCore/Utilities/interface/DebugMacros.h"
0035 
0036 #include <string>
0037 #include <iostream>
0038 #include <set>
0039 
0040 namespace edm::streamer {
0041   namespace {
0042     int const init_size = 1024 * 1024;
0043   }
0044 
0045   StreamerInputSource::StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc)
0046       : RawInputSource(pset, desc),
0047         tc_(getTClass(typeid(SendEvent))),
0048         dest_(init_size),
0049         xbuf_(TBuffer::kRead, init_size),
0050         sendEvent_(),
0051         eventPrincipalHolder_(),
0052         processName_(),
0053         protocolVersion_(0U) {}
0054 
0055   StreamerInputSource::~StreamerInputSource() {}
0056 
0057   // ---------------------------------------
0058   void StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) {
0059     SendDescs const& descs = header.descs();
0060 
0061     FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
0062 
0063     if (subsequent) {
0064       ProductRegistry pReg;
0065       pReg.updateFromInput(descs);
0066       std::string mergeInfo = reg.merge(pReg, std::string(), ProductDescription::Permissive);
0067       if (!mergeInfo.empty()) {
0068         throw cms::Exception("MismatchedInput", "StreamerInputSource::mergeIntoRegistry") << mergeInfo;
0069       }
0070     } else {
0071       declareStreamers(descs);
0072       buildClassCache(descs);
0073       loadExtraClasses();
0074       if (!reg.frozen()) {
0075         reg.updateFromInput(descs);
0076       }
0077     }
0078   }
0079 
0080   void StreamerInputSource::declareStreamers(SendDescs const& descs) {
0081     std::vector<std::string> missingDictionaries;
0082     std::vector<std::string> branchNamesForMissing;
0083     std::vector<std::string> producedTypes;
0084     for (auto const& item : descs) {
0085       //pi->init();
0086       std::string const real_name = wrappedClassName(item.className());
0087       FDEBUG(6) << "declare: " << real_name << std::endl;
0088       if (!loadCap(real_name, missingDictionaries)) {
0089         branchNamesForMissing.emplace_back(item.branchName());
0090         producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
0091       }
0092     }
0093     if (!missingDictionaries.empty()) {
0094       std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
0095       throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
0096     }
0097   }
0098 
0099   void StreamerInputSource::buildClassCache(SendDescs const& descs) {
0100     for (auto const& item : descs) {
0101       //pi->init();
0102       std::string const real_name = wrappedClassName(item.className());
0103       FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
0104       doBuildRealData(real_name);
0105     }
0106   }
0107 
0108   /**
0109    * Deserializes the specified init message into a SendJobHeader object
0110    * (which is related to the product registry).
0111    */
0112   std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
0113     if (initView.code() != Header::INIT)
0114       throw cms::Exception("StreamTranslation", "Registry deserialization error")
0115           << "received wrong message type: expected INIT, got " << initView.code() << "\n";
0116 
0117     //Get the process name and store if for Protocol version 4 and above.
0118     if (initView.protocolVersion() > 3) {
0119       processName_ = initView.processName();
0120       protocolVersion_ = initView.protocolVersion();
0121 
0122       FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
0123       FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
0124     }
0125 
0126     // calculate the adler32 checksum
0127     uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
0128     //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
0129     //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
0130     //          << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
0131     if ((uint32)adler32_chksum != initView.adler32_chksum()) {
0132       // skip event (based on option?) or throw exception?
0133       throw cms::Exception("StreamDeserialization", "Checksum error")
0134           << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
0135           << " host name = " << initView.hostName() << std::endl;
0136     }
0137 
0138     TClass* desc = getTClass(typeid(SendJobHeader));
0139 
0140     TBufferFile xbuf(
0141         TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
0142     RootDebug tracer(10, 10);
0143     std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
0144 
0145     if (sd.get() == nullptr) {
0146       throw cms::Exception("StreamTranslation", "Registry deserialization error")
0147           << "Could not read the initial product registry list\n";
0148     }
0149 
0150     sd->initializeTransients();
0151     return sd;
0152   }
0153 
0154   /**
0155    * Deserializes the specified init message into a SendJobHeader object
0156    * and merges registries.
0157    */
0158   void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
0159     std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
0160     mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
0161     SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
0162     pset::Registry& psetRegistry = *pset::Registry::instance();
0163     for (auto const& item : psetMap) {
0164       ParameterSet pset(item.second.pset());
0165       pset.setID(item.first);
0166       psetRegistry.insertMapped(pset);
0167     }
0168   }
0169 
0170   void StreamerInputSource::updateEventMetaData() {
0171     branchIDListHelper()->updateFromInput(sendEvent_->branchIDLists());
0172     thinnedAssociationsHelper()->updateFromPrimaryInput(sendEvent_->thinnedAssociationsHelper());
0173   }
0174 
0175   uint32_t StreamerInputSource::eventMetaDataChecksum(EventMsgView const& eventView) const {
0176     return eventView.adler32_chksum();
0177   }
0178 
0179   void StreamerInputSource::deserializeEventMetaData(EventMsgView const& eventView) {
0180     deserializeEventCommon(eventView, true);
0181   }
0182   /**
0183    * Deserializes the specified event message.
0184    */
0185   void StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
0186     deserializeEventCommon(eventView, false);
0187   }
0188 
0189   void StreamerInputSource::deserializeEventCommon(EventMsgView const& eventView, bool isMetaData) {
0190     if (eventView.code() != Header::EVENT)
0191       throw cms::Exception("StreamTranslation", "Event deserialization error")
0192           << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
0193     FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
0194               << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
0195               << std::endl;
0196     // uncompress if we need to
0197     // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
0198     // need to get rid of this when 090 MTCC streamers are gotten rid of
0199     unsigned long origsize = eventView.origDataSize();
0200     unsigned long dest_size;  //(should be >= eventView.origDataSize())
0201 
0202     uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
0203     //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
0204     //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
0205     //          << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
0206     if (static_cast<uint32>(adler32_chksum) != eventView.adler32_chksum()) {
0207       // skip event (based on option?) or throw exception?
0208       throw cms::Exception("StreamDeserialization", "Checksum error")
0209           << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
0210           << " host name = " << eventView.hostName() << std::endl;
0211     }
0212     if (origsize != 78 && origsize != 0) {
0213       // compressed
0214       if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0215         dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0216                                          eventView.eventLength(),
0217                                          dest_,
0218                                          origsize);
0219       } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0220         dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0221                                          eventView.eventLength(),
0222                                          dest_,
0223                                          origsize);
0224       } else
0225         dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0226                                      eventView.eventLength(),
0227                                      dest_,
0228                                      origsize);
0229     } else {  // not compressed
0230       // we need to copy anyway the buffer as we are using dest in xbuf
0231       dest_size = eventView.eventLength();
0232       dest_.resize(dest_size);
0233       unsigned char* pos = (unsigned char*)&dest_[0];
0234       unsigned char const* from = (unsigned char const*)eventView.eventData();
0235       std::copy(from, from + dest_size, pos);
0236     }
0237     //TBuffer xbuf(TBuffer::kRead, dest_size,
0238     //             (char const*) &dest[0],kFALSE);
0239     //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
0240     //             (char const*) eventView.eventData(),kFALSE);
0241     xbuf_.Reset();
0242     xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
0243     RootDebug tracer(10, 10);
0244 
0245     //We do not yet know which EventPrincipal we will use, therefore
0246     // we are using a new EventPrincipalHolder as a proxy. We need to
0247     // make a new one instead of reusing the same one becuase when running
0248     // multi-threaded there will be multiple EventPrincipals being used
0249     // simultaneously.
0250     eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>();  // propagate_const<T> has no reset() function
0251     {
0252       RefCoreStreamerGuard guard(eventPrincipalHolder_.get());
0253       sendEvent_ = std::unique_ptr<SendEvent>(reinterpret_cast<SendEvent*>(xbuf_.ReadObjectAny(tc_)));
0254     }
0255 
0256     if (sendEvent_.get() == nullptr) {
0257       throw cms::Exception("StreamTranslation", "Event deserialization error")
0258           << "got a null event from input stream\n";
0259     }
0260 
0261     if (isMetaData) {
0262       eventMetaDataChecksum_ = adler32_chksum;
0263       return;
0264     }
0265 
0266     if (sendEvent_->metaDataChecksum() != eventMetaDataChecksum_) {
0267       throw cms::Exception("StreamTranslation") << " meta data checksum from event " << sendEvent_->metaDataChecksum()
0268                                                 << " does not match last read meta data " << eventMetaDataChecksum_;
0269     }
0270 
0271     processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory());
0272 
0273     FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
0274     if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
0275         runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
0276       RunAuxiliary* runAuxiliary =
0277           new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0278       runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0279       setRunAuxiliary(runAuxiliary);
0280       resetLuminosityBlockAuxiliary();
0281     }
0282     if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
0283       LuminosityBlockAuxiliary* luminosityBlockAuxiliary = new LuminosityBlockAuxiliary(
0284           runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0285       luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0286       setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
0287     }
0288     setEventCached();
0289   }
0290 
0291   void StreamerInputSource::read(EventPrincipal& eventPrincipal) {
0292     EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
0293     BranchListIndexes indexes(sendEvent_->branchListIndexes());
0294     branchIDListHelper()->fixBranchListIndexes(indexes);
0295     auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
0296     eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
0297 
0298     //We now know which eventPrincipal to use and we can reuse the slot in
0299     // streamToEventPrincipalHolders to own the memory
0300     eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
0301     if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
0302       streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
0303     }
0304     streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
0305 
0306     // no process name list handling
0307 
0308     SendProds& sps = sendEvent_->products();
0309     for (auto& spitem : sps) {
0310       FDEBUG(10) << "check prodpair" << std::endl;
0311       if (spitem.desc() == nullptr)
0312         throw cms::Exception("StreamTranslation", "Empty Provenance");
0313       FDEBUG(5) << "Prov:"
0314                 << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
0315                 << spitem.desc()->branchID() << std::endl;
0316 
0317       ProductDescription const branchDesc(*spitem.desc());
0318       // This ProductProvenance constructor inserts into the entry description registry
0319       if (spitem.parents()) {
0320         std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
0321         if (spitem.prod() != nullptr) {
0322           FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0323           eventPrincipal.putOnRead(
0324               branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
0325           FDEBUG(10) << "addproduct done" << std::endl;
0326         } else {
0327           FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0328           eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
0329           FDEBUG(10) << "addproduct empty done" << std::endl;
0330         }
0331       } else {
0332         std::optional<ProductProvenance> productProvenance;
0333         if (spitem.prod() != nullptr) {
0334           FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0335           eventPrincipal.putOnRead(
0336               branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
0337           FDEBUG(10) << "addproduct done" << std::endl;
0338         } else {
0339           FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0340           eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
0341           FDEBUG(10) << "addproduct empty done" << std::endl;
0342         }
0343       }
0344       spitem.clear();
0345     }
0346 
0347     FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
0348   }
0349 
0350   /**
0351    * Uncompresses the data in the specified input buffer into the
0352    * specified output buffer.  The inputSize should be set to the size
0353    * of the compressed data in the inputBuffer.  The expectedFullSize should
0354    * be set to the original size of the data (before compression).
0355    * Returns the actual size of the uncompressed data.
0356    * Errors are reported by throwing exceptions.
0357    */
0358   unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
0359                                                      unsigned int inputSize,
0360                                                      std::vector<unsigned char>& outputBuffer,
0361                                                      unsigned int expectedFullSize) {
0362     unsigned long origSize = expectedFullSize;
0363     unsigned long uncompressedSize = expectedFullSize * 1.1;
0364     FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0365     outputBuffer.resize(uncompressedSize);
0366     int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize);  // do not need compression level
0367     //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
0368     if (ret == Z_OK) {
0369       // check the length against original uncompressed length
0370       FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0371       if (origSize != uncompressedSize) {
0372         // we throw an error and return without event! null pointer
0373         throw cms::Exception("StreamDeserialization", "Uncompression error")
0374             << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0375       }
0376     } else {
0377       // we throw an error and return without event! null pointer
0378       throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
0379     }
0380     return (unsigned int)uncompressedSize;
0381   }
0382 
0383   bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
0384     if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
0385       return true;
0386     else
0387       return false;
0388   }
0389 
0390   unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
0391                                                          unsigned int inputSize,
0392                                                          std::vector<unsigned char>& outputBuffer,
0393                                                          unsigned int expectedFullSize,
0394                                                          bool hasHeader) {
0395     unsigned long origSize = expectedFullSize;
0396     unsigned long uncompressedSize = expectedFullSize * 1.1;
0397     FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0398     outputBuffer.resize(uncompressedSize);
0399 
0400     lzma_stream stream = LZMA_STREAM_INIT;
0401     lzma_ret returnStatus;
0402 
0403     returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
0404     if (returnStatus != LZMA_OK) {
0405       throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
0406           << "Error code = " << returnStatus << "\n ";
0407     }
0408 
0409     size_t hdrSize = hasHeader ? 4 : 0;
0410     stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
0411     stream.avail_in = (size_t)(inputSize - hdrSize);
0412     stream.next_out = (uint8_t*)&outputBuffer[0];
0413     stream.avail_out = (size_t)uncompressedSize;
0414 
0415     returnStatus = lzma_code(&stream, LZMA_FINISH);
0416     if (returnStatus != LZMA_STREAM_END) {
0417       lzma_end(&stream);
0418       throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
0419           << "Error code = " << returnStatus << "\n ";
0420     }
0421     lzma_end(&stream);
0422 
0423     uncompressedSize = (unsigned int)stream.total_out;
0424 
0425     FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0426     if (origSize != uncompressedSize) {
0427       // we throw an error and return without event! null pointer
0428       throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
0429           << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0430     }
0431 
0432     return uncompressedSize;
0433   }
0434 
0435   bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
0436     if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
0437       return true;
0438     else
0439       return false;
0440   }
0441 
0442   unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
0443                                                          unsigned int inputSize,
0444                                                          std::vector<unsigned char>& outputBuffer,
0445                                                          unsigned int expectedFullSize,
0446                                                          bool hasHeader) {
0447     unsigned long uncompressedSize = expectedFullSize * 1.1;
0448     FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
0449     outputBuffer.resize(uncompressedSize);
0450 
0451     size_t hdrSize = hasHeader ? 4 : 0;
0452     size_t ret = ZSTD_decompress(
0453         (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
0454 
0455     if (ZSTD_isError(ret)) {
0456       throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
0457           << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
0458     }
0459     return (unsigned int)ret;
0460   }
0461 
0462   void StreamerInputSource::resetAfterEndRun() {
0463     // called from an online streamer source to reset after a stop command
0464     // so an enable command will work
0465     resetLuminosityBlockAuxiliary();
0466     resetRunAuxiliary();
0467     assert(!eventCached());
0468     reset();
0469   }
0470 
0471   void StreamerInputSource::setRun(RunNumber_t) {
0472     // Need to define a dummy setRun here or else the InputSource::setRun is called
0473     // if we have a source inheriting from this and wants to define a setRun method
0474     throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
0475                                         << "Run number cannot be modified for this type of Input Source\n"
0476                                         << "Contact a Storage Manager Developer\n";
0477   }
0478 
0479   StreamerInputSource::EventPrincipalHolder::EventPrincipalHolder() : eventPrincipal_(nullptr) {}
0480 
0481   StreamerInputSource::EventPrincipalHolder::~EventPrincipalHolder() {}
0482 
0483   WrapperBase const* StreamerInputSource::EventPrincipalHolder::getIt(ProductID const& id) const {
0484     return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
0485   }
0486 
0487   std::optional<std::tuple<edm::WrapperBase const*, unsigned int>>
0488   StreamerInputSource::EventPrincipalHolder::getThinnedProduct(edm::ProductID const& id, unsigned int index) const {
0489     if (eventPrincipal_)
0490       return eventPrincipal_->getThinnedProduct(id, index);
0491     return std::nullopt;
0492   }
0493 
0494   void StreamerInputSource::EventPrincipalHolder::getThinnedProducts(ProductID const& pid,
0495                                                                      std::vector<WrapperBase const*>& wrappers,
0496                                                                      std::vector<unsigned int>& keys) const {
0497     if (eventPrincipal_)
0498       eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
0499   }
0500 
0501   edm::OptionalThinnedKey StreamerInputSource::EventPrincipalHolder::getThinnedKeyFrom(
0502       edm::ProductID const& parent, unsigned int index, edm::ProductID const& thinned) const {
0503     if (eventPrincipal_) {
0504       return eventPrincipal_->getThinnedKeyFrom(parent, index, thinned);
0505     } else {
0506       return std::monostate{};
0507     }
0508   }
0509 
0510   unsigned int StreamerInputSource::EventPrincipalHolder::transitionIndex_() const {
0511     assert(eventPrincipal_ != nullptr);
0512     return eventPrincipal_->transitionIndex();
0513   }
0514 
0515   void StreamerInputSource::EventPrincipalHolder::setEventPrincipal(EventPrincipal* ep) { eventPrincipal_ = ep; }
0516 
0517   void StreamerInputSource::fillDescription(ParameterSetDescription& desc) { RawInputSource::fillDescription(desc); }
0518 }  // namespace edm::streamer