Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-31 02:19:45

0001 /**
0002  * StreamSerializer.cc
0003  *
0004  * Utility class for serializing framework objects (e.g. ProductRegistry and
0005  * Event) into streamer message objects.
0006  */
0007 #include "IOPool/Streamer/interface/StreamSerializer.h"
0008 #include "DataFormats/Provenance/interface/ProductDescription.h"
0009 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0010 #include "DataFormats/Provenance/interface/Parentage.h"
0011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0012 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0013 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0014 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0015 #include "IOPool/Streamer/interface/ClassFiller.h"
0016 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0017 #include "FWCore/Framework/interface/EventForOutput.h"
0018 #include "FWCore/ParameterSet/interface/Registry.h"
0019 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0020 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0021 #include "FWCore/ServiceRegistry/interface/Service.h"
0022 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0023 
0024 #include "zlib.h"
0025 #include "lzma.h"
0026 #include "zstd.h"
0027 #include <algorithm>
0028 #include <cstdlib>
0029 #include <iostream>
0030 #include <vector>
0031 
0032 namespace edm::streamer {
0033 
0034   /**
0035    * Creates a translator instance for the specified product registry.
0036    */
0037   StreamSerializer::StreamSerializer(SelectedProducts const *selections)
0038       : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
0039 
0040   /**
0041    * Serializes the product registry (that was specified to the constructor)
0042    * into the specified InitMessage.
0043    */
0044   int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer) const {
0045     SendJobHeader::ParameterSetMap psetMap;
0046     pset::Registry::instance()->fillMap(psetMap);
0047     return serializeRegistry(data_buffer, psetMap);
0048   }
0049 
0050   int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer,
0051                                           SendJobHeader::ParameterSetMap const &psetMap) const {
0052     FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
0053     SendJobHeader sd;
0054 
0055     FDEBUG(9) << "Product List: " << std::endl;
0056 
0057     for (auto const &selection : *selections_) {
0058       sd.push_back(*selection.first);
0059       FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
0060     }
0061     sd.setParameterSetMap(psetMap);
0062 
0063     data_buffer.rootbuf_.Reset();
0064 
0065     RootDebug tracer(10, 10);
0066 
0067     TClass *tc = getTClass(typeid(SendJobHeader));
0068     int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
0069 
0070     switch (bres) {
0071       case 0:  // failure
0072       {
0073         throw cms::Exception("StreamTranslation", "Registry serialization failed")
0074             << "StreamSerializer failed to serialize registry\n";
0075         break;
0076       }
0077       case 1:  // succcess
0078         break;
0079       case 2:  // truncated result
0080       {
0081         throw cms::Exception("StreamTranslation", "Registry serialization truncated")
0082             << "StreamSerializer module attempted to serialize\n"
0083             << "a registry that is to big for the allocated buffers\n";
0084         break;
0085       }
0086       default:  // unknown
0087       {
0088         throw cms::Exception("StreamTranslation", "Registry serialization failed")
0089             << "StreamSerializer module got an unknown error code\n"
0090             << " while attempting to serialize registry\n";
0091         break;
0092       }
0093     }
0094 
0095     data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
0096     data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
0097     data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
0098     // calculate the adler32 checksum and fill it into the struct
0099     data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
0100     //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
0101     return data_buffer.curr_space_used_;
0102   }
0103 
0104   /**
0105    * Serializes the specified event into the specified event message.
0106 
0107 
0108    make a char* as a data member, tell ROOT to not adapt it, but still use it.
0109    initialize it to 1M, let ROOT resize if it wants, then delete it in the
0110    dtor.
0111 
0112    change the call to not take an eventMessage, add a member function to
0113    return the address of the place that ROOT wrote the serialized data.
0114 
0115    return the length of the serialized object and the actual length if
0116    compression has been done (may want to cache these lengths in this
0117    object instead.
0118 
0119    the caller will need to copy the data from this object to its final
0120    destination in the EventMsgBuilder.
0121 
0122 
0123    */
0124   int StreamSerializer::serializeEvent(SerializeDataBuffer &data_buffer,
0125                                        EventForOutput const &event,
0126                                        ParameterSetID const &selectorConfig,
0127                                        uint32_t metaDataChecksum,
0128                                        StreamerCompressionAlgo compressionAlgo,
0129                                        int compression_level,
0130                                        unsigned int reserveSize) const {
0131     EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
0132     selectionIDs.push_back(selectorConfig);
0133     SendEvent se(event.eventAuxiliary(),
0134                  event.processHistory(),
0135                  selectionIDs,
0136                  event.branchListIndexes(),
0137                  {},
0138                  {},
0139                  metaDataChecksum);
0140 
0141     // Loop over EDProducts, fill the provenance, and write.
0142 
0143     // Historical note. I fixed two bugs in the code below in
0144     // March 2017. One would have caused any Parentage written
0145     // using the Streamer output module to be total nonsense
0146     // prior to the fix. The other would have caused seg faults
0147     // when the Parentage was dropped in an earlier process.
0148 
0149     // FIX ME. The code below stores the direct parentage of
0150     // kept products, but it does not save the parentage of
0151     // dropped objects that are ancestors of kept products like
0152     // the PoolOutputModule. That information is currently
0153     // lost when the streamer output module is used.
0154 
0155     for (auto const &selection : *selections_) {
0156       ProductDescription const &desc = *selection.first;
0157       BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
0158       if (!result.isValid()) {
0159         // No product with this ID was put in the event.
0160         // Create and write the provenance.
0161         se.products().push_back(StreamedProduct(desc));
0162       } else {
0163         if (result.provenance()->productProvenance()) {
0164           Parentage const *parentage =
0165               ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
0166           assert(parentage);
0167           se.products().push_back(
0168               StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
0169         } else {
0170           se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
0171         }
0172       }
0173     }
0174     return serializeEventCommon(data_buffer, se, compressionAlgo, compression_level, reserveSize);
0175   }
0176 
0177   int StreamSerializer::serializeEventMetaData(SerializeDataBuffer &data_buffer,
0178                                                const BranchIDLists &branchIDLists,
0179                                                ThinnedAssociationsHelper const &thinnedAssociationsHelper,
0180                                                StreamerCompressionAlgo compressionAlgo,
0181                                                int compression_level,
0182                                                unsigned int reserveSize) const {
0183     SendEvent se({}, {}, {}, {}, branchIDLists, thinnedAssociationsHelper, 0);
0184 
0185     return serializeEventCommon(data_buffer, se, compressionAlgo, compression_level, reserveSize);
0186   }
0187 
0188   int StreamSerializer::serializeEventCommon(SerializeDataBuffer &data_buffer,
0189                                              edm::SendEvent const &se,
0190                                              StreamerCompressionAlgo compressionAlgo,
0191                                              int compression_level,
0192                                              unsigned int reserveSize) const {
0193     data_buffer.rootbuf_.Reset();
0194     RootDebug tracer(10, 10);
0195 
0196     //TClass* tc = getTClass(typeid(SendEvent));
0197     int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
0198     switch (bres) {
0199       case 0:  // failure
0200       {
0201         throw cms::Exception("StreamTranslation", "Event serialization failed")
0202             << "StreamSerializer failed to serialize event: " << se.aux().id();
0203         break;
0204       }
0205       case 1:  // succcess
0206         break;
0207       case 2:  // truncated result
0208       {
0209         throw cms::Exception("StreamTranslation", "Event serialization truncated")
0210             << "StreamSerializer module attempted to serialize an event\n"
0211             << "that is to big for the allocated buffers: " << se.aux().id();
0212         break;
0213       }
0214       default:  // unknown
0215       {
0216         throw cms::Exception("StreamTranslation", "Event serialization failed")
0217             << "StreamSerializer module got an unknown error code\n"
0218             << " while attempting to serialize event: " << se.aux().id();
0219         break;
0220       }
0221     }
0222 
0223     data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
0224     data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
0225 
0226 #if 0
0227    if(data_buffer.ptr_ != data_.ptr_) {
0228         std::cerr << "ROOT reset the buffer!!!!\n";
0229         data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
0230         }
0231 #endif
0232     // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
0233     // eventMessage.eventAddr());
0234     // eventMessage.setEventLength(rootbuf.Length());
0235 
0236     // compress before return if we need to
0237     // should test if compressed already - should never be?
0238     //   as double compression can have problems
0239     unsigned int dest_size = 0;
0240     switch (compressionAlgo) {
0241       case ZLIB:
0242         dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
0243                                    data_buffer.curr_event_size_,
0244                                    data_buffer.comp_buf_,
0245                                    compression_level,
0246                                    reserveSize);
0247         break;
0248       case LZMA:
0249         dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
0250                                        data_buffer.curr_event_size_,
0251                                        data_buffer.comp_buf_,
0252                                        compression_level,
0253                                        reserveSize);
0254         break;
0255       case ZSTD:
0256         dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
0257                                        data_buffer.curr_event_size_,
0258                                        data_buffer.comp_buf_,
0259                                        compression_level,
0260                                        reserveSize);
0261         break;
0262       default:
0263         dest_size = data_buffer.rootbuf_.Length();
0264         if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
0265           data_buffer.comp_buf_.resize(dest_size + reserveSize);
0266         std::copy((char *)data_buffer.rootbuf_.Buffer(),
0267                   (char *)data_buffer.rootbuf_.Buffer() + dest_size,
0268                   (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
0269         break;
0270     };
0271 
0272     data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize];  // reset to point at compressed area
0273     data_buffer.curr_space_used_ = dest_size;
0274 
0275     // calculate the adler32 checksum and fill it into the struct
0276     data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
0277     //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
0278 
0279     return data_buffer.curr_space_used_;
0280   }
0281 
0282   /**
0283    * Compresses the data in the specified input buffer into the
0284    * specified output buffer.  Returns the size of the compressed data
0285    * or zero if compression failed.
0286    */
0287   unsigned int StreamSerializer::compressBuffer(unsigned char *inputBuffer,
0288                                                 unsigned int inputSize,
0289                                                 std::vector<unsigned char> &outputBuffer,
0290                                                 int compressionLevel,
0291                                                 unsigned int reserveSize) {
0292     unsigned int resultSize = 0;
0293 
0294     // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
0295     unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
0296     //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
0297     if (outputBuffer.size() < dest_size + reserveSize)
0298       outputBuffer.resize(dest_size + reserveSize);
0299 
0300     // compression 1-9, 6 is zlib default, 0 none
0301     int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
0302 
0303     // check status
0304     if (ret == Z_OK) {
0305       // return the correct length
0306       resultSize = dest_size;
0307 
0308       FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
0309                 << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
0310     } else {
0311       throw cms::Exception("StreamSerializer", "compressBuffer")
0312           << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
0313     }
0314 
0315     return resultSize;
0316   }
0317 
0318   //this is based on ROOT R__zipLZMA
0319   unsigned int StreamSerializer::compressBufferLZMA(unsigned char *inputBuffer,
0320                                                     unsigned int inputSize,
0321                                                     std::vector<unsigned char> &outputBuffer,
0322                                                     int compressionLevel,
0323                                                     unsigned int reserveSize,
0324                                                     bool addHeader) {
0325     // what are these magic numbers? (jbk)
0326     unsigned int hdr_size = addHeader ? 4 : 0;
0327     unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
0328     if (outputBuffer.size() < dest_size + reserveSize)
0329       outputBuffer.resize(dest_size + reserveSize);
0330 
0331     // compression 1-9
0332     uint32_t dict_size_est = inputSize / 4;
0333     lzma_stream stream = LZMA_STREAM_INIT;
0334     lzma_options_lzma opt_lzma2;
0335     lzma_filter filters[] = {
0336         {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
0337         {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
0338     };
0339     lzma_ret returnStatus;
0340 
0341     unsigned char *tgt = &outputBuffer[reserveSize];
0342 
0343     //if (*srcsize > 0xffffff || *srcsize < 0) { //16 MB limit ?
0344     //   return;
0345     //}
0346 
0347     if (compressionLevel > 9)
0348       compressionLevel = 9;
0349 
0350     lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
0351     if (presetStatus) {
0352       throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
0353     }
0354 
0355     if (LZMA_DICT_SIZE_MIN > dict_size_est) {
0356       dict_size_est = LZMA_DICT_SIZE_MIN;
0357     }
0358     if (opt_lzma2.dict_size > dict_size_est) {
0359       /* reduce the dictionary size if larger than 1/4 the input size, preset
0360          dictionaries size can be expensively large
0361        */
0362       opt_lzma2.dict_size = dict_size_est;
0363     }
0364 
0365     returnStatus =
0366         lzma_stream_encoder(&stream,
0367                             filters,
0368                             LZMA_CHECK_NONE);  //CRC32 and CRC64 are available, but we already calculate adler32
0369     if (returnStatus != LZMA_OK) {
0370       throw cms::Exception("StreamSerializer", "compressBufferLZMA")
0371           << "LZMA compression encoder return value: " << returnStatus;
0372     }
0373 
0374     stream.next_in = (const uint8_t *)inputBuffer;
0375     stream.avail_in = (size_t)(inputSize);
0376 
0377     stream.next_out = (uint8_t *)(&tgt[hdr_size]);
0378     stream.avail_out = (size_t)(dest_size - hdr_size);
0379 
0380     returnStatus = lzma_code(&stream, LZMA_FINISH);
0381 
0382     if (returnStatus != LZMA_STREAM_END) {
0383       lzma_end(&stream);
0384       throw cms::Exception("StreamSerializer", "compressBufferLZMA")
0385           << "LZMA compression return value: " << returnStatus;
0386     }
0387     lzma_end(&stream);
0388 
0389     //Add compression-specific header at the buffer start. This will be used to detect LZMA(2) format after streamer header
0390     if (addHeader) {
0391       tgt[0] = 'X'; /* Signature of LZMA from XZ Utils */
0392       tgt[1] = 'Z';
0393       tgt[2] = 0;
0394       tgt[3] = 0;  //let's put offset to 4, not 3
0395     }
0396 
0397     FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
0398               << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
0399 
0400     return stream.total_out + hdr_size;
0401   }
0402 
0403   unsigned int StreamSerializer::compressBufferZSTD(unsigned char *inputBuffer,
0404                                                     unsigned int inputSize,
0405                                                     std::vector<unsigned char> &outputBuffer,
0406                                                     int compressionLevel,
0407                                                     unsigned int reserveSize,
0408                                                     bool addHeader) {
0409     unsigned int hdr_size = addHeader ? 4 : 0;
0410     unsigned int resultSize = 0;
0411 
0412     // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
0413     size_t worst_size = ZSTD_compressBound(inputSize);
0414     //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
0415     if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
0416       outputBuffer.resize(worst_size + reserveSize + hdr_size);
0417 
0418     //Add compression-specific header at the buffer start. This will be used to detect ZSTD format after streamer header
0419     unsigned char *tgt = &outputBuffer[reserveSize];
0420     if (addHeader) {
0421       tgt[0] = 'Z'; /* Pre */
0422       tgt[1] = 'S';
0423       tgt[2] = 0;
0424       tgt[3] = 0;
0425     }
0426 
0427     // compression 1-20
0428     size_t dest_size = ZSTD_compress(
0429         (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
0430 
0431     // check status
0432     if (!ZSTD_isError(dest_size)) {
0433       // return the correct length
0434       resultSize = (unsigned int)dest_size + hdr_size;
0435 
0436       FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
0437                 << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
0438     } else {
0439       throw cms::Exception("StreamSerializer", "compressBuffer")
0440           << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
0441     }
0442 
0443     return resultSize;
0444   }
0445 
0446 }  // namespace edm::streamer