File indexing completed on 2025-01-31 02:19:45
0001
0002
0003
0004
0005
0006
0007 #include "IOPool/Streamer/interface/StreamSerializer.h"
0008 #include "DataFormats/Provenance/interface/ProductDescription.h"
0009 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0010 #include "DataFormats/Provenance/interface/Parentage.h"
0011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0012 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0013 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0014 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0015 #include "IOPool/Streamer/interface/ClassFiller.h"
0016 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0017 #include "FWCore/Framework/interface/EventForOutput.h"
0018 #include "FWCore/ParameterSet/interface/Registry.h"
0019 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0020 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0021 #include "FWCore/ServiceRegistry/interface/Service.h"
0022 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0023
0024 #include "zlib.h"
0025 #include "lzma.h"
0026 #include "zstd.h"
0027 #include <algorithm>
0028 #include <cstdlib>
0029 #include <iostream>
0030 #include <vector>
0031
0032 namespace edm::streamer {
0033
0034
0035
0036
0037 StreamSerializer::StreamSerializer(SelectedProducts const *selections)
0038 : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
0039
0040
0041
0042
0043
0044 int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer) const {
0045 SendJobHeader::ParameterSetMap psetMap;
0046 pset::Registry::instance()->fillMap(psetMap);
0047 return serializeRegistry(data_buffer, psetMap);
0048 }
0049
0050 int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer,
0051 SendJobHeader::ParameterSetMap const &psetMap) const {
0052 FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
0053 SendJobHeader sd;
0054
0055 FDEBUG(9) << "Product List: " << std::endl;
0056
0057 for (auto const &selection : *selections_) {
0058 sd.push_back(*selection.first);
0059 FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
0060 }
0061 sd.setParameterSetMap(psetMap);
0062
0063 data_buffer.rootbuf_.Reset();
0064
0065 RootDebug tracer(10, 10);
0066
0067 TClass *tc = getTClass(typeid(SendJobHeader));
0068 int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
0069
0070 switch (bres) {
0071 case 0:
0072 {
0073 throw cms::Exception("StreamTranslation", "Registry serialization failed")
0074 << "StreamSerializer failed to serialize registry\n";
0075 break;
0076 }
0077 case 1:
0078 break;
0079 case 2:
0080 {
0081 throw cms::Exception("StreamTranslation", "Registry serialization truncated")
0082 << "StreamSerializer module attempted to serialize\n"
0083 << "a registry that is to big for the allocated buffers\n";
0084 break;
0085 }
0086 default:
0087 {
0088 throw cms::Exception("StreamTranslation", "Registry serialization failed")
0089 << "StreamSerializer module got an unknown error code\n"
0090 << " while attempting to serialize registry\n";
0091 break;
0092 }
0093 }
0094
0095 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
0096 data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
0097 data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
0098
0099 data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
0100
0101 return data_buffer.curr_space_used_;
0102 }
0103
0104
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124 int StreamSerializer::serializeEvent(SerializeDataBuffer &data_buffer,
0125 EventForOutput const &event,
0126 ParameterSetID const &selectorConfig,
0127 uint32_t metaDataChecksum,
0128 StreamerCompressionAlgo compressionAlgo,
0129 int compression_level,
0130 unsigned int reserveSize) const {
0131 EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
0132 selectionIDs.push_back(selectorConfig);
0133 SendEvent se(event.eventAuxiliary(),
0134 event.processHistory(),
0135 selectionIDs,
0136 event.branchListIndexes(),
0137 {},
0138 {},
0139 metaDataChecksum);
0140
0141
0142
0143
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155 for (auto const &selection : *selections_) {
0156 ProductDescription const &desc = *selection.first;
0157 BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
0158 if (!result.isValid()) {
0159
0160
0161 se.products().push_back(StreamedProduct(desc));
0162 } else {
0163 if (result.provenance()->productProvenance()) {
0164 Parentage const *parentage =
0165 ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
0166 assert(parentage);
0167 se.products().push_back(
0168 StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
0169 } else {
0170 se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
0171 }
0172 }
0173 }
0174 return serializeEventCommon(data_buffer, se, compressionAlgo, compression_level, reserveSize);
0175 }
0176
0177 int StreamSerializer::serializeEventMetaData(SerializeDataBuffer &data_buffer,
0178 const BranchIDLists &branchIDLists,
0179 ThinnedAssociationsHelper const &thinnedAssociationsHelper,
0180 StreamerCompressionAlgo compressionAlgo,
0181 int compression_level,
0182 unsigned int reserveSize) const {
0183 SendEvent se({}, {}, {}, {}, branchIDLists, thinnedAssociationsHelper, 0);
0184
0185 return serializeEventCommon(data_buffer, se, compressionAlgo, compression_level, reserveSize);
0186 }
0187
0188 int StreamSerializer::serializeEventCommon(SerializeDataBuffer &data_buffer,
0189 edm::SendEvent const &se,
0190 StreamerCompressionAlgo compressionAlgo,
0191 int compression_level,
0192 unsigned int reserveSize) const {
0193 data_buffer.rootbuf_.Reset();
0194 RootDebug tracer(10, 10);
0195
0196
0197 int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
0198 switch (bres) {
0199 case 0:
0200 {
0201 throw cms::Exception("StreamTranslation", "Event serialization failed")
0202 << "StreamSerializer failed to serialize event: " << se.aux().id();
0203 break;
0204 }
0205 case 1:
0206 break;
0207 case 2:
0208 {
0209 throw cms::Exception("StreamTranslation", "Event serialization truncated")
0210 << "StreamSerializer module attempted to serialize an event\n"
0211 << "that is to big for the allocated buffers: " << se.aux().id();
0212 break;
0213 }
0214 default:
0215 {
0216 throw cms::Exception("StreamTranslation", "Event serialization failed")
0217 << "StreamSerializer module got an unknown error code\n"
0218 << " while attempting to serialize event: " << se.aux().id();
0219 break;
0220 }
0221 }
0222
0223 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
0224 data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
0225
0226 #if 0
0227 if(data_buffer.ptr_ != data_.ptr_) {
0228 std::cerr << "ROOT reset the buffer!!!!\n";
0229 data_.ptr_ = data_buffer.ptr_;
0230 }
0231 #endif
0232
0233
0234
0235
0236
0237
0238
0239 unsigned int dest_size = 0;
0240 switch (compressionAlgo) {
0241 case ZLIB:
0242 dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
0243 data_buffer.curr_event_size_,
0244 data_buffer.comp_buf_,
0245 compression_level,
0246 reserveSize);
0247 break;
0248 case LZMA:
0249 dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
0250 data_buffer.curr_event_size_,
0251 data_buffer.comp_buf_,
0252 compression_level,
0253 reserveSize);
0254 break;
0255 case ZSTD:
0256 dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
0257 data_buffer.curr_event_size_,
0258 data_buffer.comp_buf_,
0259 compression_level,
0260 reserveSize);
0261 break;
0262 default:
0263 dest_size = data_buffer.rootbuf_.Length();
0264 if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
0265 data_buffer.comp_buf_.resize(dest_size + reserveSize);
0266 std::copy((char *)data_buffer.rootbuf_.Buffer(),
0267 (char *)data_buffer.rootbuf_.Buffer() + dest_size,
0268 (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
0269 break;
0270 };
0271
0272 data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize];
0273 data_buffer.curr_space_used_ = dest_size;
0274
0275
0276 data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
0277
0278
0279 return data_buffer.curr_space_used_;
0280 }
0281
0282
0283
0284
0285
0286
0287 unsigned int StreamSerializer::compressBuffer(unsigned char *inputBuffer,
0288 unsigned int inputSize,
0289 std::vector<unsigned char> &outputBuffer,
0290 int compressionLevel,
0291 unsigned int reserveSize) {
0292 unsigned int resultSize = 0;
0293
0294
0295 unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
0296
0297 if (outputBuffer.size() < dest_size + reserveSize)
0298 outputBuffer.resize(dest_size + reserveSize);
0299
0300
0301 int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
0302
0303
0304 if (ret == Z_OK) {
0305
0306 resultSize = dest_size;
0307
0308 FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
0309 << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
0310 } else {
0311 throw cms::Exception("StreamSerializer", "compressBuffer")
0312 << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
0313 }
0314
0315 return resultSize;
0316 }
0317
0318
0319 unsigned int StreamSerializer::compressBufferLZMA(unsigned char *inputBuffer,
0320 unsigned int inputSize,
0321 std::vector<unsigned char> &outputBuffer,
0322 int compressionLevel,
0323 unsigned int reserveSize,
0324 bool addHeader) {
0325
0326 unsigned int hdr_size = addHeader ? 4 : 0;
0327 unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
0328 if (outputBuffer.size() < dest_size + reserveSize)
0329 outputBuffer.resize(dest_size + reserveSize);
0330
0331
0332 uint32_t dict_size_est = inputSize / 4;
0333 lzma_stream stream = LZMA_STREAM_INIT;
0334 lzma_options_lzma opt_lzma2;
0335 lzma_filter filters[] = {
0336 {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
0337 {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
0338 };
0339 lzma_ret returnStatus;
0340
0341 unsigned char *tgt = &outputBuffer[reserveSize];
0342
0343
0344
0345
0346
0347 if (compressionLevel > 9)
0348 compressionLevel = 9;
0349
0350 lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
0351 if (presetStatus) {
0352 throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
0353 }
0354
0355 if (LZMA_DICT_SIZE_MIN > dict_size_est) {
0356 dict_size_est = LZMA_DICT_SIZE_MIN;
0357 }
0358 if (opt_lzma2.dict_size > dict_size_est) {
0359
0360
0361
0362 opt_lzma2.dict_size = dict_size_est;
0363 }
0364
0365 returnStatus =
0366 lzma_stream_encoder(&stream,
0367 filters,
0368 LZMA_CHECK_NONE);
0369 if (returnStatus != LZMA_OK) {
0370 throw cms::Exception("StreamSerializer", "compressBufferLZMA")
0371 << "LZMA compression encoder return value: " << returnStatus;
0372 }
0373
0374 stream.next_in = (const uint8_t *)inputBuffer;
0375 stream.avail_in = (size_t)(inputSize);
0376
0377 stream.next_out = (uint8_t *)(&tgt[hdr_size]);
0378 stream.avail_out = (size_t)(dest_size - hdr_size);
0379
0380 returnStatus = lzma_code(&stream, LZMA_FINISH);
0381
0382 if (returnStatus != LZMA_STREAM_END) {
0383 lzma_end(&stream);
0384 throw cms::Exception("StreamSerializer", "compressBufferLZMA")
0385 << "LZMA compression return value: " << returnStatus;
0386 }
0387 lzma_end(&stream);
0388
0389
0390 if (addHeader) {
0391 tgt[0] = 'X';
0392 tgt[1] = 'Z';
0393 tgt[2] = 0;
0394 tgt[3] = 0;
0395 }
0396
0397 FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
0398 << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
0399
0400 return stream.total_out + hdr_size;
0401 }
0402
0403 unsigned int StreamSerializer::compressBufferZSTD(unsigned char *inputBuffer,
0404 unsigned int inputSize,
0405 std::vector<unsigned char> &outputBuffer,
0406 int compressionLevel,
0407 unsigned int reserveSize,
0408 bool addHeader) {
0409 unsigned int hdr_size = addHeader ? 4 : 0;
0410 unsigned int resultSize = 0;
0411
0412
0413 size_t worst_size = ZSTD_compressBound(inputSize);
0414
0415 if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
0416 outputBuffer.resize(worst_size + reserveSize + hdr_size);
0417
0418
0419 unsigned char *tgt = &outputBuffer[reserveSize];
0420 if (addHeader) {
0421 tgt[0] = 'Z';
0422 tgt[1] = 'S';
0423 tgt[2] = 0;
0424 tgt[3] = 0;
0425 }
0426
0427
0428 size_t dest_size = ZSTD_compress(
0429 (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
0430
0431
0432 if (!ZSTD_isError(dest_size)) {
0433
0434 resultSize = (unsigned int)dest_size + hdr_size;
0435
0436 FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
0437 << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
0438 } else {
0439 throw cms::Exception("StreamSerializer", "compressBuffer")
0440 << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
0441 }
0442
0443 return resultSize;
0444 }
0445
0446 }