File indexing completed on 2023-10-25 09:53:48
0001
0002
0003
0004
0005
0006
0007 #include "IOPool/Streamer/interface/StreamSerializer.h"
0008 #include "DataFormats/Provenance/interface/BranchDescription.h"
0009 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0010 #include "DataFormats/Provenance/interface/Parentage.h"
0011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0012 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0013 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0014 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0015 #include "IOPool/Streamer/interface/ClassFiller.h"
0016 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0017 #include "FWCore/Framework/interface/ConstProductRegistry.h"
0018 #include "FWCore/Framework/interface/EventForOutput.h"
0019 #include "FWCore/ParameterSet/interface/Registry.h"
0020 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0021 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0022 #include "FWCore/ServiceRegistry/interface/Service.h"
0023 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0024
0025 #include "zlib.h"
0026 #include "lzma.h"
0027 #include "zstd.h"
0028 #include <algorithm>
0029 #include <cstdlib>
0030 #include <iostream>
0031 #include <vector>
0032
0033 namespace edm {
0034
0035
0036
0037
0038 StreamSerializer::StreamSerializer(SelectedProducts const *selections)
0039 : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
0040
0041
0042
0043
0044
0045 int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer,
0046 const BranchIDLists &branchIDLists,
0047 ThinnedAssociationsHelper const &thinnedAssociationsHelper) {
0048 SendJobHeader::ParameterSetMap psetMap;
0049 pset::Registry::instance()->fillMap(psetMap);
0050 return serializeRegistry(data_buffer, branchIDLists, thinnedAssociationsHelper, psetMap);
0051 }
0052
0053 int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer,
0054 const BranchIDLists &branchIDLists,
0055 ThinnedAssociationsHelper const &thinnedAssociationsHelper,
0056 SendJobHeader::ParameterSetMap const &psetMap) {
0057 FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
0058 SendJobHeader sd;
0059
0060 FDEBUG(9) << "Product List: " << std::endl;
0061
0062 for (auto const &selection : *selections_) {
0063 sd.push_back(*selection.first);
0064 FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
0065 }
0066 Service<ConstProductRegistry> reg;
0067 sd.setBranchIDLists(branchIDLists);
0068 sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
0069 sd.setParameterSetMap(psetMap);
0070
0071 data_buffer.rootbuf_.Reset();
0072
0073 RootDebug tracer(10, 10);
0074
0075 TClass *tc = getTClass(typeid(SendJobHeader));
0076 int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
0077
0078 switch (bres) {
0079 case 0:
0080 {
0081 throw cms::Exception("StreamTranslation", "Registry serialization failed")
0082 << "StreamSerializer failed to serialize registry\n";
0083 break;
0084 }
0085 case 1:
0086 break;
0087 case 2:
0088 {
0089 throw cms::Exception("StreamTranslation", "Registry serialization truncated")
0090 << "StreamSerializer module attempted to serialize\n"
0091 << "a registry that is to big for the allocated buffers\n";
0092 break;
0093 }
0094 default:
0095 {
0096 throw cms::Exception("StreamTranslation", "Registry serialization failed")
0097 << "StreamSerializer module got an unknown error code\n"
0098 << " while attempting to serialize registry\n";
0099 break;
0100 }
0101 }
0102
0103 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
0104 data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
0105 data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
0106
0107 data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
0108
0109 return data_buffer.curr_space_used_;
0110 }
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132 int StreamSerializer::serializeEvent(SerializeDataBuffer &data_buffer,
0133 EventForOutput const &event,
0134 ParameterSetID const &selectorConfig,
0135 StreamerCompressionAlgo compressionAlgo,
0136 int compression_level,
0137 unsigned int reserveSize) const {
0138 EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
0139 selectionIDs.push_back(selectorConfig);
0140 SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
0141
0142
0143
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156 for (auto const &selection : *selections_) {
0157 BranchDescription const &desc = *selection.first;
0158 BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
0159 if (!result.isValid()) {
0160
0161
0162 se.products().push_back(StreamedProduct(desc));
0163 } else {
0164 if (result.provenance()->productProvenance()) {
0165 Parentage const *parentage =
0166 ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
0167 assert(parentage);
0168 se.products().push_back(
0169 StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
0170 } else {
0171 se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
0172 }
0173 }
0174 }
0175
0176 data_buffer.rootbuf_.Reset();
0177 RootDebug tracer(10, 10);
0178
0179
0180 int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
0181 switch (bres) {
0182 case 0:
0183 {
0184 throw cms::Exception("StreamTranslation", "Event serialization failed")
0185 << "StreamSerializer failed to serialize event: " << event.id();
0186 break;
0187 }
0188 case 1:
0189 break;
0190 case 2:
0191 {
0192 throw cms::Exception("StreamTranslation", "Event serialization truncated")
0193 << "StreamSerializer module attempted to serialize an event\n"
0194 << "that is to big for the allocated buffers: " << event.id();
0195 break;
0196 }
0197 default:
0198 {
0199 throw cms::Exception("StreamTranslation", "Event serialization failed")
0200 << "StreamSerializer module got an unknown error code\n"
0201 << " while attempting to serialize event: " << event.id();
0202 break;
0203 }
0204 }
0205
0206 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
0207 data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
0208
0209 #if 0
0210 if(data_buffer.ptr_ != data_.ptr_) {
0211 std::cerr << "ROOT reset the buffer!!!!\n";
0212 data_.ptr_ = data_buffer.ptr_;
0213 }
0214 #endif
0215
0216
0217
0218
0219
0220
0221
0222 unsigned int dest_size = 0;
0223 switch (compressionAlgo) {
0224 case ZLIB:
0225 dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
0226 data_buffer.curr_event_size_,
0227 data_buffer.comp_buf_,
0228 compression_level,
0229 reserveSize);
0230 break;
0231 case LZMA:
0232 dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
0233 data_buffer.curr_event_size_,
0234 data_buffer.comp_buf_,
0235 compression_level,
0236 reserveSize);
0237 break;
0238 case ZSTD:
0239 dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
0240 data_buffer.curr_event_size_,
0241 data_buffer.comp_buf_,
0242 compression_level,
0243 reserveSize);
0244 break;
0245 default:
0246 dest_size = data_buffer.rootbuf_.Length();
0247 if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
0248 data_buffer.comp_buf_.resize(dest_size + reserveSize);
0249 std::copy((char *)data_buffer.rootbuf_.Buffer(),
0250 (char *)data_buffer.rootbuf_.Buffer() + dest_size,
0251 (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
0252 break;
0253 };
0254
0255 data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize];
0256 data_buffer.curr_space_used_ = dest_size;
0257
0258
0259 data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
0260
0261
0262 return data_buffer.curr_space_used_;
0263 }
0264
0265
0266
0267
0268
0269
0270 unsigned int StreamSerializer::compressBuffer(unsigned char *inputBuffer,
0271 unsigned int inputSize,
0272 std::vector<unsigned char> &outputBuffer,
0273 int compressionLevel,
0274 unsigned int reserveSize) {
0275 unsigned int resultSize = 0;
0276
0277
0278 unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
0279
0280 if (outputBuffer.size() < dest_size + reserveSize)
0281 outputBuffer.resize(dest_size + reserveSize);
0282
0283
0284 int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
0285
0286
0287 if (ret == Z_OK) {
0288
0289 resultSize = dest_size;
0290
0291 FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
0292 << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
0293 } else {
0294 throw cms::Exception("StreamSerializer", "compressBuffer")
0295 << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
0296 }
0297
0298 return resultSize;
0299 }
0300
0301
0302 unsigned int StreamSerializer::compressBufferLZMA(unsigned char *inputBuffer,
0303 unsigned int inputSize,
0304 std::vector<unsigned char> &outputBuffer,
0305 int compressionLevel,
0306 unsigned int reserveSize,
0307 bool addHeader) {
0308
0309 unsigned int hdr_size = addHeader ? 4 : 0;
0310 unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
0311 if (outputBuffer.size() < dest_size + reserveSize)
0312 outputBuffer.resize(dest_size + reserveSize);
0313
0314
0315 uint32_t dict_size_est = inputSize / 4;
0316 lzma_stream stream = LZMA_STREAM_INIT;
0317 lzma_options_lzma opt_lzma2;
0318 lzma_filter filters[] = {
0319 {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
0320 {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
0321 };
0322 lzma_ret returnStatus;
0323
0324 unsigned char *tgt = &outputBuffer[reserveSize];
0325
0326
0327
0328
0329
0330 if (compressionLevel > 9)
0331 compressionLevel = 9;
0332
0333 lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
0334 if (presetStatus) {
0335 throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
0336 }
0337
0338 if (LZMA_DICT_SIZE_MIN > dict_size_est) {
0339 dict_size_est = LZMA_DICT_SIZE_MIN;
0340 }
0341 if (opt_lzma2.dict_size > dict_size_est) {
0342
0343
0344
0345 opt_lzma2.dict_size = dict_size_est;
0346 }
0347
0348 returnStatus =
0349 lzma_stream_encoder(&stream,
0350 filters,
0351 LZMA_CHECK_NONE);
0352 if (returnStatus != LZMA_OK) {
0353 throw cms::Exception("StreamSerializer", "compressBufferLZMA")
0354 << "LZMA compression encoder return value: " << returnStatus;
0355 }
0356
0357 stream.next_in = (const uint8_t *)inputBuffer;
0358 stream.avail_in = (size_t)(inputSize);
0359
0360 stream.next_out = (uint8_t *)(&tgt[hdr_size]);
0361 stream.avail_out = (size_t)(dest_size - hdr_size);
0362
0363 returnStatus = lzma_code(&stream, LZMA_FINISH);
0364
0365 if (returnStatus != LZMA_STREAM_END) {
0366 lzma_end(&stream);
0367 throw cms::Exception("StreamSerializer", "compressBufferLZMA")
0368 << "LZMA compression return value: " << returnStatus;
0369 }
0370 lzma_end(&stream);
0371
0372
0373 if (addHeader) {
0374 tgt[0] = 'X';
0375 tgt[1] = 'Z';
0376 tgt[2] = 0;
0377 tgt[3] = 0;
0378 }
0379
0380 FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
0381 << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
0382
0383 return stream.total_out + hdr_size;
0384 }
0385
0386 unsigned int StreamSerializer::compressBufferZSTD(unsigned char *inputBuffer,
0387 unsigned int inputSize,
0388 std::vector<unsigned char> &outputBuffer,
0389 int compressionLevel,
0390 unsigned int reserveSize,
0391 bool addHeader) {
0392 unsigned int hdr_size = addHeader ? 4 : 0;
0393 unsigned int resultSize = 0;
0394
0395
0396 size_t worst_size = ZSTD_compressBound(inputSize);
0397
0398 if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
0399 outputBuffer.resize(worst_size + reserveSize + hdr_size);
0400
0401
0402 unsigned char *tgt = &outputBuffer[reserveSize];
0403 if (addHeader) {
0404 tgt[0] = 'Z';
0405 tgt[1] = 'S';
0406 tgt[2] = 0;
0407 tgt[3] = 0;
0408 }
0409
0410
0411 size_t dest_size = ZSTD_compress(
0412 (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
0413
0414
0415 if (!ZSTD_isError(dest_size)) {
0416
0417 resultSize = (unsigned int)dest_size + hdr_size;
0418
0419 FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
0420 << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
0421 } else {
0422 throw cms::Exception("StreamSerializer", "compressBuffer")
0423 << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
0424 }
0425
0426 return resultSize;
0427 }
0428
0429 }