File indexing completed on 2021-02-14 14:21:20
0001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
0002
0003 #include "IOPool/Streamer/interface/EventMessage.h"
0004 #include "IOPool/Streamer/interface/InitMessage.h"
0005 #include "IOPool/Streamer/interface/ClassFiller.h"
0006
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/FileBlock.h"
0009 #include "DataFormats/Provenance/interface/BranchDescription.h"
0010 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0011 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0012 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0013 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0015 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0016 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0017 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0018
0019 #include "zlib.h"
0020 #include "lzma.h"
0021 #include "zstd.h"
0022
0023 #include "DataFormats/Common/interface/RefCoreStreamer.h"
0024 #include "FWCore/Utilities/interface/WrappedClassName.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0027 #include "FWCore/ParameterSet/interface/Registry.h"
0028 #include "FWCore/Utilities/interface/EDMException.h"
0029 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0030 #include "FWCore/Reflection/interface/DictionaryTools.h"
0031
0032 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0034 #include "FWCore/Utilities/interface/DebugMacros.h"
0035
0036 #include <string>
0037 #include <iostream>
0038 #include <set>
0039
0040 namespace edm {
namespace {
  // Initial size, in bytes (1 MiB), of the event scratch buffer dest_ and of the
  // ROOT read buffer xbuf_ created by the StreamerInputSource constructor.
  constexpr int init_size = 1 << 20;
}  // namespace
0044
0045 StreamerInputSource::StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc)
0046 : RawInputSource(pset, desc),
0047 tc_(getTClass(typeid(SendEvent))),
0048 dest_(init_size),
0049 xbuf_(TBuffer::kRead, init_size),
0050 sendEvent_(),
0051 eventPrincipalHolder_(),
0052 adjustEventToNewProductRegistry_(false),
0053 processName_(),
0054 protocolVersion_(0U) {}
0055
0056 StreamerInputSource::~StreamerInputSource() {}
0057
0058
0059 void StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header,
0060 ProductRegistry& reg,
0061 BranchIDListHelper& branchIDListHelper,
0062 ThinnedAssociationsHelper& thinnedHelper,
0063 bool subsequent) {
0064 SendDescs const& descs = header.descs();
0065
0066 FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
0067
0068 if (subsequent) {
0069 ProductRegistry pReg;
0070 pReg.updateFromInput(descs);
0071 std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
0072 if (!mergeInfo.empty()) {
0073 throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
0074 }
0075 branchIDListHelper.updateFromInput(header.branchIDLists());
0076 thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
0077 } else {
0078 declareStreamers(descs);
0079 buildClassCache(descs);
0080 loadExtraClasses();
0081 if (!reg.frozen()) {
0082 reg.updateFromInput(descs);
0083 }
0084 branchIDListHelper.updateFromInput(header.branchIDLists());
0085 thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
0086 }
0087 }
0088
0089 void StreamerInputSource::declareStreamers(SendDescs const& descs) {
0090 std::vector<std::string> missingDictionaries;
0091 std::vector<std::string> branchNamesForMissing;
0092 std::vector<std::string> producedTypes;
0093 for (auto const& item : descs) {
0094
0095 std::string const real_name = wrappedClassName(item.className());
0096 FDEBUG(6) << "declare: " << real_name << std::endl;
0097 if (!loadCap(real_name, missingDictionaries)) {
0098 branchNamesForMissing.emplace_back(item.branchName());
0099 producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
0100 }
0101 }
0102 if (!missingDictionaries.empty()) {
0103 std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
0104 throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
0105 }
0106 }
0107
0108 void StreamerInputSource::buildClassCache(SendDescs const& descs) {
0109 for (auto const& item : descs) {
0110
0111 std::string const real_name = wrappedClassName(item.className());
0112 FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
0113 doBuildRealData(real_name);
0114 }
0115 }
0116
0117
0118
0119
0120
0121 std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
0122 if (initView.code() != Header::INIT)
0123 throw cms::Exception("StreamTranslation", "Registry deserialization error")
0124 << "received wrong message type: expected INIT, got " << initView.code() << "\n";
0125
0126
0127 if (initView.protocolVersion() > 3) {
0128 processName_ = initView.processName();
0129 protocolVersion_ = initView.protocolVersion();
0130
0131 FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
0132 FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
0133 }
0134
0135
0136 uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
0137
0138
0139
0140 if ((uint32)adler32_chksum != initView.adler32_chksum()) {
0141
0142 throw cms::Exception("StreamDeserialization", "Checksum error")
0143 << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
0144 << " host name = " << initView.hostName() << std::endl;
0145 }
0146
0147 TClass* desc = getTClass(typeid(SendJobHeader));
0148
0149 TBufferFile xbuf(
0150 TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
0151 RootDebug tracer(10, 10);
0152 std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
0153
0154 if (sd.get() == nullptr) {
0155 throw cms::Exception("StreamTranslation", "Registry deserialization error")
0156 << "Could not read the initial product registry list\n";
0157 }
0158
0159 sd->initializeTransients();
0160 return sd;
0161 }
0162
0163
0164
0165
0166
0167 void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
0168 std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
0169 mergeIntoRegistry(*sd, productRegistryUpdate(), *branchIDListHelper(), *thinnedAssociationsHelper(), subsequent);
0170 if (subsequent) {
0171 adjustEventToNewProductRegistry_ = true;
0172 }
0173 SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
0174 pset::Registry& psetRegistry = *pset::Registry::instance();
0175 for (auto const& item : psetMap) {
0176 ParameterSet pset(item.second.pset());
0177 pset.setID(item.first);
0178 psetRegistry.insertMapped(pset);
0179 }
0180 }
0181
0182
0183
0184
0185 void StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
0186 if (eventView.code() != Header::EVENT)
0187 throw cms::Exception("StreamTranslation", "Event deserialization error")
0188 << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
0189 FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
0190 << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
0191 << std::endl;
0192
0193
0194
0195 unsigned long origsize = eventView.origDataSize();
0196 unsigned long dest_size;
0197
0198 uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
0199
0200
0201
0202 if ((uint32)adler32_chksum != eventView.adler32_chksum()) {
0203
0204 throw cms::Exception("StreamDeserialization", "Checksum error")
0205 << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
0206 << " host name = " << eventView.hostName() << std::endl;
0207 }
0208 if (origsize != 78 && origsize != 0) {
0209
0210 if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0211 dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0212 eventView.eventLength(),
0213 dest_,
0214 origsize);
0215 } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0216 dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0217 eventView.eventLength(),
0218 dest_,
0219 origsize);
0220 } else
0221 dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0222 eventView.eventLength(),
0223 dest_,
0224 origsize);
0225 } else {
0226
0227 dest_size = eventView.eventLength();
0228 dest_.resize(dest_size);
0229 unsigned char* pos = (unsigned char*)&dest_[0];
0230 unsigned char const* from = (unsigned char const*)eventView.eventData();
0231 std::copy(from, from + dest_size, pos);
0232 }
0233
0234
0235
0236
0237 xbuf_.Reset();
0238 xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
0239 RootDebug tracer(10, 10);
0240
0241
0242
0243
0244
0245
0246 eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>();
0247 setRefCoreStreamer(eventPrincipalHolder_.get());
0248 {
0249 std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
0250 setRefCoreStreamer();
0251 ;
0252 });
0253 sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
0254 }
0255
0256 if (sendEvent_.get() == nullptr) {
0257 throw cms::Exception("StreamTranslation", "Event deserialization error")
0258 << "got a null event from input stream\n";
0259 }
0260 processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory());
0261
0262 FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
0263 if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
0264 runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
0265 RunAuxiliary* runAuxiliary =
0266 new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0267 runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0268 setRunAuxiliary(runAuxiliary);
0269 resetLuminosityBlockAuxiliary();
0270 }
0271 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
0272 LuminosityBlockAuxiliary* luminosityBlockAuxiliary = new LuminosityBlockAuxiliary(
0273 runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0274 luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0275 setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
0276 }
0277 setEventCached();
0278 }
0279
0280 void StreamerInputSource::read(EventPrincipal& eventPrincipal) {
0281 if (adjustEventToNewProductRegistry_) {
0282 eventPrincipal.adjustIndexesAfterProductRegistryAddition();
0283 bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
0284 assert(eventOK);
0285 adjustEventToNewProductRegistry_ = false;
0286 }
0287 EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
0288 BranchListIndexes indexes(sendEvent_->branchListIndexes());
0289 branchIDListHelper()->fixBranchListIndexes(indexes);
0290 auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
0291 eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
0292
0293
0294
0295 eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
0296 if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
0297 streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
0298 }
0299 streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
0300
0301
0302
0303 SendProds& sps = sendEvent_->products();
0304 for (auto& spitem : sps) {
0305 FDEBUG(10) << "check prodpair" << std::endl;
0306 if (spitem.desc() == nullptr)
0307 throw cms::Exception("StreamTranslation", "Empty Provenance");
0308 FDEBUG(5) << "Prov:"
0309 << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
0310 << spitem.desc()->branchID() << std::endl;
0311
0312 BranchDescription const branchDesc(*spitem.desc());
0313
0314 if (spitem.parents()) {
0315 std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
0316 if (spitem.prod() != nullptr) {
0317 FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0318 eventPrincipal.putOnRead(branchDesc,
0319 std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
0320 std::move(productProvenance));
0321 FDEBUG(10) << "addproduct done" << std::endl;
0322 } else {
0323 FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0324 eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), std::move(productProvenance));
0325 FDEBUG(10) << "addproduct empty done" << std::endl;
0326 }
0327 } else {
0328 std::optional<ProductProvenance> productProvenance;
0329 if (spitem.prod() != nullptr) {
0330 FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0331 eventPrincipal.putOnRead(
0332 branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
0333 FDEBUG(10) << "addproduct done" << std::endl;
0334 } else {
0335 FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0336 eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
0337 FDEBUG(10) << "addproduct empty done" << std::endl;
0338 }
0339 }
0340 spitem.clear();
0341 }
0342
0343 FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
0344 }
0345
0346
0347
0348
0349
0350
0351
0352
0353
0354 unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
0355 unsigned int inputSize,
0356 std::vector<unsigned char>& outputBuffer,
0357 unsigned int expectedFullSize) {
0358 unsigned long origSize = expectedFullSize;
0359 unsigned long uncompressedSize = expectedFullSize * 1.1;
0360 FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0361 outputBuffer.resize(uncompressedSize);
0362 int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize);
0363
0364 if (ret == Z_OK) {
0365
0366 FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0367 if (origSize != uncompressedSize) {
0368
0369 throw cms::Exception("StreamDeserialization", "Uncompression error")
0370 << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0371 }
0372 } else {
0373
0374 throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
0375 }
0376 return (unsigned int)uncompressedSize;
0377 }
0378
0379 bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
0380 if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
0381 return true;
0382 else
0383 return false;
0384 }
0385
0386 unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
0387 unsigned int inputSize,
0388 std::vector<unsigned char>& outputBuffer,
0389 unsigned int expectedFullSize,
0390 bool hasHeader) {
0391 unsigned long origSize = expectedFullSize;
0392 unsigned long uncompressedSize = expectedFullSize * 1.1;
0393 FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0394 outputBuffer.resize(uncompressedSize);
0395
0396 lzma_stream stream = LZMA_STREAM_INIT;
0397 lzma_ret returnStatus;
0398
0399 returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
0400 if (returnStatus != LZMA_OK) {
0401 throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
0402 << "Error code = " << returnStatus << "\n ";
0403 }
0404
0405 size_t hdrSize = hasHeader ? 4 : 0;
0406 stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
0407 stream.avail_in = (size_t)(inputSize - hdrSize);
0408 stream.next_out = (uint8_t*)&outputBuffer[0];
0409 stream.avail_out = (size_t)uncompressedSize;
0410
0411 returnStatus = lzma_code(&stream, LZMA_FINISH);
0412 if (returnStatus != LZMA_STREAM_END) {
0413 lzma_end(&stream);
0414 throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
0415 << "Error code = " << returnStatus << "\n ";
0416 }
0417 lzma_end(&stream);
0418
0419 uncompressedSize = (unsigned int)stream.total_out;
0420
0421 FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0422 if (origSize != uncompressedSize) {
0423
0424 throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
0425 << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0426 }
0427
0428 return uncompressedSize;
0429 }
0430
0431 bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
0432 if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
0433 return true;
0434 else
0435 return false;
0436 }
0437
0438 unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
0439 unsigned int inputSize,
0440 std::vector<unsigned char>& outputBuffer,
0441 unsigned int expectedFullSize,
0442 bool hasHeader) {
0443 unsigned long uncompressedSize = expectedFullSize * 1.1;
0444 FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
0445 outputBuffer.resize(uncompressedSize);
0446
0447 size_t hdrSize = hasHeader ? 4 : 0;
0448 size_t ret = ZSTD_decompress(
0449 (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
0450
0451 if (ZSTD_isError(ret)) {
0452 throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
0453 << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
0454 }
0455 return (unsigned int)ret;
0456 }
0457
0458 void StreamerInputSource::resetAfterEndRun() {
0459
0460
0461 resetLuminosityBlockAuxiliary();
0462 resetRunAuxiliary();
0463 assert(!eventCached());
0464 reset();
0465 }
0466
0467 void StreamerInputSource::setRun(RunNumber_t) {
0468
0469
0470 throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
0471 << "Run number cannot be modified for this type of Input Source\n"
0472 << "Contact a Storage Manager Developer\n";
0473 }
0474
0475 StreamerInputSource::EventPrincipalHolder::EventPrincipalHolder() : eventPrincipal_(nullptr) {}
0476
0477 StreamerInputSource::EventPrincipalHolder::~EventPrincipalHolder() {}
0478
0479 WrapperBase const* StreamerInputSource::EventPrincipalHolder::getIt(ProductID const& id) const {
0480 return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
0481 }
0482
0483 std::optional<std::tuple<edm::WrapperBase const*, unsigned int>>
0484 StreamerInputSource::EventPrincipalHolder::getThinnedProduct(edm::ProductID const& id, unsigned int index) const {
0485 if (eventPrincipal_)
0486 return eventPrincipal_->getThinnedProduct(id, index);
0487 return std::nullopt;
0488 }
0489
0490 void StreamerInputSource::EventPrincipalHolder::getThinnedProducts(ProductID const& pid,
0491 std::vector<WrapperBase const*>& wrappers,
0492 std::vector<unsigned int>& keys) const {
0493 if (eventPrincipal_)
0494 eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
0495 }
0496
0497 edm::OptionalThinnedKey StreamerInputSource::EventPrincipalHolder::getThinnedKeyFrom(
0498 edm::ProductID const& parent, unsigned int index, edm::ProductID const& thinned) const {
0499 if (eventPrincipal_) {
0500 return eventPrincipal_->getThinnedKeyFrom(parent, index, thinned);
0501 } else {
0502 return std::monostate{};
0503 }
0504 }
0505
0506 unsigned int StreamerInputSource::EventPrincipalHolder::transitionIndex_() const {
0507 assert(eventPrincipal_ != nullptr);
0508 return eventPrincipal_->transitionIndex();
0509 }
0510
0511 void StreamerInputSource::EventPrincipalHolder::setEventPrincipal(EventPrincipal* ep) { eventPrincipal_ = ep; }
0512
0513 void StreamerInputSource::fillDescription(ParameterSetDescription& desc) { RawInputSource::fillDescription(desc); }
0514 }