Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-25 09:53:47

0001 #ifndef IOPool_Streamer_StreamerInputSource_h
0002 #define IOPool_Streamer_StreamerInputSource_h
0003 
0004 /**
0005  * StreamerInputSource.h
0006  *
0007  * Base class for translating streamer message objects into
0008  * framework objects (e.g. ProductRegistry and EventPrincipal)
0009  */
0010 
0011 #include "TBufferFile.h"
0012 
0013 #include "FWCore/Framework/interface/Frameworkfwd.h"
0014 #include "FWCore/Sources/interface/RawInputSource.h"
0015 #include "FWCore/Utilities/interface/propagate_const.h"
0016 
0017 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0018 #include "DataFormats/Common/interface/EDProductGetter.h"
0019 
0020 #include <memory>
0021 #include <vector>
0022 
0023 class InitMsgView;
0024 class EventMsgView;
0025 
0026 namespace edm {
0027   class BranchIDListHelper;
0028   class ParameterSetDescription;
0029   class ThinnedAssociationsHelper;
0030 
0031   class StreamerInputSource : public RawInputSource {
         // Input source that translates streamer message objects (InitMsgView,
         // EventMsgView) into framework objects such as the ProductRegistry and
         // the EventPrincipal.  Only declarations live in this header; the
         // definitions are elsewhere.
0032   public:
         // Construct from the source's ParameterSet and its InputSourceDescription.
0033     explicit StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc);
0034     ~StreamerInputSource() override;
         // Static hook operating on a ParameterSetDescription.
         // NOTE(review): presumably adds this source's configurable parameters to
         // `description` -- confirm against the definition.
0035     static void fillDescription(ParameterSetDescription& description);
0036 
         // Builds a SendJobHeader from an INIT message view.  The result is
         // returned in a std::unique_ptr, so ownership passes to the caller.
0037     std::unique_ptr<SendJobHeader> deserializeRegistry(InitMsgView const& initView);
0038 
         // Takes an INIT message view plus a `subsequent` flag (default: false).
         // NOTE(review): by its name this combines deserializeRegistry() with
         // mergeIntoRegistry(), forwarding `subsequent` -- confirm against the
         // definition.
0039     void deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent = false);
0040 
         // Consumes an event message view; nothing is returned.
         // NOTE(review): presumably the deserialized event is kept in sendEvent_
         // for a later read() -- confirm against the definition.
0041     void deserializeEvent(EventMsgView const& eventView);
0042 
         // Static helper: `header` is read-only, while the ProductRegistry,
         // BranchIDListHelper and ThinnedAssociationsHelper are taken by non-const
         // reference and may therefore be modified.  `subsequent` has no default
         // here, unlike in deserializeAndMergeWithRegistry().
0043     static void mergeIntoRegistry(SendJobHeader const& header,
0044                                   ProductRegistry&,
0045                                   BranchIDListHelper&,
0046                                   ThinnedAssociationsHelper&,
0047                                   bool subsequent);
0048 
0049     /**
0050      * Detect if buffer starts with "XZ\0" which means it is compressed in LZMA format
          *
          * inputBuffer: start of the data to inspect (not modified).
          * inputSize:   number of bytes available at inputBuffer.
          * Returns true when the LZMA marker is detected.
0051      */
0052     bool isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize);
0053 
0054     /**
0055      * Detect if buffer starts with "Z\0" which means it is compressed in ZStandard format
          *
          * inputBuffer: start of the data to inspect (not modified).
          * inputSize:   number of bytes available at inputBuffer.
          * Returns true when the ZStandard marker is detected.
          *
          * NOTE(review): the LZMA marker above is documented as three bytes
          * ("XZ\0") while this one is documented as two ("Z\0") -- confirm the
          * exact marker against the implementation.
0056      */
0057     bool isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize);
0058 
0059     /**
0060      * Uncompresses the data in the specified input buffer into the
0061      * specified output buffer.  The inputSize should be set to the size
0062      * of the compressed data in the inputBuffer.  The expectedFullSize should
0063      * be set to the original size of the data (before compression).
0064      * Returns the actual size of the uncompressed data.
0065      * Errors are reported by throwing exceptions.
0066      */
0067     static unsigned int uncompressBuffer(unsigned char* inputBuffer,
0068                                          unsigned int inputSize,
0069                                          std::vector<unsigned char>& outputBuffer,
0070                                          unsigned int expectedFullSize);
0071 
         // LZMA counterpart of uncompressBuffer(): same leading parameters and
         // return type, plus `hasHeader` (default: true).
         // NOTE(review): `hasHeader` presumably states whether inputBuffer still
         // begins with the format marker that isBufferLZMA() looks for -- confirm.
