Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-02-14 14:21:20

0001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
0002 
0003 #include "IOPool/Streamer/interface/EventMessage.h"
0004 #include "IOPool/Streamer/interface/InitMessage.h"
0005 #include "IOPool/Streamer/interface/ClassFiller.h"
0006 
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/FileBlock.h"
0009 #include "DataFormats/Provenance/interface/BranchDescription.h"
0010 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0011 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0012 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0013 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0015 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0016 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0017 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0018 
0019 #include "zlib.h"
0020 #include "lzma.h"
0021 #include "zstd.h"
0022 
0023 #include "DataFormats/Common/interface/RefCoreStreamer.h"
0024 #include "FWCore/Utilities/interface/WrappedClassName.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0027 #include "FWCore/ParameterSet/interface/Registry.h"
0028 #include "FWCore/Utilities/interface/EDMException.h"
0029 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0030 #include "FWCore/Reflection/interface/DictionaryTools.h"
0031 
0032 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0034 #include "FWCore/Utilities/interface/DebugMacros.h"
0035 
0036 #include <string>
0037 #include <iostream>
0038 #include <set>
0039 
0040 namespace edm {
0041   namespace {
0042     int const init_size = 1024 * 1024;
0043   }
0044 
0045   StreamerInputSource::StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc)
0046       : RawInputSource(pset, desc),
0047         tc_(getTClass(typeid(SendEvent))),
0048         dest_(init_size),
0049         xbuf_(TBuffer::kRead, init_size),
0050         sendEvent_(),
0051         eventPrincipalHolder_(),
0052         adjustEventToNewProductRegistry_(false),
0053         processName_(),
0054         protocolVersion_(0U) {}
0055 
0056   StreamerInputSource::~StreamerInputSource() {}
0057 
0058   // ---------------------------------------
0059   void StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header,
0060                                               ProductRegistry& reg,
0061                                               BranchIDListHelper& branchIDListHelper,
0062                                               ThinnedAssociationsHelper& thinnedHelper,
0063                                               bool subsequent) {
0064     SendDescs const& descs = header.descs();
0065 
0066     FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
0067 
0068     if (subsequent) {
0069       ProductRegistry pReg;
0070       pReg.updateFromInput(descs);
0071       std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
0072       if (!mergeInfo.empty()) {
0073         throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
0074       }
0075       branchIDListHelper.updateFromInput(header.branchIDLists());
0076       thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
0077     } else {
0078       declareStreamers(descs);
0079       buildClassCache(descs);
0080       loadExtraClasses();
0081       if (!reg.frozen()) {
0082         reg.updateFromInput(descs);
0083       }
0084       branchIDListHelper.updateFromInput(header.branchIDLists());
0085       thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
0086     }
0087   }
0088 
0089   void StreamerInputSource::declareStreamers(SendDescs const& descs) {
0090     std::vector<std::string> missingDictionaries;
0091     std::vector<std::string> branchNamesForMissing;
0092     std::vector<std::string> producedTypes;
0093     for (auto const& item : descs) {
0094       //pi->init();
0095       std::string const real_name = wrappedClassName(item.className());
0096       FDEBUG(6) << "declare: " << real_name << std::endl;
0097       if (!loadCap(real_name, missingDictionaries)) {
0098         branchNamesForMissing.emplace_back(item.branchName());
0099         producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
0100       }
0101     }
0102     if (!missingDictionaries.empty()) {
0103       std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
0104       throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
0105     }
0106   }
0107 
0108   void StreamerInputSource::buildClassCache(SendDescs const& descs) {
0109     for (auto const& item : descs) {
0110       //pi->init();
0111       std::string const real_name = wrappedClassName(item.className());
0112       FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
0113       doBuildRealData(real_name);
0114     }
0115   }
0116 
0117   /**
0118    * Deserializes the specified init message into a SendJobHeader object
0119    * (which is related to the product registry).
0120    */
0121   std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
0122     if (initView.code() != Header::INIT)
0123       throw cms::Exception("StreamTranslation", "Registry deserialization error")
0124           << "received wrong message type: expected INIT, got " << initView.code() << "\n";
0125 
0126     //Get the process name and store if for Protocol version 4 and above.
0127     if (initView.protocolVersion() > 3) {
0128       processName_ = initView.processName();
0129       protocolVersion_ = initView.protocolVersion();
0130 
0131       FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
0132       FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
0133     }
0134 
0135     // calculate the adler32 checksum
0136     uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
0137     //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
0138     //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
0139     //          << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
0140     if ((uint32)adler32_chksum != initView.adler32_chksum()) {
0141       // skip event (based on option?) or throw exception?
0142       throw cms::Exception("StreamDeserialization", "Checksum error")
0143           << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
0144           << " host name = " << initView.hostName() << std::endl;
0145     }
0146 
0147     TClass* desc = getTClass(typeid(SendJobHeader));
0148 
0149     TBufferFile xbuf(
0150         TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
0151     RootDebug tracer(10, 10);
0152     std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
0153 
0154     if (sd.get() == nullptr) {
0155       throw cms::Exception("StreamTranslation", "Registry deserialization error")
0156           << "Could not read the initial product registry list\n";
0157     }
0158 
0159     sd->initializeTransients();
0160     return sd;
0161   }
0162 
0163   /**
0164    * Deserializes the specified init message into a SendJobHeader object
0165    * and merges registries.
0166    */
0167   void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
0168     std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
0169     mergeIntoRegistry(*sd, productRegistryUpdate(), *branchIDListHelper(), *thinnedAssociationsHelper(), subsequent);
0170     if (subsequent) {
0171       adjustEventToNewProductRegistry_ = true;
0172     }
0173     SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
0174     pset::Registry& psetRegistry = *pset::Registry::instance();
0175     for (auto const& item : psetMap) {
0176       ParameterSet pset(item.second.pset());
0177       pset.setID(item.first);
0178       psetRegistry.insertMapped(pset);
0179     }
0180   }
0181 
0182   /**
0183    * Deserializes the specified event message.
0184    */
0185   void StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
0186     if (eventView.code() != Header::EVENT)
0187       throw cms::Exception("StreamTranslation", "Event deserialization error")
0188           << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
0189     FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
0190               << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
0191               << std::endl;
0192     // uncompress if we need to
0193     // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
0194     // need to get rid of this when 090 MTCC streamers are gotten rid of
0195     unsigned long origsize = eventView.origDataSize();
0196     unsigned long dest_size;  //(should be >= eventView.origDataSize())
0197 
0198     uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
0199     //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
0200     //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
0201     //          << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
0202     if ((uint32)adler32_chksum != eventView.adler32_chksum()) {
0203       // skip event (based on option?) or throw exception?
0204       throw cms::Exception("StreamDeserialization", "Checksum error")
0205           << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
0206           << " host name = " << eventView.hostName() << std::endl;
0207     }
0208     if (origsize != 78 && origsize != 0) {
0209       // compressed
0210       if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0211         dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0212                                          eventView.eventLength(),
0213                                          dest_,
0214                                          origsize);
0215       } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0216         dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0217                                          eventView.eventLength(),
0218                                          dest_,
0219                                          origsize);
0220       } else
0221         dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0222                                      eventView.eventLength(),
0223                                      dest_,
0224                                      origsize);
0225     } else {  // not compressed
0226       // we need to copy anyway the buffer as we are using dest in xbuf
0227       dest_size = eventView.eventLength();
0228       dest_.resize(dest_size);
0229       unsigned char* pos = (unsigned char*)&dest_[0];
0230       unsigned char const* from = (unsigned char const*)eventView.eventData();
0231       std::copy(from, from + dest_size, pos);
0232     }
0233     //TBuffer xbuf(TBuffer::kRead, dest_size,
0234     //             (char const*) &dest[0],kFALSE);
0235     //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
0236     //             (char const*) eventView.eventData(),kFALSE);
0237     xbuf_.Reset();
0238     xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
0239     RootDebug tracer(10, 10);
0240 
0241     //We do not yet know which EventPrincipal we will use, therefore
0242     // we are using a new EventPrincipalHolder as a proxy. We need to
0243     // make a new one instead of reusing the same one becuase when running
0244     // multi-threaded there will be multiple EventPrincipals being used
0245     // simultaneously.
0246     eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>();  // propagate_const<T> has no reset() function
0247     setRefCoreStreamer(eventPrincipalHolder_.get());
0248     {
0249       std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
0250         setRefCoreStreamer();
0251         ;
0252       });
0253       sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
0254     }
0255 
0256     if (sendEvent_.get() == nullptr) {
0257       throw cms::Exception("StreamTranslation", "Event deserialization error")
0258           << "got a null event from input stream\n";
0259     }
0260     processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory());
0261 
0262     FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
0263     if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
0264         runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
0265       RunAuxiliary* runAuxiliary =
0266           new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0267       runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0268       setRunAuxiliary(runAuxiliary);
0269       resetLuminosityBlockAuxiliary();
0270     }
0271     if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
0272       LuminosityBlockAuxiliary* luminosityBlockAuxiliary = new LuminosityBlockAuxiliary(
0273           runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0274       luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0275       setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
0276     }
0277     setEventCached();
0278   }
0279 
0280   void StreamerInputSource::read(EventPrincipal& eventPrincipal) {
0281     if (adjustEventToNewProductRegistry_) {
0282       eventPrincipal.adjustIndexesAfterProductRegistryAddition();
0283       bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
0284       assert(eventOK);
0285       adjustEventToNewProductRegistry_ = false;
0286     }
0287     EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
0288     BranchListIndexes indexes(sendEvent_->branchListIndexes());
0289     branchIDListHelper()->fixBranchListIndexes(indexes);
0290     auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
0291     eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
0292 
0293     //We now know which eventPrincipal to use and we can reuse the slot in
0294     // streamToEventPrincipalHolders to own the memory
0295     eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
0296     if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
0297       streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
0298     }
0299     streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
0300 
0301     // no process name list handling
0302 
0303     SendProds& sps = sendEvent_->products();
0304     for (auto& spitem : sps) {
0305       FDEBUG(10) << "check prodpair" << std::endl;
0306       if (spitem.desc() == nullptr)
0307         throw cms::Exception("StreamTranslation", "Empty Provenance");
0308       FDEBUG(5) << "Prov:"
0309                 << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
0310                 << spitem.desc()->branchID() << std::endl;
0311 
0312       BranchDescription const branchDesc(*spitem.desc());
0313       // This ProductProvenance constructor inserts into the entry description registry
0314       if (spitem.parents()) {
0315         std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
0316         if (spitem.prod() != nullptr) {
0317           FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0318           eventPrincipal.putOnRead(branchDesc,
0319                                    std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
0320                                    std::move(productProvenance));
0321           FDEBUG(10) << "addproduct done" << std::endl;
0322         } else {
0323           FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0324           eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), std::move(productProvenance));
0325           FDEBUG(10) << "addproduct empty done" << std::endl;
0326         }
0327       } else {
0328         std::optional<ProductProvenance> productProvenance;
0329         if (spitem.prod() != nullptr) {
0330           FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0331           eventPrincipal.putOnRead(
0332               branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
0333           FDEBUG(10) << "addproduct done" << std::endl;
0334         } else {
0335           FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0336           eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
0337           FDEBUG(10) << "addproduct empty done" << std::endl;
0338         }
0339       }
0340       spitem.clear();
0341     }
0342 
0343     FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
0344   }
0345 
0346   /**
0347    * Uncompresses the data in the specified input buffer into the
0348    * specified output buffer.  The inputSize should be set to the size
0349    * of the compressed data in the inputBuffer.  The expectedFullSize should
0350    * be set to the original size of the data (before compression).
0351    * Returns the actual size of the uncompressed data.
0352    * Errors are reported by throwing exceptions.
0353    */
0354   unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
0355                                                      unsigned int inputSize,
0356                                                      std::vector<unsigned char>& outputBuffer,
0357                                                      unsigned int expectedFullSize) {
0358     unsigned long origSize = expectedFullSize;
0359     unsigned long uncompressedSize = expectedFullSize * 1.1;
0360     FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0361     outputBuffer.resize(uncompressedSize);
0362     int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize);  // do not need compression level
0363     //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
0364     if (ret == Z_OK) {
0365       // check the length against original uncompressed length
0366       FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0367       if (origSize != uncompressedSize) {
0368         // we throw an error and return without event! null pointer
0369         throw cms::Exception("StreamDeserialization", "Uncompression error")
0370             << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0371       }
0372     } else {
0373       // we throw an error and return without event! null pointer
0374       throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
0375     }
0376     return (unsigned int)uncompressedSize;
0377   }
0378 
0379   bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
0380     if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
0381       return true;
0382     else
0383       return false;
0384   }
0385 
0386   unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
0387                                                          unsigned int inputSize,
0388                                                          std::vector<unsigned char>& outputBuffer,
0389                                                          unsigned int expectedFullSize,
0390                                                          bool hasHeader) {
0391     unsigned long origSize = expectedFullSize;
0392     unsigned long uncompressedSize = expectedFullSize * 1.1;
0393     FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0394     outputBuffer.resize(uncompressedSize);
0395 
0396     lzma_stream stream = LZMA_STREAM_INIT;
0397     lzma_ret returnStatus;
0398 
0399     returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
0400     if (returnStatus != LZMA_OK) {
0401       throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
0402           << "Error code = " << returnStatus << "\n ";
0403     }
0404 
0405     size_t hdrSize = hasHeader ? 4 : 0;
0406     stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
0407     stream.avail_in = (size_t)(inputSize - hdrSize);
0408     stream.next_out = (uint8_t*)&outputBuffer[0];
0409     stream.avail_out = (size_t)uncompressedSize;
0410 
0411     returnStatus = lzma_code(&stream, LZMA_FINISH);
0412     if (returnStatus != LZMA_STREAM_END) {
0413       lzma_end(&stream);
0414       throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
0415           << "Error code = " << returnStatus << "\n ";
0416     }
0417     lzma_end(&stream);
0418 
0419     uncompressedSize = (unsigned int)stream.total_out;
0420 
0421     FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0422     if (origSize != uncompressedSize) {
0423       // we throw an error and return without event! null pointer
0424       throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
0425           << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0426     }
0427 
0428     return uncompressedSize;
0429   }
0430 
0431   bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
0432     if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
0433       return true;
0434     else
0435       return false;
0436   }
0437 
0438   unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
0439                                                          unsigned int inputSize,
0440                                                          std::vector<unsigned char>& outputBuffer,
0441                                                          unsigned int expectedFullSize,
0442                                                          bool hasHeader) {
0443     unsigned long uncompressedSize = expectedFullSize * 1.1;
0444     FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
0445     outputBuffer.resize(uncompressedSize);
0446 
0447     size_t hdrSize = hasHeader ? 4 : 0;
0448     size_t ret = ZSTD_decompress(
0449         (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
0450 
0451     if (ZSTD_isError(ret)) {
0452       throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
0453           << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
0454     }
0455     return (unsigned int)ret;
0456   }
0457 
0458   void StreamerInputSource::resetAfterEndRun() {
0459     // called from an online streamer source to reset after a stop command
0460     // so an enable command will work
0461     resetLuminosityBlockAuxiliary();
0462     resetRunAuxiliary();
0463     assert(!eventCached());
0464     reset();
0465   }
0466 
0467   void StreamerInputSource::setRun(RunNumber_t) {
0468     // Need to define a dummy setRun here or else the InputSource::setRun is called
0469     // if we have a source inheriting from this and wants to define a setRun method
0470     throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
0471                                         << "Run number cannot be modified for this type of Input Source\n"
0472                                         << "Contact a Storage Manager Developer\n";
0473   }
0474 
0475   StreamerInputSource::EventPrincipalHolder::EventPrincipalHolder() : eventPrincipal_(nullptr) {}
0476 
0477   StreamerInputSource::EventPrincipalHolder::~EventPrincipalHolder() {}
0478 
0479   WrapperBase const* StreamerInputSource::EventPrincipalHolder::getIt(ProductID const& id) const {
0480     return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
0481   }
0482 
0483   std::optional<std::tuple<edm::WrapperBase const*, unsigned int>>
0484   StreamerInputSource::EventPrincipalHolder::getThinnedProduct(edm::ProductID const& id, unsigned int index) const {
0485     if (eventPrincipal_)
0486       return eventPrincipal_->getThinnedProduct(id, index);
0487     return std::nullopt;
0488   }
0489 
0490   void StreamerInputSource::EventPrincipalHolder::getThinnedProducts(ProductID const& pid,
0491                                                                      std::vector<WrapperBase const*>& wrappers,
0492                                                                      std::vector<unsigned int>& keys) const {
0493     if (eventPrincipal_)
0494       eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
0495   }
0496 
0497   edm::OptionalThinnedKey StreamerInputSource::EventPrincipalHolder::getThinnedKeyFrom(
0498       edm::ProductID const& parent, unsigned int index, edm::ProductID const& thinned) const {
0499     if (eventPrincipal_) {
0500       return eventPrincipal_->getThinnedKeyFrom(parent, index, thinned);
0501     } else {
0502       return std::monostate{};
0503     }
0504   }
0505 
0506   unsigned int StreamerInputSource::EventPrincipalHolder::transitionIndex_() const {
0507     assert(eventPrincipal_ != nullptr);
0508     return eventPrincipal_->transitionIndex();
0509   }
0510 
0511   void StreamerInputSource::EventPrincipalHolder::setEventPrincipal(EventPrincipal* ep) { eventPrincipal_ = ep; }
0512 
0513   void StreamerInputSource::fillDescription(ParameterSetDescription& desc) { RawInputSource::fillDescription(desc); }
0514 }  // namespace edm