Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-02-14 14:21:19

0001 #ifndef IOPool_Streamer_StreamSerializer_h
0002 #define IOPool_Streamer_StreamSerializer_h
0003 
0004 /**
0005  * StreamSerializer.h
0006  *
0007  * Utility class for translating framework objects (e.g. ProductRegistry and
0008  * EventForOutput) into streamer message objects.
0009  */
0010 
0011 #include "TBufferFile.h"
0012 
0013 #include <cstdint>
0014 #include <vector>
0015 
0016 #include "DataFormats/Provenance/interface/BranchIDList.h"
0017 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0018 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0019 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0020 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0021 
0022 // Data structure to be shared by all output modules for event serialization
     //
     // Bundles the working storage used while building streamer messages: a ROOT
     // TBufferFile (rootbuf_), a byte vector for compressed output (comp_buf_), a
     // char vector for headers (header_buf_), plus bookkeeping (two size counters,
     // an adler32 checksum and a pointer to the most recent payload).
     // All data members are public.
0023 struct SerializeDataBuffer {
0024   typedef std::vector<char> SBuffer;
0025   static constexpr int init_size = 0;  //will be allocated on first event
       // Together with init_size, the element count comp_buf_ is constructed with.
0026   static constexpr unsigned int reserve_size = 50000;
0027 
       // Default constructor.
       //  - comp_buf_ is constructed with (reserve_size + init_size) elements, i.e.
       //    the vector is actually sized to that many bytes, not merely reserved.
       //  - curr_event_size_ and curr_space_used_ are value-initialized (zero).
       //  - rootbuf_ is constructed with TBuffer::kWrite and init_size.
       //  - ptr_ is set from rootbuf_.Buffer(); this is well-defined because
       //    rootbuf_ is declared before ptr_ and members are initialized in
       //    declaration order.
       //  - adler32_chksum_ starts at 0.
0028   SerializeDataBuffer()
0029       : comp_buf_(reserve_size + init_size),
0030         curr_event_size_(),
0031         curr_space_used_(),
0032         rootbuf_(TBuffer::kWrite, init_size),
0033         ptr_((unsigned char *)rootbuf_.Buffer()),
0034         header_buf_(),
0035         adler32_chksum_(0) {}
0036 
0037   // This object caches the results of the last INIT or event
0038   // serialization operation.  You get access to the data using the
0039   // following member functions.
0040 
       // Const access: returns the address held in ptr_ as pointer-to-const.
0041   unsigned char const *bufferPointer() const { return get_underlying_safe(ptr_); }
       // Non-const access: returns a reference to the stored pointer itself, so the
       // caller can both write through it and re-point it.
0042   unsigned char *&bufferPointer() { return get_underlying_safe(ptr_); }
       // Plain getters for the bookkeeping fields declared below.
0043   unsigned int currentSpaceUsed() const { return curr_space_used_; }
0044   unsigned int currentEventSize() const { return curr_event_size_; }
0045   uint32_t adler32_chksum() const { return adler32_chksum_; }
0046 
       // Empties header_buf_ and asks the vector to give back its capacity
       // (shrink_to_fit is a non-binding request), then resets rootbuf_ and
       // resizes it to init_size.
       // NOTE(review): ptr_ is not updated here. If ptr_ still points into
       // rootbuf_'s storage and Expand() reallocates it, ptr_ would be stale until
       // it is reassigned - confirm that callers re-point it before the next use.
0047   void clearHeaderBuffer() {
0048     header_buf_.clear();
0049     header_buf_.shrink_to_fit();
0050     rootbuf_.Reset();
0051     rootbuf_.Expand(init_size);  //shrink TBuffer to size 0 after resetting TBuffer length
0052   }
0053 
0054   std::vector<unsigned char> comp_buf_;  // space for compressed data
0055   unsigned int curr_event_size_;
0056   unsigned int curr_space_used_;  // less than curr_event_size_ if compressed
       // ROOT buffer, constructed in write mode (TBuffer::kWrite) by the constructor.
       // Must stay declared before ptr_, whose initializer reads rootbuf_.Buffer().
0057   TBufferFile rootbuf_;
0058   edm::propagate_const<unsigned char *> ptr_;  // set to the place where the last event stored
0059   SBuffer header_buf_;                         // place for INIT message creation and streamer event header
0060   uint32_t adler32_chksum_;                    // adler32 check sum for the (compressed) data
0061 };
0062 
     // Forward declarations at global scope. Neither type is referenced by any
     // other declaration in this header.