0072     static unsigned int uncompressBufferLZMA(unsigned char* inputBuffer,
0073                                              unsigned int inputSize,
0074                                              std::vector<unsigned char>& outputBuffer,
0075                                              unsigned int expectedFullSize,
0076                                              bool hasHeader = true);
0077 
         // ZStandard counterpart of uncompressBuffer(): same leading parameters
         // and return type, plus `hasHeader` (default: true).
         // NOTE(review): `hasHeader` presumably states whether inputBuffer still
         // begins with the format marker that isBufferZSTD() looks for -- confirm.
0078     static unsigned int uncompressBufferZSTD(unsigned char* inputBuffer,
0079                                              unsigned int inputSize,
0080                                              std::vector<unsigned char>& outputBuffer,
0081                                              unsigned int expectedFullSize,
0082                                              bool hasHeader = true);
0083 
0084   protected:
         // Static helpers for derived classes, both driven by the product
         // descriptions (SendDescs) carried by an INIT message.
         // NOTE(review): presumably they prepare ROOT streamers / class
         // dictionaries for the described product types -- confirm.
0085     static void declareStreamers(SendDescs const& descs);
0086     static void buildClassCache(SendDescs const& descs);
         // NOTE(review): by its name, resets per-run state once a run has ended
         // -- confirm against the definition.
0087     void resetAfterEndRun();
0088 
0089   private:
         // EDProductGetter implementation that answers product look-ups through
         // an EventPrincipal it points to but does not own.
         // NOTE(review): std::optional and std::tuple are used below, and
         // std::string further down, but this header includes only <memory> and
         // <vector> directly; <optional>, <tuple> and <string> are presumably
         // pulled in transitively -- confirm or include them explicitly.
0090     class EventPrincipalHolder : public EDProductGetter {
0091     public:
0092       EventPrincipalHolder();
0093       ~EventPrincipalHolder() override;
0094 
           // Overrides of the EDProductGetter look-up interface.
0095       WrapperBase const* getIt(ProductID const& id) const override;
0096       std::optional<std::tuple<edm::WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
0097                                                                                          unsigned int) const override;
0098       void getThinnedProducts(ProductID const& pid,
0099                               std::vector<WrapperBase const*>& wrappers,
0100                               std::vector<unsigned int>& keys) const override;
0101       OptionalThinnedKey getThinnedKeyFrom(ProductID const&, unsigned int, ProductID const&) const override;
0102 
0103       unsigned int transitionIndex_() const override;
0104 
           // Points this holder at `ep`.  The argument is a non-const pointer but
           // the member it is stored in is pointer-to-const.
0105       void setEventPrincipal(EventPrincipal* ep);
0106 
0107     private:
0108       // We don't own the principal.  The lifetime must be managed externally.
0109       EventPrincipal const* eventPrincipal_;
0110     };
0111 
         // Overrides of base-class virtuals (both are marked `override`).
0112     void read(EventPrincipal& eventPrincipal) override;
0113 
0114     void setRun(RunNumber_t r) override;
0115 
         // Data members.  The notes below are inferred from names and types only.
         // NOTE(review): tc_ is presumably the TClass used when reading SendEvent
         // objects out of xbuf_ -- confirm.
0116     edm::propagate_const<TClass*> tc_;
         // NOTE(review): presumably the scratch buffer receiving uncompressed data.
0117     std::vector<unsigned char> dest_;
0118     TBufferFile xbuf_;
         // Owned (std::unique_ptr) SendEvent.
0119     edm::propagate_const<std::unique_ptr<SendEvent>> sendEvent_;
         // One owned holder, plus a vector of further owned holders.
         // NOTE(review): the vector is presumably indexed by stream -- confirm.
0120     edm::propagate_const<std::unique_ptr<EventPrincipalHolder>> eventPrincipalHolder_;
0121     std::vector<edm::propagate_const<std::unique_ptr<EventPrincipalHolder>>> streamToEventPrincipalHolders_;
0122     bool adjustEventToNewProductRegistry_;
0123 
0124     std::string processName_;
0125     unsigned int protocolVersion_;
0126   };  //end-of-class-def
0127 }  // namespace edm
0128 
0129 #endif