File indexing completed on 2025-03-13 02:32:07
0001 #ifndef IOPool_Streamer_StreamerInputSource_h
0002 #define IOPool_Streamer_StreamerInputSource_h
0003
0004
0005
0006
0007
0008
0009
0010
#include "TBufferFile.h"

#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Sources/interface/RawInputSource.h"
#include "FWCore/Utilities/interface/propagate_const.h"

#include "DataFormats/Streamer/interface/StreamedProducts.h"
#include "DataFormats/Common/interface/EDProductGetter.h"

#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <vector>
0022
// Forward declarations of edm types, so this header can name them without
// pulling in their full definitions.
namespace edm {
  class BranchIDListHelper;
  class ParameterSetDescription;
  class ThinnedAssociationsHelper;
}  // namespace edm
0028
0029 namespace edm::streamer {
0030 class InitMsgView;
0031 class EventMsgView;
0032
0033 class StreamerInputSource : public RawInputSource {
0034 public:
0035 explicit StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc);
0036 ~StreamerInputSource() override;
0037 static void fillDescription(ParameterSetDescription& description);
0038
0039 std::unique_ptr<SendJobHeader> deserializeRegistry(InitMsgView const& initView);
0040
0041 void deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent = false);
0042
0043
0044 uint32_t eventMetaDataChecksum(EventMsgView const& eventView) const;
0045
0046 void deserializeEventMetaData(EventMsgView const& eventView);
0047 void deserializeEvent(EventMsgView const& eventView);
0048
0049 uint32_t presentEventMetaDataChecksum() const { return eventMetaDataChecksum_; }
0050
0051
0052 void updateEventMetaData();
0053
0054 static void mergeIntoRegistry(SendJobHeader const& header, ProductRegistry&, bool subsequent);
0055
0056
0057
0058
0059 bool isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize);
0060
0061
0062
0063
0064 bool isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize);
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074 static unsigned int uncompressBuffer(unsigned char* inputBuffer,
0075 unsigned int inputSize,
0076 std::vector<unsigned char>& outputBuffer,
0077 unsigned int expectedFullSize);
0078
0079 static unsigned int uncompressBufferLZMA(unsigned char* inputBuffer,
0080 unsigned int inputSize,
0081 std::vector<unsigned char>& outputBuffer,
0082 unsigned int expectedFullSize,
0083 bool hasHeader = true);
0084
0085 static unsigned int uncompressBufferZSTD(unsigned char* inputBuffer,
0086 unsigned int inputSize,
0087 std::vector<unsigned char>& outputBuffer,
0088 unsigned int expectedFullSize,
0089 bool hasHeader = true);
0090
0091 protected:
0092 static void declareStreamers(SendDescs const& descs);
0093 static void buildClassCache(SendDescs const& descs);
0094 void resetAfterEndRun();
0095
0096 private:
0097 void deserializeEventCommon(EventMsgView const& eventView, bool isMetaData);
0098
0099 class EventPrincipalHolder : public EDProductGetter {
0100 public:
0101 EventPrincipalHolder();
0102 ~EventPrincipalHolder() override;
0103
0104 WrapperBase const* getIt(ProductID const& id) const override;
0105 std::optional<std::tuple<edm::WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
0106 unsigned int) const override;
0107 void getThinnedProducts(ProductID const& pid,
0108 std::vector<WrapperBase const*>& wrappers,
0109 std::vector<unsigned int>& keys) const override;
0110 OptionalThinnedKey getThinnedKeyFrom(ProductID const&, unsigned int, ProductID const&) const override;
0111
0112 unsigned int transitionIndex_() const override;
0113
0114 void setEventPrincipal(EventPrincipal* ep);
0115
0116 private:
0117
0118 EventPrincipal const* eventPrincipal_;
0119 };
0120
0121 void read(EventPrincipal& eventPrincipal) override;
0122
0123 void setRun(RunNumber_t r) override;
0124
0125 edm::propagate_const<TClass*> tc_;
0126 std::vector<unsigned char> dest_;
0127 TBufferFile xbuf_;
0128 edm::propagate_const<std::unique_ptr<SendEvent>> sendEvent_;
0129 edm::propagate_const<std::unique_ptr<EventPrincipalHolder>> eventPrincipalHolder_;
0130 std::vector<edm::propagate_const<std::unique_ptr<EventPrincipalHolder>>> streamToEventPrincipalHolders_;
0131
0132 std::string processName_;
0133 unsigned int protocolVersion_;
0134 uint32_t eventMetaDataChecksum_ = 0;
0135 };
0136 }
0137
0138 #endif