Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-25 09:53:48

0001 /**
0002  * StreamSerializer.cc
0003  *
0004  * Utility class for serializing framework objects (e.g. ProductRegistry and
0005  * Event) into streamer message objects.
0006  */
0007 #include "IOPool/Streamer/interface/StreamSerializer.h"
0008 #include "DataFormats/Provenance/interface/BranchDescription.h"
0009 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0010 #include "DataFormats/Provenance/interface/Parentage.h"
0011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0012 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0013 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0014 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0015 #include "IOPool/Streamer/interface/ClassFiller.h"
0016 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0017 #include "FWCore/Framework/interface/ConstProductRegistry.h"
0018 #include "FWCore/Framework/interface/EventForOutput.h"
0019 #include "FWCore/ParameterSet/interface/Registry.h"
0020 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0021 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0022 #include "FWCore/ServiceRegistry/interface/Service.h"
0023 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0024 
0025 #include "zlib.h"
0026 #include "lzma.h"
0027 #include "zstd.h"
0028 #include <algorithm>
0029 #include <cstdlib>
0030 #include <iostream>
0031 #include <vector>
0032 
0033 namespace edm {
0034 
0035   /**
0036    * Creates a translator instance for the specified product registry.
0037    */
0038   StreamSerializer::StreamSerializer(SelectedProducts const *selections)
0039       : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
0040 
0041   /**
0042    * Serializes the product registry (that was specified to the constructor)
0043    * into the specified InitMessage.
0044    */
0045   int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer,
0046                                           const BranchIDLists &branchIDLists,
0047                                           ThinnedAssociationsHelper const &thinnedAssociationsHelper) {
0048     SendJobHeader::ParameterSetMap psetMap;
0049     pset::Registry::instance()->fillMap(psetMap);
0050     return serializeRegistry(data_buffer, branchIDLists, thinnedAssociationsHelper, psetMap);
0051   }
0052 
0053   int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer,
0054                                           const BranchIDLists &branchIDLists,
0055                                           ThinnedAssociationsHelper const &thinnedAssociationsHelper,
0056                                           SendJobHeader::ParameterSetMap const &psetMap) {
0057     FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
0058     SendJobHeader sd;
0059 
0060     FDEBUG(9) << "Product List: " << std::endl;
0061 
0062     for (auto const &selection : *selections_) {
0063       sd.push_back(*selection.first);
0064       FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
0065     }
0066     Service<ConstProductRegistry> reg;
0067     sd.setBranchIDLists(branchIDLists);
0068     sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
0069     sd.setParameterSetMap(psetMap);
0070 
0071     data_buffer.rootbuf_.Reset();
0072 
0073     RootDebug tracer(10, 10);
0074 
0075     TClass *tc = getTClass(typeid(SendJobHeader));
0076     int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
0077 
0078     switch (bres) {
0079       case 0:  // failure
0080       {
0081         throw cms::Exception("StreamTranslation", "Registry serialization failed")
0082             << "StreamSerializer failed to serialize registry\n";
0083         break;
0084       }
0085       case 1:  // succcess
0086         break;
0087       case 2:  // truncated result
0088       {
0089         throw cms::Exception("StreamTranslation", "Registry serialization truncated")
0090             << "StreamSerializer module attempted to serialize\n"
0091             << "a registry that is to big for the allocated buffers\n";
0092         break;
0093       }
0094       default:  // unknown
0095       {
0096         throw cms::Exception("StreamTranslation", "Registry serialization failed")
0097             << "StreamSerializer module got an unknown error code\n"
0098             << " while attempting to serialize registry\n";
0099         break;
0100       }
0101     }
0102 
0103     data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
0104     data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
0105     data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
0106     // calculate the adler32 checksum and fill it into the struct
0107     data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
0108     //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
0109     return data_buffer.curr_space_used_;
0110   }
0111 
0112   /**
0113    * Serializes the specified event into the specified event message.
0114 
0115 
0116    make a char* as a data member, tell ROOT to not adapt it, but still use it.
0117    initialize it to 1M, let ROOT resize if it wants, then delete it in the
0118    dtor.
0119 
0120    change the call to not take an eventMessage, add a member function to
0121    return the address of the place that ROOT wrote the serialized data.
0122 
0123    return the length of the serialized object and the actual length if
0124    compression has been done (may want to cache these lengths in this
0125    object instead.
0126 
0127    the caller will need to copy the data from this object to its final
0128    destination in the EventMsgBuilder.
0129 
0130 
0131    */
0132   int StreamSerializer::serializeEvent(SerializeDataBuffer &data_buffer,
0133                                        EventForOutput const &event,
0134                                        ParameterSetID const &selectorConfig,
0135                                        StreamerCompressionAlgo compressionAlgo,
0136                                        int compression_level,
0137                                        unsigned int reserveSize) const {
0138     EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
0139     selectionIDs.push_back(selectorConfig);
0140     SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
0141 
0142     // Loop over EDProducts, fill the provenance, and write.
0143 
0144     // Historical note. I fixed two bugs in the code below in
0145     // March 2017. One would have caused any Parentage written
0146     // using the Streamer output module to be total nonsense
0147     // prior to the fix. The other would have caused seg faults
0148     // when the Parentage was dropped in an earlier process.
0149 
0150     // FIX ME. The code below stores the direct parentage of
0151     // kept products, but it does not save the parentage of
0152     // dropped objects that are ancestors of kept products like
0153     // the PoolOutputModule. That information is currently
0154     // lost when the streamer output module is used.
0155 
0156     for (auto const &selection : *selections_) {
0157       BranchDescription const &desc = *selection.first;
0158       BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
0159       if (!result.isValid()) {
0160         // No product with this ID was put in the event.
0161         // Create and write the provenance.
0162         se.products().push_back(StreamedProduct(desc));
0163       } else {
0164         if (result.provenance()->productProvenance()) {
0165           Parentage const *parentage =
0166               ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
0167           assert(parentage);
0168           se.products().push_back(
0169               StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
0170         } else {
0171           se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
0172         }
0173       }
0174     }
0175 
0176     data_buffer.rootbuf_.Reset();
0177     RootDebug tracer(10, 10);
0178 
0179     //TClass* tc = getTClass(typeid(SendEvent));
0180     int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
0181     switch (bres) {
0182       case 0:  // failure
0183       {
0184         throw cms::Exception("StreamTranslation", "Event serialization failed")
0185             << "StreamSerializer failed to serialize event: " << event.id();
0186         break;
0187       }
0188       case 1:  // succcess
0189         break;
0190       case 2:  // truncated result
0191       {
0192         throw cms::Exception("StreamTranslation", "Event serialization truncated")
0193             << "StreamSerializer module attempted to serialize an event\n"
0194             << "that is to big for the allocated buffers: " << event.id();
0195         break;
0196       }
0197       default:  // unknown
0198       {
0199         throw cms::Exception("StreamTranslation", "Event serialization failed")
0200             << "StreamSerializer module got an unknown error code\n"
0201             << " while attempting to serialize event: " << event.id();
0202         break;
0203       }
0204     }
0205 
0206     data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
0207     data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
0208 
0209 #if 0
0210    if(data_buffer.ptr_ != data_.ptr_) {
0211         std::cerr << "ROOT reset the buffer!!!!\n";
0212         data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
0213         }
0214 #endif
0215     // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
0216     // eventMessage.eventAddr());
0217     // eventMessage.setEventLength(rootbuf.Length());
0218 
0219     // compress before return if we need to
0220     // should test if compressed already - should never be?
0221     //   as double compression can have problems
0222     unsigned int dest_size = 0;
0223     switch (compressionAlgo) {
0224       case ZLIB:
0225         dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
0226                                    data_buffer.curr_event_size_,
0227                                    data_buffer.comp_buf_,
0228                                    compression_level,
0229                                    reserveSize);
0230         break;
0231       case LZMA:
0232         dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
0233                                        data_buffer.curr_event_size_,
0234                                        data_buffer.comp_buf_,
0235                                        compression_level,
0236                                        reserveSize);
0237         break;
0238       case ZSTD:
0239         dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
0240                                        data_buffer.curr_event_size_,
0241                                        data_buffer.comp_buf_,
0242                                        compression_level,
0243                                        reserveSize);
0244         break;
0245       default:
0246         dest_size = data_buffer.rootbuf_.Length();
0247         if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
0248           data_buffer.comp_buf_.resize(dest_size + reserveSize);
0249         std::copy((char *)data_buffer.rootbuf_.Buffer(),
0250                   (char *)data_buffer.rootbuf_.Buffer() + dest_size,
0251                   (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
0252         break;
0253     };
0254 
0255     data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize];  // reset to point at compressed area
0256     data_buffer.curr_space_used_ = dest_size;
0257 
0258     // calculate the adler32 checksum and fill it into the struct
0259     data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
0260     //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
0261 
0262     return data_buffer.curr_space_used_;
0263   }
0264 
0265   /**
0266    * Compresses the data in the specified input buffer into the
0267    * specified output buffer.  Returns the size of the compressed data
0268    * or zero if compression failed.
0269    */
0270   unsigned int StreamSerializer::compressBuffer(unsigned char *inputBuffer,
0271                                                 unsigned int inputSize,
0272                                                 std::vector<unsigned char> &outputBuffer,
0273                                                 int compressionLevel,
0274                                                 unsigned int reserveSize) {
0275     unsigned int resultSize = 0;
0276 
0277     // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
0278     unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
0279     //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
0280     if (outputBuffer.size() < dest_size + reserveSize)
0281       outputBuffer.resize(dest_size + reserveSize);
0282 
0283     // compression 1-9, 6 is zlib default, 0 none
0284     int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
0285 
0286     // check status
0287     if (ret == Z_OK) {
0288       // return the correct length
0289       resultSize = dest_size;
0290 
0291       FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
0292                 << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
0293     } else {
0294       throw cms::Exception("StreamSerializer", "compressBuffer")
0295           << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
0296     }
0297 
0298     return resultSize;
0299   }
0300 
0301   //this is based on ROOT R__zipLZMA
0302   unsigned int StreamSerializer::compressBufferLZMA(unsigned char *inputBuffer,
0303                                                     unsigned int inputSize,
0304                                                     std::vector<unsigned char> &outputBuffer,
0305                                                     int compressionLevel,
0306                                                     unsigned int reserveSize,
0307                                                     bool addHeader) {
0308     // what are these magic numbers? (jbk)
0309     unsigned int hdr_size = addHeader ? 4 : 0;
0310     unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
0311     if (outputBuffer.size() < dest_size + reserveSize)
0312       outputBuffer.resize(dest_size + reserveSize);
0313 
0314     // compression 1-9
0315     uint32_t dict_size_est = inputSize / 4;
0316     lzma_stream stream = LZMA_STREAM_INIT;
0317     lzma_options_lzma opt_lzma2;
0318     lzma_filter filters[] = {
0319         {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
0320         {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
0321     };
0322     lzma_ret returnStatus;
0323 
0324     unsigned char *tgt = &outputBuffer[reserveSize];
0325 
0326     //if (*srcsize > 0xffffff || *srcsize < 0) { //16 MB limit ?
0327     //   return;
0328     //}
0329 
0330     if (compressionLevel > 9)
0331       compressionLevel = 9;
0332 
0333     lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
0334     if (presetStatus) {
0335       throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
0336     }
0337 
0338     if (LZMA_DICT_SIZE_MIN > dict_size_est) {
0339       dict_size_est = LZMA_DICT_SIZE_MIN;
0340     }
0341     if (opt_lzma2.dict_size > dict_size_est) {
0342       /* reduce the dictionary size if larger than 1/4 the input size, preset
0343          dictionaries size can be expensively large
0344        */
0345       opt_lzma2.dict_size = dict_size_est;
0346     }
0347 
0348     returnStatus =
0349         lzma_stream_encoder(&stream,
0350                             filters,
0351                             LZMA_CHECK_NONE);  //CRC32 and CRC64 are available, but we already calculate adler32
0352     if (returnStatus != LZMA_OK) {
0353       throw cms::Exception("StreamSerializer", "compressBufferLZMA")
0354           << "LZMA compression encoder return value: " << returnStatus;
0355     }
0356 
0357     stream.next_in = (const uint8_t *)inputBuffer;
0358     stream.avail_in = (size_t)(inputSize);
0359 
0360     stream.next_out = (uint8_t *)(&tgt[hdr_size]);
0361     stream.avail_out = (size_t)(dest_size - hdr_size);
0362 
0363     returnStatus = lzma_code(&stream, LZMA_FINISH);
0364 
0365     if (returnStatus != LZMA_STREAM_END) {
0366       lzma_end(&stream);
0367       throw cms::Exception("StreamSerializer", "compressBufferLZMA")
0368           << "LZMA compression return value: " << returnStatus;
0369     }
0370     lzma_end(&stream);
0371 
0372     //Add compression-specific header at the buffer start. This will be used to detect LZMA(2) format after streamer header
0373     if (addHeader) {
0374       tgt[0] = 'X'; /* Signature of LZMA from XZ Utils */
0375       tgt[1] = 'Z';
0376       tgt[2] = 0;
0377       tgt[3] = 0;  //let's put offset to 4, not 3
0378     }
0379 
0380     FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
0381               << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
0382 
0383     return stream.total_out + hdr_size;
0384   }
0385 
0386   unsigned int StreamSerializer::compressBufferZSTD(unsigned char *inputBuffer,
0387                                                     unsigned int inputSize,
0388                                                     std::vector<unsigned char> &outputBuffer,
0389                                                     int compressionLevel,
0390                                                     unsigned int reserveSize,
0391                                                     bool addHeader) {
0392     unsigned int hdr_size = addHeader ? 4 : 0;
0393     unsigned int resultSize = 0;
0394 
0395     // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
0396     size_t worst_size = ZSTD_compressBound(inputSize);
0397     //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
0398     if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
0399       outputBuffer.resize(worst_size + reserveSize + hdr_size);
0400 
0401     //Add compression-specific header at the buffer start. This will be used to detect ZSTD format after streamer header
0402     unsigned char *tgt = &outputBuffer[reserveSize];
0403     if (addHeader) {
0404       tgt[0] = 'Z'; /* Pre */
0405       tgt[1] = 'S';
0406       tgt[2] = 0;
0407       tgt[3] = 0;
0408     }
0409 
0410     // compression 1-20
0411     size_t dest_size = ZSTD_compress(
0412         (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
0413 
0414     // check status
0415     if (!ZSTD_isError(dest_size)) {
0416       // return the correct length
0417       resultSize = (unsigned int)dest_size + hdr_size;
0418 
0419       FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
0420                 << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
0421     } else {
0422       throw cms::Exception("StreamSerializer", "compressBuffer")
0423           << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
0424     }
0425 
0426     return resultSize;
0427   }
0428 
0429 }  // namespace edm