Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-03-13 02:32:07

0001 #ifndef IOPool_Streamer_StreamerInputSource_h
0002 #define IOPool_Streamer_StreamerInputSource_h
0003 
0004 /**
0005  * StreamerInputSource.h
0006  *
0007  * Base class for translating streamer message objects into
0008  * framework objects (e.g. ProductRegistry and EventPrincipal)
0009  */
0010 
0011 #include "TBufferFile.h"
0012 
0013 #include "FWCore/Framework/interface/Frameworkfwd.h"
0014 #include "FWCore/Sources/interface/RawInputSource.h"
0015 #include "FWCore/Utilities/interface/propagate_const.h"
0016 
0017 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0018 #include "DataFormats/Common/interface/EDProductGetter.h"
0019 
0020 #include <memory>
0021 #include <vector>
0022 
0023 namespace edm {
0024   class BranchIDListHelper;
0025   class ParameterSetDescription;
0026   class ThinnedAssociationsHelper;
0027 }  // namespace edm
0028 
0029 namespace edm::streamer {
0030   class InitMsgView;
0031   class EventMsgView;
0032 
0033   class StreamerInputSource : public RawInputSource {
0034   public:
0035     explicit StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc);
0036     ~StreamerInputSource() override;
0037     static void fillDescription(ParameterSetDescription& description);
0038 
0039     std::unique_ptr<SendJobHeader> deserializeRegistry(InitMsgView const& initView);
0040 
0041     void deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent = false);
0042 
0043     //If eventView is a meta data event then this returns its checksum
0044     uint32_t eventMetaDataChecksum(EventMsgView const& eventView) const;
0045     //Should be called right after this message has been read
0046     void deserializeEventMetaData(EventMsgView const& eventView);
0047     void deserializeEvent(EventMsgView const& eventView);
0048 
0049     uint32_t presentEventMetaDataChecksum() const { return eventMetaDataChecksum_; }
0050     //This can only be called during a new file transition as it updates state that requires
0051     // framework synchronization.
0052     void updateEventMetaData();
0053 
0054     static void mergeIntoRegistry(SendJobHeader const& header, ProductRegistry&, bool subsequent);
0055 
0056     /**
0057      * Detect if buffer starts with "XZ\0" which means it is compressed in LZMA format
0058      */
0059     bool isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize);
0060 
0061     /**
0062      * Detect if buffer starts with "Z\0" which means it is compressed in ZStandard format
0063      */
0064     bool isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize);
0065 
0066     /**
0067      * Uncompresses the data in the specified input buffer into the
0068      * specified output buffer.  The inputSize should be set to the size
0069      * of the compressed data in the inputBuffer.  The expectedFullSize should
0070      * be set to the original size of the data (before compression).
0071      * Returns the actual size of the uncompressed data.
0072      * Errors are reported by throwing exceptions.
0073      */
0074     static unsigned int uncompressBuffer(unsigned char* inputBuffer,
0075                                          unsigned int inputSize,
0076                                          std::vector<unsigned char>& outputBuffer,
0077                                          unsigned int expectedFullSize);
0078 
0079     static unsigned int uncompressBufferLZMA(unsigned char* inputBuffer,
0080                                              unsigned int inputSize,
0081                                              std::vector<unsigned char>& outputBuffer,
0082                                              unsigned int expectedFullSize,
0083                                              bool hasHeader = true);
0084 
0085     static unsigned int uncompressBufferZSTD(unsigned char* inputBuffer,
0086                                              unsigned int inputSize,
0087                                              std::vector<unsigned char>& outputBuffer,
0088                                              unsigned int expectedFullSize,
0089                                              bool hasHeader = true);
0090 
0091   protected:
0092     static void declareStreamers(SendDescs const& descs);
0093     static void buildClassCache(SendDescs const& descs);
0094     void resetAfterEndRun();
0095 
0096   private:
0097     void deserializeEventCommon(EventMsgView const& eventView, bool isMetaData);
0098 
0099     class EventPrincipalHolder : public EDProductGetter {
0100     public:
0101       EventPrincipalHolder();
0102       ~EventPrincipalHolder() override;
0103 
0104       WrapperBase const* getIt(ProductID const& id) const override;
0105       std::optional<std::tuple<edm::WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
0106                                                                                          unsigned int) const override;
0107       void getThinnedProducts(ProductID const& pid,
0108                               std::vector<WrapperBase const*>& wrappers,
0109                               std::vector<unsigned int>& keys) const override;
0110       OptionalThinnedKey getThinnedKeyFrom(ProductID const&, unsigned int, ProductID const&) const override;
0111 
0112       unsigned int transitionIndex_() const override;
0113 
0114       void setEventPrincipal(EventPrincipal* ep);
0115 
0116     private:
0117       // We don't own the principal.  The lifetime must be managed externally.
0118       EventPrincipal const* eventPrincipal_;
0119     };
0120 
0121     void read(EventPrincipal& eventPrincipal) override;
0122 
0123     void setRun(RunNumber_t r) override;
0124 
0125     edm::propagate_const<TClass*> tc_;
0126     std::vector<unsigned char> dest_;
0127     TBufferFile xbuf_;
0128     edm::propagate_const<std::unique_ptr<SendEvent>> sendEvent_;
0129     edm::propagate_const<std::unique_ptr<EventPrincipalHolder>> eventPrincipalHolder_;
0130     std::vector<edm::propagate_const<std::unique_ptr<EventPrincipalHolder>>> streamToEventPrincipalHolders_;
0131 
0132     std::string processName_;
0133     unsigned int protocolVersion_;
0134     uint32_t eventMetaDataChecksum_ = 0;
0135   };  //end-of-class-def
0136 }  // namespace edm::streamer
0137 
0138 #endif