0063 class EventMsgBuilder;
0064 class InitMsgBuilder;
0065 namespace edm {
       // Selects the compression algorithm passed to StreamSerializer::serializeEvent.
       // This is an unscoped enum, so the enumerators are visible directly in
       // namespace edm. The values are not contiguous: 3 is unused (ZSTD is 4).
0066   enum StreamerCompressionAlgo { UNCOMPRESSED = 0, ZLIB = 1, LZMA = 2, ZSTD = 4 };
0067 
       // Forward declarations for types that appear below only as reference
       // parameters (ModuleCallingContext is not used in this header).
0068   class EventForOutput;
0069   class ModuleCallingContext;
0070   class ThinnedAssociationsHelper;
0071 
       // Serializes framework objects into a SerializeDataBuffer for the streamer
       // output path. Only declarations are present here; the definitions live in
       // a separate translation unit.
0072   class StreamSerializer {
0073   public:
         // Constructs a serializer from a pointer to the selected products.
         // The constructor is not marked explicit, so a SelectedProducts const*
         // converts implicitly.
         // NOTE(review): presumably stored in selections_ - confirm in the .cc file.
0074     StreamSerializer(SelectedProducts const *selections);
0075 
         // Serializes registry information using the given branch ID lists and
         // thinned-associations helper. data_buffer is taken by non-const
         // reference, presumably as the destination of the serialized bytes.
         // NOTE(review): the meaning of the int return value is not visible in
         // this header - confirm against the implementation.
0076     int serializeRegistry(SerializeDataBuffer &data_buffer,
0077                           const BranchIDLists &branchIDLists,
0078                           ThinnedAssociationsHelper const &thinnedAssociationsHelper);
0079 
         // Overload of the above that additionally takes a parameter-set map.
0080     int serializeRegistry(SerializeDataBuffer &data_buffer,
0081                           const BranchIDLists &branchIDLists,
0082                           ThinnedAssociationsHelper const &thinnedAssociationsHelper,
0083                           SendJobHeader::ParameterSetMap const &psetMap);
0084 
         // Serializes one event. Declared const, so it does not modify the
         // serializer's own (non-mutable) members; data_buffer is passed by
         // non-const reference. compressionAlgo picks one of the
         // StreamerCompressionAlgo values; compression_level and reserveSize
         // are presumably forwarded to the matching compressBuffer* function -
         // TODO confirm. The meaning of the int return value is not visible here.
0085     int serializeEvent(SerializeDataBuffer &data_buffer,
0086                        EventForOutput const &event,
0087                        ParameterSetID const &selectorConfig,
0088                        StreamerCompressionAlgo compressionAlgo,
0089                        int compression_level,
0090                        unsigned int reserveSize) const;
0091 
0092     /**
0093      * Compresses the data in the specified input buffer into the
0094      * specified output buffer.  Returns the size of the compressed data
0095      * or zero if compression failed.
0096      */
         // NOTE(review): given the enum above this is presumably the ZLIB variant;
         // the algorithm itself is not named in this header.
0097     static unsigned int compressBuffer(unsigned char *inputBuffer,
0098                                        unsigned int inputSize,
0099                                        std::vector<unsigned char> &outputBuffer,
0100                                        int compressionLevel,
0101                                        unsigned int reserveSize);
0102 
         // LZMA counterpart: same parameter list plus addHeader, which defaults
         // to true. NOTE(review): presumably follows the same return convention as
         // compressBuffer (compressed size, zero on failure) - confirm.
0103     static unsigned int compressBufferLZMA(unsigned char *inputBuffer,
0104                                            unsigned int inputSize,
0105                                            std::vector<unsigned char> &outputBuffer,
0106                                            int compressionLevel,
0107                                            unsigned int reserveSize,
0108                                            bool addHeader = true);
0109 
         // ZSTD counterpart with the same signature as the LZMA variant.
         // NOTE(review): return convention presumably as for compressBuffer - confirm.
0110     static unsigned int compressBufferZSTD(unsigned char *inputBuffer,
0111                                            unsigned int inputSize,
0112                                            std::vector<unsigned char> &outputBuffer,
0113                                            int compressionLevel,
0114                                            unsigned int reserveSize,
0115                                            bool addHeader = true);
0116 
0117   private:
         // Raw pointer to the product selection; ownership is not expressed by the
         // type and cannot be determined from this header.
0118     SelectedProducts const *selections_;
         // ROOT TClass pointer wrapped in edm::propagate_const; how it is
         // initialized and used is only visible in the implementation file.
0119     edm::propagate_const<TClass *> tc_;
0120   };
0121 
0122 }  // namespace edm
0123 
0124 #endif