File indexing completed on 2025-05-23 02:05:09
0001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
0002
0003 #include "IOPool/Streamer/interface/EventMessage.h"
0004 #include "IOPool/Streamer/interface/InitMessage.h"
0005 #include "IOPool/Streamer/interface/ClassFiller.h"
0006
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/FileBlock.h"
0009 #include "DataFormats/Provenance/interface/ProductDescription.h"
0010 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0011 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0012 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0013 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0015 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0016 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0017 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0018
0019 #include "zlib.h"
0020 #include "lzma.h"
0021 #include "zstd.h"
0022
0023 #include "DataFormats/Common/interface/RefCoreStreamer.h"
0024 #include "FWCore/Utilities/interface/WrappedClassName.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0027 #include "FWCore/ParameterSet/interface/Registry.h"
0028 #include "FWCore/Utilities/interface/EDMException.h"
0029 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0030 #include "FWCore/Reflection/interface/DictionaryTools.h"
0031
0032 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0034 #include "FWCore/Utilities/interface/DebugMacros.h"
0035
0036 #include <string>
0037 #include <iostream>
0038 #include <set>
0039
0040 namespace edm::streamer {
namespace {
  // Initial size, in bytes, of both the decompression buffer (dest_) and the
  // ROOT read buffer (xbuf_) created by the StreamerInputSource constructor: 1 MiB.
  constexpr int init_size = 1 << 20;
}  // namespace
0044
0045 StreamerInputSource::StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc)
0046 : RawInputSource(pset, desc),
0047 tc_(getTClass(typeid(SendEvent))),
0048 dest_(init_size),
0049 xbuf_(TBuffer::kRead, init_size),
0050 sendEvent_(),
0051 eventPrincipalHolder_(),
0052 processName_(),
0053 protocolVersion_(0U) {}
0054
0055 StreamerInputSource::~StreamerInputSource() {}
0056
0057
0058 void StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) {
0059 SendDescs const& descs = header.descs();
0060
0061 FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
0062
0063 if (subsequent) {
0064 ProductRegistry pReg;
0065 pReg.updateFromInput(descs);
0066 std::string mergeInfo = reg.merge(pReg, std::string(), ProductDescription::Permissive);
0067 if (!mergeInfo.empty()) {
0068 throw cms::Exception("MismatchedInput", "StreamerInputSource::mergeIntoRegistry") << mergeInfo;
0069 }
0070 } else {
0071 declareStreamers(descs);
0072 buildClassCache(descs);
0073 loadExtraClasses();
0074 if (!reg.frozen()) {
0075 reg.updateFromInput(descs);
0076 }
0077 }
0078 }
0079
0080 void StreamerInputSource::declareStreamers(SendDescs const& descs) {
0081 std::vector<std::string> missingDictionaries;
0082 std::vector<std::string> branchNamesForMissing;
0083 std::vector<std::string> producedTypes;
0084 for (auto const& item : descs) {
0085
0086 std::string const real_name = wrappedClassName(item.className());
0087 FDEBUG(6) << "declare: " << real_name << std::endl;
0088 if (!loadCap(real_name, missingDictionaries)) {
0089 branchNamesForMissing.emplace_back(item.branchName());
0090 producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
0091 }
0092 }
0093 if (!missingDictionaries.empty()) {
0094 std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
0095 throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
0096 }
0097 }
0098
0099 void StreamerInputSource::buildClassCache(SendDescs const& descs) {
0100 for (auto const& item : descs) {
0101
0102 std::string const real_name = wrappedClassName(item.className());
0103 FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
0104 doBuildRealData(real_name);
0105 }
0106 }
0107
0108
0109
0110
0111
0112 std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
0113 if (initView.code() != Header::INIT)
0114 throw cms::Exception("StreamTranslation", "Registry deserialization error")
0115 << "received wrong message type: expected INIT, got " << initView.code() << "\n";
0116
0117
0118 if (initView.protocolVersion() > 3) {
0119 processName_ = initView.processName();
0120 protocolVersion_ = initView.protocolVersion();
0121
0122 FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
0123 FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
0124 }
0125
0126
0127 uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
0128
0129
0130
0131 if ((uint32)adler32_chksum != initView.adler32_chksum()) {
0132
0133 throw cms::Exception("StreamDeserialization", "Checksum error")
0134 << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
0135 << " host name = " << initView.hostName() << std::endl;
0136 }
0137
0138 TClass* desc = getTClass(typeid(SendJobHeader));
0139
0140 TBufferFile xbuf(
0141 TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
0142 RootDebug tracer(10, 10);
0143 std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
0144
0145 if (sd.get() == nullptr) {
0146 throw cms::Exception("StreamTranslation", "Registry deserialization error")
0147 << "Could not read the initial product registry list\n";
0148 }
0149
0150 sd->initializeTransients();
0151 return sd;
0152 }
0153
0154
0155
0156
0157
0158 void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
0159 std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
0160 mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
0161 SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
0162 pset::Registry& psetRegistry = *pset::Registry::instance();
0163 for (auto const& item : psetMap) {
0164 ParameterSet pset(item.second.pset());
0165 pset.setID(item.first);
0166 psetRegistry.insertMapped(pset);
0167 }
0168 }
0169
0170 void StreamerInputSource::updateEventMetaData() {
0171 branchIDListHelper()->updateFromInput(sendEvent_->branchIDLists());
0172 thinnedAssociationsHelper()->updateFromPrimaryInput(sendEvent_->thinnedAssociationsHelper());
0173 }
0174
0175 uint32_t StreamerInputSource::eventMetaDataChecksum(EventMsgView const& eventView) const {
0176 return eventView.adler32_chksum();
0177 }
0178
0179 void StreamerInputSource::deserializeEventMetaData(EventMsgView const& eventView) {
0180 deserializeEventCommon(eventView, true);
0181 }
0182
0183
0184
0185 void StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
0186 deserializeEventCommon(eventView, false);
0187 }
0188
// Shared decoder for EVENT messages (both metadata and regular events).
//
// Steps:
//   1. Reject anything that is not an EVENT message.
//   2. Verify the Adler-32 checksum of the payload against the header.
//   3. Decompress (LZMA, ZSTD or zlib) or copy the payload into dest_.
//   4. Stream a SendEvent out of dest_ into sendEvent_.
//
// For metadata messages (isMetaData == true) the payload checksum is then
// remembered in eventMetaDataChecksum_ and the function returns.
//
// For regular events the following also happens:
//   - The event's metaDataChecksum() must match the last metadata read.
//   - Its process history is registered.
//   - New run / luminosity-block auxiliaries are created when the run,
//     process history or lumi number changes.
//   - The event is marked as cached.
//
// Throws cms::Exception on a wrong message type, checksum mismatch,
// decompression failure, null deserialized event, or metadata mismatch.
void StreamerInputSource::deserializeEventCommon(EventMsgView const& eventView, bool isMetaData) {
  if (eventView.code() != Header::EVENT)
    throw cms::Exception("StreamTranslation", "Event deserialization error")
        << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
  FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
            << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
            << std::endl;

  // Size of the payload before compression, as recorded by the writer.
  unsigned long origsize = eventView.origDataSize();
  // Number of valid bytes placed in dest_ below.
  unsigned long dest_size;

  // Checksum is computed over the (possibly compressed) payload as stored in the message.
  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());

  if (static_cast<uint32>(adler32_chksum) != eventView.adler32_chksum()) {
    throw cms::Exception("StreamDeserialization", "Checksum error")
        << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
        << " host name = " << eventView.hostName() << std::endl;
  }
  // origsize 0 or 78 means the payload is taken verbatim (uncompressed).
  // NOTE(review): the meaning of the special value 78 is not visible here;
  // presumably a legacy sentinel from the writer side -- confirm.
  if (origsize != 78 && origsize != 0) {
    // Compressed payload: choose the codec by sniffing the leading tag bytes;
    // anything untagged is assumed to be zlib.
    if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
      dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
                                       eventView.eventLength(),
                                       dest_,
                                       origsize);
    } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
      dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
                                       eventView.eventLength(),
                                       dest_,
                                       origsize);
    } else
      dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
                                   eventView.eventLength(),
                                   dest_,
                                   origsize);
  } else {
    // Uncompressed payload: copy it as-is into dest_.
    dest_size = eventView.eventLength();
    dest_.resize(dest_size);
    unsigned char* pos = (unsigned char*)&dest_[0];
    unsigned char const* from = (unsigned char const*)eventView.eventData();
    std::copy(from, from + dest_size, pos);
  }

  // Point the ROOT read buffer at dest_ without adopting it (kFALSE), so dest_
  // keeps ownership of the bytes.
  xbuf_.Reset();
  xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
  // NOTE(review): RootDebug appears to be a scoped ROOT debug-level helper -- confirm.
  RootDebug tracer(10, 10);

  // A fresh holder per event. The guard is scoped to the ReadObjectAny call,
  // presumably so RefCore streamers resolve references through this holder while
  // the event is being read -- confirm against RefCoreStreamer.h.
  eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>();
  {
    RefCoreStreamerGuard guard(eventPrincipalHolder_.get());
    sendEvent_ = std::unique_ptr<SendEvent>(reinterpret_cast<SendEvent*>(xbuf_.ReadObjectAny(tc_)));
  }

  if (sendEvent_.get() == nullptr) {
    throw cms::Exception("StreamTranslation", "Event deserialization error")
        << "got a null event from input stream\n";
  }

  // Metadata messages only establish the checksum later events must reference.
  if (isMetaData) {
    eventMetaDataChecksum_ = adler32_chksum;
    return;
  }

  // A regular event must reference the metadata message read most recently.
  if (sendEvent_->metaDataChecksum() != eventMetaDataChecksum_) {
    throw cms::Exception("StreamTranslation") << " meta data checksum from event " << sendEvent_->metaDataChecksum()
                                              << " does not match last read meta data " << eventMetaDataChecksum_;
  }

  processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory());

  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
  // New run auxiliary when there is none yet, or the run number or process
  // history changed; the lumi auxiliary is reset along with it.
  // The raw pointer is handed to setRunAuxiliary (ownership presumably taken by
  // the base class -- confirm).
  if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
      runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
    RunAuxiliary* runAuxiliary =
        new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
    runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
    setRunAuxiliary(runAuxiliary);
    resetLuminosityBlockAuxiliary();
  }
  // New lumi auxiliary when there is none (e.g. just reset above) or the lumi
  // number from the message header differs from the current one.
  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
    LuminosityBlockAuxiliary* luminosityBlockAuxiliary = new LuminosityBlockAuxiliary(
        runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
    luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
    setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
  }
  setEventCached();
}
0290
0291 void StreamerInputSource::read(EventPrincipal& eventPrincipal) {
0292 EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
0293 BranchListIndexes indexes(sendEvent_->branchListIndexes());
0294 branchIDListHelper()->fixBranchListIndexes(indexes);
0295 auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
0296 eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
0297
0298
0299
0300 eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
0301 if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
0302 streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
0303 }
0304 streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
0305
0306
0307
0308 SendProds& sps = sendEvent_->products();
0309 for (auto& spitem : sps) {
0310 FDEBUG(10) << "check prodpair" << std::endl;
0311 if (spitem.desc() == nullptr)
0312 throw cms::Exception("StreamTranslation", "Empty Provenance");
0313 FDEBUG(5) << "Prov:"
0314 << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
0315 << spitem.desc()->branchID() << std::endl;
0316
0317 ProductDescription const branchDesc(*spitem.desc());
0318
0319 if (spitem.parents()) {
0320 std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
0321 if (spitem.prod() != nullptr) {
0322 FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0323 eventPrincipal.putOnRead(
0324 branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
0325 FDEBUG(10) << "addproduct done" << std::endl;
0326 } else {
0327 FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0328 eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
0329 FDEBUG(10) << "addproduct empty done" << std::endl;
0330 }
0331 } else {
0332 std::optional<ProductProvenance> productProvenance;
0333 if (spitem.prod() != nullptr) {
0334 FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0335 eventPrincipal.putOnRead(
0336 branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
0337 FDEBUG(10) << "addproduct done" << std::endl;
0338 } else {
0339 FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0340 eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
0341 FDEBUG(10) << "addproduct empty done" << std::endl;
0342 }
0343 }
0344 spitem.clear();
0345 }
0346
0347 FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
0348 }
0349
0350
0351
0352
0353
0354
0355
0356
0357
0358 unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
0359 unsigned int inputSize,
0360 std::vector<unsigned char>& outputBuffer,
0361 unsigned int expectedFullSize) {
0362 unsigned long origSize = expectedFullSize;
0363 unsigned long uncompressedSize = expectedFullSize * 1.1;
0364 FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0365 outputBuffer.resize(uncompressedSize);
0366 int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize);
0367
0368 if (ret == Z_OK) {
0369
0370 FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0371 if (origSize != uncompressedSize) {
0372
0373 throw cms::Exception("StreamDeserialization", "Uncompression error")
0374 << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0375 }
0376 } else {
0377
0378 throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
0379 }
0380 return (unsigned int)uncompressedSize;
0381 }
0382
0383 bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
0384 if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
0385 return true;
0386 else
0387 return false;
0388 }
0389
0390 unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
0391 unsigned int inputSize,
0392 std::vector<unsigned char>& outputBuffer,
0393 unsigned int expectedFullSize,
0394 bool hasHeader) {
0395 unsigned long origSize = expectedFullSize;
0396 unsigned long uncompressedSize = expectedFullSize * 1.1;
0397 FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0398 outputBuffer.resize(uncompressedSize);
0399
0400 lzma_stream stream = LZMA_STREAM_INIT;
0401 lzma_ret returnStatus;
0402
0403 returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
0404 if (returnStatus != LZMA_OK) {
0405 throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
0406 << "Error code = " << returnStatus << "\n ";
0407 }
0408
0409 size_t hdrSize = hasHeader ? 4 : 0;
0410 stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
0411 stream.avail_in = (size_t)(inputSize - hdrSize);
0412 stream.next_out = (uint8_t*)&outputBuffer[0];
0413 stream.avail_out = (size_t)uncompressedSize;
0414
0415 returnStatus = lzma_code(&stream, LZMA_FINISH);
0416 if (returnStatus != LZMA_STREAM_END) {
0417 lzma_end(&stream);
0418 throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
0419 << "Error code = " << returnStatus << "\n ";
0420 }
0421 lzma_end(&stream);
0422
0423 uncompressedSize = (unsigned int)stream.total_out;
0424
0425 FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0426 if (origSize != uncompressedSize) {
0427
0428 throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
0429 << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0430 }
0431
0432 return uncompressedSize;
0433 }
0434
0435 bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
0436 if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
0437 return true;
0438 else
0439 return false;
0440 }
0441
0442 unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
0443 unsigned int inputSize,
0444 std::vector<unsigned char>& outputBuffer,
0445 unsigned int expectedFullSize,
0446 bool hasHeader) {
0447 unsigned long uncompressedSize = expectedFullSize * 1.1;
0448 FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
0449 outputBuffer.resize(uncompressedSize);
0450
0451 size_t hdrSize = hasHeader ? 4 : 0;
0452 size_t ret = ZSTD_decompress(
0453 (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
0454
0455 if (ZSTD_isError(ret)) {
0456 throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
0457 << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
0458 }
0459 return (unsigned int)ret;
0460 }
0461
// Clear the per-run state once a run has finished.
// Drops the cached luminosity-block auxiliary, then the run auxiliary, and
// finally calls reset().
void StreamerInputSource::resetAfterEndRun() {
  // Luminosity-block auxiliary first, then the run auxiliary.
  resetLuminosityBlockAuxiliary();
  resetRunAuxiliary();
  // No event may still be pending at this point (checked only when NDEBUG is not defined).
  assert(!eventCached());
  // NOTE(review): reset() is not defined in this file; presumably a base-class
  // or subclass reset hook -- confirm what it clears.
  reset();
}
0470
0471 void StreamerInputSource::setRun(RunNumber_t) {
0472
0473
0474 throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
0475 << "Run number cannot be modified for this type of Input Source\n"
0476 << "Contact a Storage Manager Developer\n";
0477 }
0478
0479 StreamerInputSource::EventPrincipalHolder::EventPrincipalHolder() : eventPrincipal_(nullptr) {}
0480
0481 StreamerInputSource::EventPrincipalHolder::~EventPrincipalHolder() {}
0482
0483 WrapperBase const* StreamerInputSource::EventPrincipalHolder::getIt(ProductID const& id) const {
0484 return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
0485 }
0486
0487 std::optional<std::tuple<edm::WrapperBase const*, unsigned int>>
0488 StreamerInputSource::EventPrincipalHolder::getThinnedProduct(edm::ProductID const& id, unsigned int index) const {
0489 if (eventPrincipal_)
0490 return eventPrincipal_->getThinnedProduct(id, index);
0491 return std::nullopt;
0492 }
0493
0494 void StreamerInputSource::EventPrincipalHolder::getThinnedProducts(ProductID const& pid,
0495 std::vector<WrapperBase const*>& wrappers,
0496 std::vector<unsigned int>& keys) const {
0497 if (eventPrincipal_)
0498 eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
0499 }
0500
0501 edm::OptionalThinnedKey StreamerInputSource::EventPrincipalHolder::getThinnedKeyFrom(
0502 edm::ProductID const& parent, unsigned int index, edm::ProductID const& thinned) const {
0503 if (eventPrincipal_) {
0504 return eventPrincipal_->getThinnedKeyFrom(parent, index, thinned);
0505 } else {
0506 return std::monostate{};
0507 }
0508 }
0509
0510 unsigned int StreamerInputSource::EventPrincipalHolder::transitionIndex_() const {
0511 assert(eventPrincipal_ != nullptr);
0512 return eventPrincipal_->transitionIndex();
0513 }
0514
0515 void StreamerInputSource::EventPrincipalHolder::setEventPrincipal(EventPrincipal* ep) { eventPrincipal_ = ep; }
0516
0517 void StreamerInputSource::fillDescription(ParameterSetDescription& desc) { RawInputSource::fillDescription(desc); }
0518 }