Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-31 04:19:41

0001 #ifndef IOPool_Streamer_StreamSerializer_h
0002 #define IOPool_Streamer_StreamSerializer_h
0003 
0004 /**
0005  * StreamSerializer.h
0006  *
0007  * Utility class for translating framework objects (e.g. ProductRegistry and
0008  * EventForOutput) into streamer message objects.
0009  */
0010 
0011 #include "TBufferFile.h"
0012 
0013 #include <cstdint>
0014 #include <vector>
0015 
0016 #include "DataFormats/Provenance/interface/BranchIDList.h"
0017 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0018 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0019 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0020 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0021 
// Forward declarations of framework classes.  EventForOutput and
// ThinnedAssociationsHelper are only taken by const reference in this header,
// so their full definitions are not required here.
namespace edm {
  class ThinnedAssociationsHelper;
  class ModuleCallingContext;
  class EventForOutput;
}  // namespace edm
0027 
0028 // Data structure to be shared by all output modules for event serialization
0029 namespace edm::streamer {
0030   struct SerializeDataBuffer {
0031     typedef std::vector<char> SBuffer;
0032     static constexpr int init_size = 0;  //will be allocated on first event
0033     static constexpr unsigned int reserve_size = 50000;
0034 
0035     SerializeDataBuffer()
0036         : comp_buf_(reserve_size + init_size),
0037           curr_event_size_(),
0038           curr_space_used_(),
0039           rootbuf_(TBuffer::kWrite, init_size),
0040           ptr_((unsigned char *)rootbuf_.Buffer()),
0041           header_buf_(),
0042           adler32_chksum_(0) {}
0043 
0044     // This object caches the results of the last INIT or event
0045     // serialization operation.  You get access to the data using the
0046     // following member functions.
0047 
0048     unsigned char const *bufferPointer() const { return get_underlying_safe(ptr_); }
0049     unsigned char *&bufferPointer() { return get_underlying_safe(ptr_); }
0050     unsigned int currentSpaceUsed() const { return curr_space_used_; }
0051     unsigned int currentEventSize() const { return curr_event_size_; }
0052     uint32_t adler32_chksum() const { return adler32_chksum_; }
0053 
0054     void clearHeaderBuffer() {
0055       header_buf_.clear();
0056       header_buf_.shrink_to_fit();
0057       rootbuf_.Reset();
0058       rootbuf_.Expand(init_size);  //shrink TBuffer to size 0 after resetting TBuffer length
0059     }
0060 
0061     std::vector<unsigned char> comp_buf_;  // space for compressed data
0062     unsigned int curr_event_size_;
0063     unsigned int curr_space_used_;  // less than curr_event_size_ if compressed
0064     TBufferFile rootbuf_;
0065     edm::propagate_const<unsigned char *> ptr_;  // set to the place where the last event stored
0066     SBuffer header_buf_;                         // place for INIT message creation and streamer event header
0067     uint32_t adler32_chksum_;                    // adler32 check sum for the (compressed) data
0068   };
0069 
// Forward declarations of the message builder classes (defined elsewhere).
class EventMsgBuilder;
class InitMsgBuilder;

// Selector for the compression applied during serialization.  The numeric
// values are assigned explicitly; note that the value 3 is not used.
enum StreamerCompressionAlgo {
  UNCOMPRESSED = 0,
  ZLIB = 1,
  LZMA = 2,
  ZSTD = 4
};
0073 
0074   class StreamSerializer {
0075   public:
0076     StreamSerializer(SelectedProducts const *selections);
0077 
0078     int serializeRegistry(SerializeDataBuffer &data_buffer) const;
0079 
0080     int serializeRegistry(SerializeDataBuffer &data_buffer, SendJobHeader::ParameterSetMap const &psetMap) const;
0081 
0082     int serializeEvent(SerializeDataBuffer &data_buffer,
0083                        EventForOutput const &event,
0084                        ParameterSetID const &selectorConfig,
0085                        uint32_t metaDataChecksum,
0086                        StreamerCompressionAlgo compressionAlgo,
0087                        int compression_level,
0088                        unsigned int reserveSize) const;
0089 
0090     ///data_buffer.adler32_chksum_ is the meta data checksum to pass to subsequent events
0091     int serializeEventMetaData(SerializeDataBuffer &data_buffer,
0092                                const BranchIDLists &branchIDLists,
0093                                ThinnedAssociationsHelper const &thinnedAssociationsHelper,
0094                                StreamerCompressionAlgo compressionAlgo,
0095                                int compression_level,
0096                                unsigned int reserveSize) const;
0097 
0098     /**
0099      * Compresses the data in the specified input buffer into the
0100      * specified output buffer.  Returns the size of the compressed data
0101      * or zero if compression failed.
0102      */
0103     static unsigned int compressBuffer(unsigned char *inputBuffer,
0104                                        unsigned int inputSize,
0105                                        std::vector<unsigned char> &outputBuffer,
0106                                        int compressionLevel,
0107                                        unsigned int reserveSize);
0108 
0109     static unsigned int compressBufferLZMA(unsigned char *inputBuffer,
0110                                            unsigned int inputSize,
0111                                            std::vector<unsigned char> &outputBuffer,
0112                                            int compressionLevel,
0113                                            unsigned int reserveSize,
0114                                            bool addHeader = true);
0115 
0116     static unsigned int compressBufferZSTD(unsigned char *inputBuffer,
0117                                            unsigned int inputSize,
0118                                            std::vector<unsigned char> &outputBuffer,
0119                                            int compressionLevel,
0120                                            unsigned int reserveSize,
0121                                            bool addHeader = true);
0122 
0123   private:
0124     int serializeEventCommon(SerializeDataBuffer &data_buffer,
0125                              edm::SendEvent const &iEvent,
0126                              StreamerCompressionAlgo compressionAlgo,
0127                              int compression_level,
0128                              unsigned int reserveSize) const;
0129 
0130     SelectedProducts const *selections_;
0131     edm::propagate_const<TClass *> tc_;
0132   };
0133 
0134 }  // namespace edm::streamer
0135 
0136 #endif