Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-31 04:19:42

0001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
0002 
0003 #include "IOPool/Streamer/interface/EventMessage.h"
0004 #include "IOPool/Streamer/interface/InitMessage.h"
0005 #include "IOPool/Streamer/interface/ClassFiller.h"
0006 
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/FileBlock.h"
0009 #include "DataFormats/Provenance/interface/BranchDescription.h"
0010 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0011 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0012 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0013 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0015 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0016 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0017 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0018 
0019 #include "zlib.h"
0020 #include "lzma.h"
0021 #include "zstd.h"
0022 
0023 #include "DataFormats/Common/interface/RefCoreStreamer.h"
0024 #include "FWCore/Utilities/interface/WrappedClassName.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0027 #include "FWCore/ParameterSet/interface/Registry.h"
0028 #include "FWCore/Utilities/interface/EDMException.h"
0029 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0030 #include "FWCore/Reflection/interface/DictionaryTools.h"
0031 
0032 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0034 #include "FWCore/Utilities/interface/DebugMacros.h"
0035 
0036 #include <string>
0037 #include <iostream>
0038 #include <set>
0039 
0040 namespace edm::streamer {
namespace {
  // Initial size handed to both dest_ and xbuf_ by the StreamerInputSource constructor.
  constexpr int init_size = 1 << 20;
}  // namespace
0044 
0045   StreamerInputSource::StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc)
0046       : RawInputSource(pset, desc),
0047         tc_(getTClass(typeid(SendEvent))),
0048         dest_(init_size),
0049         xbuf_(TBuffer::kRead, init_size),
0050         sendEvent_(),
0051         eventPrincipalHolder_(),
0052         adjustEventToNewProductRegistry_(false),
0053         processName_(),
0054         protocolVersion_(0U) {}
0055 
0056   StreamerInputSource::~StreamerInputSource() {}
0057 
0058   // ---------------------------------------
0059   void StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) {
0060     SendDescs const& descs = header.descs();
0061 
0062     FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
0063 
0064     if (subsequent) {
0065       ProductRegistry pReg;
0066       pReg.updateFromInput(descs);
0067       std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
0068       if (!mergeInfo.empty()) {
0069         throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
0070       }
0071     } else {
0072       declareStreamers(descs);
0073       buildClassCache(descs);
0074       loadExtraClasses();
0075       if (!reg.frozen()) {
0076         reg.updateFromInput(descs);
0077       }
0078     }
0079   }
0080 
0081   void StreamerInputSource::declareStreamers(SendDescs const& descs) {
0082     std::vector<std::string> missingDictionaries;
0083     std::vector<std::string> branchNamesForMissing;
0084     std::vector<std::string> producedTypes;
0085     for (auto const& item : descs) {
0086       //pi->init();
0087       std::string const real_name = wrappedClassName(item.className());
0088       FDEBUG(6) << "declare: " << real_name << std::endl;
0089       if (!loadCap(real_name, missingDictionaries)) {
0090         branchNamesForMissing.emplace_back(item.branchName());
0091         producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
0092       }
0093     }
0094     if (!missingDictionaries.empty()) {
0095       std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
0096       throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
0097     }
0098   }
0099 
0100   void StreamerInputSource::buildClassCache(SendDescs const& descs) {
0101     for (auto const& item : descs) {
0102       //pi->init();
0103       std::string const real_name = wrappedClassName(item.className());
0104       FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
0105       doBuildRealData(real_name);
0106     }
0107   }
0108 
0109   /**
0110    * Deserializes the specified init message into a SendJobHeader object
0111    * (which is related to the product registry).
0112    */
0113   std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
0114     if (initView.code() != Header::INIT)
0115       throw cms::Exception("StreamTranslation", "Registry deserialization error")
0116           << "received wrong message type: expected INIT, got " << initView.code() << "\n";
0117 
0118     //Get the process name and store if for Protocol version 4 and above.
0119     if (initView.protocolVersion() > 3) {
0120       processName_ = initView.processName();
0121       protocolVersion_ = initView.protocolVersion();
0122 
0123       FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
0124       FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
0125     }
0126 
0127     // calculate the adler32 checksum
0128     uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
0129     //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
0130     //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
0131     //          << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
0132     if ((uint32)adler32_chksum != initView.adler32_chksum()) {
0133       // skip event (based on option?) or throw exception?
0134       throw cms::Exception("StreamDeserialization", "Checksum error")
0135           << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
0136           << " host name = " << initView.hostName() << std::endl;
0137     }
0138 
0139     TClass* desc = getTClass(typeid(SendJobHeader));
0140 
0141     TBufferFile xbuf(
0142         TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
0143     RootDebug tracer(10, 10);
0144     std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
0145 
0146     if (sd.get() == nullptr) {
0147       throw cms::Exception("StreamTranslation", "Registry deserialization error")
0148           << "Could not read the initial product registry list\n";
0149     }
0150 
0151     sd->initializeTransients();
0152     return sd;
0153   }
0154 
0155   /**
0156    * Deserializes the specified init message into a SendJobHeader object
0157    * and merges registries.
0158    */
0159   void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
0160     std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
0161     mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
0162     if (subsequent) {
0163       adjustEventToNewProductRegistry_ = true;
0164     }
0165     SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
0166     pset::Registry& psetRegistry = *pset::Registry::instance();
0167     for (auto const& item : psetMap) {
0168       ParameterSet pset(item.second.pset());
0169       pset.setID(item.first);
0170       psetRegistry.insertMapped(pset);
0171     }
0172   }
0173 
0174   void StreamerInputSource::updateEventMetaData() {
0175     branchIDListHelper()->updateFromInput(sendEvent_->branchIDLists());
0176     thinnedAssociationsHelper()->updateFromPrimaryInput(sendEvent_->thinnedAssociationsHelper());
0177   }
0178 
0179   uint32_t StreamerInputSource::eventMetaDataChecksum(EventMsgView const& eventView) const {
0180     return eventView.adler32_chksum();
0181   }
0182 
0183   void StreamerInputSource::deserializeEventMetaData(EventMsgView const& eventView) {
0184     deserializeEventCommon(eventView, true);
0185   }
0186   /**
0187    * Deserializes the specified event message.
0188    */
0189   void StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
0190     deserializeEventCommon(eventView, false);
0191   }
0192 
0193   void StreamerInputSource::deserializeEventCommon(EventMsgView const& eventView, bool isMetaData) {
0194     if (eventView.code() != Header::EVENT)
0195       throw cms::Exception("StreamTranslation", "Event deserialization error")
0196           << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
0197     FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
0198               << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
0199               << std::endl;
0200     // uncompress if we need to
0201     // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
0202     // need to get rid of this when 090 MTCC streamers are gotten rid of
0203     unsigned long origsize = eventView.origDataSize();
0204     unsigned long dest_size;  //(should be >= eventView.origDataSize())
0205 
0206     uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
0207     //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
0208     //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
0209     //          << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
0210     if (static_cast<uint32>(adler32_chksum) != eventView.adler32_chksum()) {
0211       // skip event (based on option?) or throw exception?
0212       throw cms::Exception("StreamDeserialization", "Checksum error")
0213           << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
0214           << " host name = " << eventView.hostName() << std::endl;
0215     }
0216     if (origsize != 78 && origsize != 0) {
0217       // compressed
0218       if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0219         dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0220                                          eventView.eventLength(),
0221                                          dest_,
0222                                          origsize);
0223       } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0224         dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0225                                          eventView.eventLength(),
0226                                          dest_,
0227                                          origsize);
0228       } else
0229         dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0230                                      eventView.eventLength(),
0231                                      dest_,
0232                                      origsize);
0233     } else {  // not compressed
0234       // we need to copy anyway the buffer as we are using dest in xbuf
0235       dest_size = eventView.eventLength();
0236       dest_.resize(dest_size);
0237       unsigned char* pos = (unsigned char*)&dest_[0];
0238       unsigned char const* from = (unsigned char const*)eventView.eventData();
0239       std::copy(from, from + dest_size, pos);
0240     }
0241     //TBuffer xbuf(TBuffer::kRead, dest_size,
0242     //             (char const*) &dest[0],kFALSE);
0243     //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
0244     //             (char const*) eventView.eventData(),kFALSE);
0245     xbuf_.Reset();
0246     xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
0247     RootDebug tracer(10, 10);
0248 
0249     //We do not yet know which EventPrincipal we will use, therefore
0250     // we are using a new EventPrincipalHolder as a proxy. We need to
0251     // make a new one instead of reusing the same one becuase when running
0252     // multi-threaded there will be multiple EventPrincipals being used
0253     // simultaneously.
0254     eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>();  // propagate_const<T> has no reset() function
0255     setRefCoreStreamer(eventPrincipalHolder_.get());
0256     {
0257       std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
0258         setRefCoreStreamer();
0259         ;
0260       });
0261       sendEvent_ = std::unique_ptr<SendEvent>(reinterpret_cast<SendEvent*>(xbuf_.ReadObjectAny(tc_)));
0262     }
0263 
0264     if (sendEvent_.get() == nullptr) {
0265       throw cms::Exception("StreamTranslation", "Event deserialization error")
0266           << "got a null event from input stream\n";
0267     }
0268 
0269     if (isMetaData) {
0270       eventMetaDataChecksum_ = adler32_chksum;
0271       return;
0272     }
0273 
0274     if (sendEvent_->metaDataChecksum() != eventMetaDataChecksum_) {
0275       throw cms::Exception("StreamTranslation") << " meta data checksum from event " << sendEvent_->metaDataChecksum()
0276                                                 << " does not match last read meta data " << eventMetaDataChecksum_;
0277     }
0278 
0279     processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory());
0280 
0281     FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
0282     if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
0283         runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
0284       RunAuxiliary* runAuxiliary =
0285           new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0286       runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0287       setRunAuxiliary(runAuxiliary);
0288       resetLuminosityBlockAuxiliary();
0289     }
0290     if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
0291       LuminosityBlockAuxiliary* luminosityBlockAuxiliary = new LuminosityBlockAuxiliary(
0292           runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0293       luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0294       setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
0295     }
0296     setEventCached();
0297   }
0298 
0299   void StreamerInputSource::read(EventPrincipal& eventPrincipal) {
0300     if (adjustEventToNewProductRegistry_) {
0301       eventPrincipal.adjustIndexesAfterProductRegistryAddition();
0302       bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
0303       assert(eventOK);
0304       adjustEventToNewProductRegistry_ = false;
0305     }
0306     EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
0307     BranchListIndexes indexes(sendEvent_->branchListIndexes());
0308     branchIDListHelper()->fixBranchListIndexes(indexes);
0309     auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
0310     eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
0311 
0312     //We now know which eventPrincipal to use and we can reuse the slot in
0313     // streamToEventPrincipalHolders to own the memory
0314     eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
0315     if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
0316       streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
0317     }
0318     streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
0319 
0320     // no process name list handling
0321 
0322     SendProds& sps = sendEvent_->products();
0323     for (auto& spitem : sps) {
0324       FDEBUG(10) << "check prodpair" << std::endl;
0325       if (spitem.desc() == nullptr)
0326         throw cms::Exception("StreamTranslation", "Empty Provenance");
0327       FDEBUG(5) << "Prov:"
0328                 << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
0329                 << spitem.desc()->branchID() << std::endl;
0330 
0331       BranchDescription const branchDesc(*spitem.desc());
0332       // This ProductProvenance constructor inserts into the entry description registry
0333       if (spitem.parents()) {
0334         std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
0335         if (spitem.prod() != nullptr) {
0336           FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0337           eventPrincipal.putOnRead(branchDesc,
0338                                    std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
0339                                    std::move(productProvenance));
0340           FDEBUG(10) << "addproduct done" << std::endl;
0341         } else {
0342           FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0343           eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), std::move(productProvenance));
0344           FDEBUG(10) << "addproduct empty done" << std::endl;
0345         }
0346       } else {
0347         std::optional<ProductProvenance> productProvenance;
0348         if (spitem.prod() != nullptr) {
0349           FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0350           eventPrincipal.putOnRead(
0351               branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
0352           FDEBUG(10) << "addproduct done" << std::endl;
0353         } else {
0354           FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0355           eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
0356           FDEBUG(10) << "addproduct empty done" << std::endl;
0357         }
0358       }
0359       spitem.clear();
0360     }
0361 
0362     FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
0363   }
0364 
0365   /**
0366    * Uncompresses the data in the specified input buffer into the
0367    * specified output buffer.  The inputSize should be set to the size
0368    * of the compressed data in the inputBuffer.  The expectedFullSize should
0369    * be set to the original size of the data (before compression).
0370    * Returns the actual size of the uncompressed data.
0371    * Errors are reported by throwing exceptions.
0372    */
0373   unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
0374                                                      unsigned int inputSize,
0375                                                      std::vector<unsigned char>& outputBuffer,
0376                                                      unsigned int expectedFullSize) {
0377     unsigned long origSize = expectedFullSize;
0378     unsigned long uncompressedSize = expectedFullSize * 1.1;
0379     FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0380     outputBuffer.resize(uncompressedSize);
0381     int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize);  // do not need compression level
0382     //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
0383     if (ret == Z_OK) {
0384       // check the length against original uncompressed length
0385       FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0386       if (origSize != uncompressedSize) {
0387         // we throw an error and return without event! null pointer
0388         throw cms::Exception("StreamDeserialization", "Uncompression error")
0389             << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0390       }
0391     } else {
0392       // we throw an error and return without event! null pointer
0393       throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
0394     }
0395     return (unsigned int)uncompressedSize;
0396   }
0397 
0398   bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
0399     if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
0400       return true;
0401     else
0402       return false;
0403   }
0404 
0405   unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
0406                                                          unsigned int inputSize,
0407                                                          std::vector<unsigned char>& outputBuffer,
0408                                                          unsigned int expectedFullSize,
0409                                                          bool hasHeader) {
0410     unsigned long origSize = expectedFullSize;
0411     unsigned long uncompressedSize = expectedFullSize * 1.1;
0412     FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0413     outputBuffer.resize(uncompressedSize);
0414 
0415     lzma_stream stream = LZMA_STREAM_INIT;
0416     lzma_ret returnStatus;
0417 
0418     returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
0419     if (returnStatus != LZMA_OK) {
0420       throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
0421           << "Error code = " << returnStatus << "\n ";
0422     }
0423 
0424     size_t hdrSize = hasHeader ? 4 : 0;
0425     stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
0426     stream.avail_in = (size_t)(inputSize - hdrSize);
0427     stream.next_out = (uint8_t*)&outputBuffer[0];
0428     stream.avail_out = (size_t)uncompressedSize;
0429 
0430     returnStatus = lzma_code(&stream, LZMA_FINISH);
0431     if (returnStatus != LZMA_STREAM_END) {
0432       lzma_end(&stream);
0433       throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
0434           << "Error code = " << returnStatus << "\n ";
0435     }
0436     lzma_end(&stream);
0437 
0438     uncompressedSize = (unsigned int)stream.total_out;
0439 
0440     FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0441     if (origSize != uncompressedSize) {
0442       // we throw an error and return without event! null pointer
0443       throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
0444           << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0445     }
0446 
0447     return uncompressedSize;
0448   }
0449 
0450   bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
0451     if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
0452       return true;
0453     else
0454       return false;
0455   }
0456 
0457   unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
0458                                                          unsigned int inputSize,
0459                                                          std::vector<unsigned char>& outputBuffer,
0460                                                          unsigned int expectedFullSize,
0461                                                          bool hasHeader) {
0462     unsigned long uncompressedSize = expectedFullSize * 1.1;
0463     FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
0464     outputBuffer.resize(uncompressedSize);
0465 
0466     size_t hdrSize = hasHeader ? 4 : 0;
0467     size_t ret = ZSTD_decompress(
0468         (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
0469 
0470     if (ZSTD_isError(ret)) {
0471       throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
0472           << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
0473     }
0474     return (unsigned int)ret;
0475   }
0476 
0477   void StreamerInputSource::resetAfterEndRun() {
0478     // called from an online streamer source to reset after a stop command
0479     // so an enable command will work
0480     resetLuminosityBlockAuxiliary();
0481     resetRunAuxiliary();
0482     assert(!eventCached());
0483     reset();
0484   }
0485 
0486   void StreamerInputSource::setRun(RunNumber_t) {
0487     // Need to define a dummy setRun here or else the InputSource::setRun is called
0488     // if we have a source inheriting from this and wants to define a setRun method
0489     throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
0490                                         << "Run number cannot be modified for this type of Input Source\n"
0491                                         << "Contact a Storage Manager Developer\n";
0492   }
0493 
0494   StreamerInputSource::EventPrincipalHolder::EventPrincipalHolder() : eventPrincipal_(nullptr) {}
0495 
0496   StreamerInputSource::EventPrincipalHolder::~EventPrincipalHolder() {}
0497 
0498   WrapperBase const* StreamerInputSource::EventPrincipalHolder::getIt(ProductID const& id) const {
0499     return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
0500   }
0501 
0502   std::optional<std::tuple<edm::WrapperBase const*, unsigned int>>
0503   StreamerInputSource::EventPrincipalHolder::getThinnedProduct(edm::ProductID const& id, unsigned int index) const {
0504     if (eventPrincipal_)
0505       return eventPrincipal_->getThinnedProduct(id, index);
0506     return std::nullopt;
0507   }
0508 
0509   void StreamerInputSource::EventPrincipalHolder::getThinnedProducts(ProductID const& pid,
0510                                                                      std::vector<WrapperBase const*>& wrappers,
0511                                                                      std::vector<unsigned int>& keys) const {
0512     if (eventPrincipal_)
0513       eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
0514   }
0515 
0516   edm::OptionalThinnedKey StreamerInputSource::EventPrincipalHolder::getThinnedKeyFrom(
0517       edm::ProductID const& parent, unsigned int index, edm::ProductID const& thinned) const {
0518     if (eventPrincipal_) {
0519       return eventPrincipal_->getThinnedKeyFrom(parent, index, thinned);
0520     } else {
0521       return std::monostate{};
0522     }
0523   }
0524 
0525   unsigned int StreamerInputSource::EventPrincipalHolder::transitionIndex_() const {
0526     assert(eventPrincipal_ != nullptr);
0527     return eventPrincipal_->transitionIndex();
0528   }
0529 
0530   void StreamerInputSource::EventPrincipalHolder::setEventPrincipal(EventPrincipal* ep) { eventPrincipal_ = ep; }
0531 
0532   void StreamerInputSource::fillDescription(ParameterSetDescription& desc) { RawInputSource::fillDescription(desc); }
0533 }  // namespace edm::streamer