File indexing completed on 2024-05-11 03:33:55
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <iostream>
0015 #include <fstream>
0016 #include <cassert>
0017 #include "zlib.h"
0018 #include "lzma.h"
0019
0020
0021 #include "HDF5ProductResolver.h"
0022 #include "convertSyncValue.h"
0023 #include "FWCore/Framework/interface/EventSetupRecordImpl.h"
0024 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0025 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0026 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0027 #include "FWCore/Utilities/interface/Exception.h"
0028 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0029
0030 #include "h5_DataSet.h"
0031 #include "h5_Attribute.h"
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044 HDF5ProductResolver::HDF5ProductResolver(edm::SerialTaskQueue* iQueue,
0045 std::unique_ptr<cond::serialization::SerializationHelperBase> iHelper,
0046 cms::h5::File const* iFile,
0047 std::string const& iFileName,
0048 cond::hdf5::Compression iCompression,
0049 cond::hdf5::Record const* iRecord,
0050 cond::hdf5::DataProduct const* iDataProduct)
0051 : edm::eventsetup::ESSourceProductResolverBase(),
0052 queue_(iQueue),
0053 helper_(std::move(iHelper)),
0054 file_(iFile),
0055 fileName_(iFileName),
0056 record_(iRecord),
0057 dataProduct_(iDataProduct),
0058 compression_(iCompression) {}
0059
0060
0061
0062
0063
0064
0065 HDF5ProductResolver::~HDF5ProductResolver() {}
0066
0067
0068
0069
0070
0071 void HDF5ProductResolver::prefetchAsyncImpl(edm::WaitingTaskHolder iTask,
0072 edm::eventsetup::EventSetupRecordImpl const& iRecord,
0073 edm::eventsetup::DataKey const& iKey,
0074 edm::EventSetupImpl const*,
0075 edm::ServiceToken const&,
0076 edm::ESParentContext const& iParent) noexcept {
0077 prefetchAsyncImplTemplate(
0078 [this, iov = iRecord.validityInterval(), iParent, &iRecord](auto& iGroup, auto iActivity) {
0079 queue_->push(iGroup, [this, &iGroup, act = std::move(iActivity), iov, iParent, &iRecord] {
0080 CMS_SA_ALLOW try {
0081 edm::ESModuleCallingContext context(providerDescription(),
0082 reinterpret_cast<std::uintptr_t>(this),
0083 edm::ESModuleCallingContext::State::kRunning,
0084 iParent);
0085 iRecord.activityRegistry()->preESModuleSignal_.emit(iRecord.key(), context);
0086 struct EndGuard {
0087 EndGuard(edm::eventsetup::EventSetupRecordImpl const& iRecord,
0088 edm::ESModuleCallingContext const& iContext)
0089 : record_{iRecord}, context_{iContext} {}
0090 ~EndGuard() { record_.activityRegistry()->postESModuleSignal_.emit(record_.key(), context_); }
0091 edm::eventsetup::EventSetupRecordImpl const& record_;
0092 edm::ESModuleCallingContext const& context_;
0093 } guardAR(iRecord, context);
0094
0095 auto index = indexForInterval(iov);
0096
0097 readFromHDF5api(index);
0098 iGroup.run(std::move(act));
0099 exceptPtr_ = {};
0100 } catch (...) {
0101 exceptPtr_ = std::current_exception();
0102 }
0103 });
0104 },
0105 []() { return true; },
0106 std::move(iTask),
0107 iRecord,
0108 iKey,
0109 iParent);
0110 }
0111
0112 std::ptrdiff_t HDF5ProductResolver::indexForInterval(edm::ValidityInterval const& iIOV) const {
0113 using namespace cond::hdf5;
0114 auto firstSync = convertSyncValue(iIOV.first(), record_->iovIsRunLumi_);
0115
0116 auto itFound = findMatchingFirst(record_->iovFirsts_, firstSync);
0117 assert(itFound != record_->iovFirsts_.end());
0118
0119 return itFound - record_->iovFirsts_.begin();
0120 }
0121
0122 void HDF5ProductResolver::readFromHDF5api(std::ptrdiff_t iIndex) {
0123 auto payloadRef = dataProduct_->payloadForIOVs_[iIndex];
0124 auto ds = file_->derefDataSet(payloadRef);
0125 storageSize_ = ds->storageSize();
0126 if (storageSize_ == 0) {
0127 return;
0128 }
0129
0130 fileOffset_ = ds->fileOffset();
0131 memSize_ = ds->findAttribute("memsize")->readUInt32();
0132 type_ = ds->findAttribute("type")->readString();
0133 }
0134
0135 void HDF5ProductResolver::prefetch(edm::eventsetup::DataKey const& iKey, edm::EventSetupRecordDetails iRecord) {
0136 if (exceptPtr_) {
0137 rethrow_exception(exceptPtr_);
0138 }
0139 if (storageSize_ == 0) {
0140 return;
0141 }
0142 threadFriendlyPrefetch(fileOffset_, storageSize_, memSize_, type_);
0143 }
0144
0145 std::vector<char> HDF5ProductResolver::decompress_zlib(std::vector<char> compressedBuffer, std::size_t iMemSize) const {
0146 std::vector<char> buffer;
0147 if (iMemSize == compressedBuffer.size()) {
0148
0149
0150 buffer = std::move(compressedBuffer);
0151 } else {
0152
0153 z_stream strm;
0154 strm.zalloc = Z_NULL;
0155 strm.zfree = Z_NULL;
0156 strm.opaque = Z_NULL;
0157 strm.avail_in = 0;
0158 strm.next_in = Z_NULL;
0159 auto ret = inflateInit(&strm);
0160 assert(ret == Z_OK);
0161
0162 strm.avail_in = compressedBuffer.size();
0163 strm.next_in = reinterpret_cast<unsigned char*>(compressedBuffer.data());
0164
0165 buffer = std::vector<char>(iMemSize);
0166 strm.avail_out = buffer.size();
0167 strm.next_out = reinterpret_cast<unsigned char*>(buffer.data());
0168 ret = inflate(&strm, Z_FINISH);
0169 assert(ret != Z_STREAM_ERROR);
0170
0171 assert(ret == Z_STREAM_END);
0172
0173 (void)inflateEnd(&strm);
0174 }
0175 return buffer;
0176 }
0177
0178 std::vector<char> HDF5ProductResolver::decompress_lzma(std::vector<char> compressedBuffer, std::size_t iMemSize) const {
0179 std::vector<char> buffer;
0180 if (iMemSize == compressedBuffer.size()) {
0181
0182
0183 buffer = std::move(compressedBuffer);
0184 } else {
0185
0186 lzma_stream stream = LZMA_STREAM_INIT;
0187
0188 auto returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
0189 if (returnStatus != LZMA_OK) {
0190 throw cms::Exception("H5CondFailedDecompress") << "failed to setup lzma";
0191 }
0192
0193 stream.next_in = reinterpret_cast<uint8_t*>(compressedBuffer.data());
0194 stream.avail_in = compressedBuffer.size();
0195
0196 buffer = std::vector<char>(iMemSize);
0197 stream.next_out = reinterpret_cast<uint8_t*>(buffer.data());
0198 stream.avail_out = buffer.size();
0199
0200 returnStatus = lzma_code(&stream, LZMA_FINISH);
0201 lzma_end(&stream);
0202 if (returnStatus != LZMA_STREAM_END) {
0203 throw cms::Exception("H5CondFailedDecompress") << "failed to decompress buffer using lzma";
0204 }
0205 }
0206 return buffer;
0207 }
0208
0209 void HDF5ProductResolver::threadFriendlyPrefetch(uint64_t iFileOffset,
0210 std::size_t iStorageSize,
0211 std::size_t iMemSize,
0212 const std::string& iTypeName) {
0213
0214
0215
0216 std::vector<char> compressedBuffer(iStorageSize);
0217 std::fstream file(fileName_.c_str());
0218 file.seekg(iFileOffset);
0219 file.read(compressedBuffer.data(), compressedBuffer.size());
0220
0221 std::vector<char> buffer;
0222 if (compression_ == cond::hdf5::Compression::kZLIB) {
0223 buffer = decompress_zlib(std::move(compressedBuffer), iMemSize);
0224 } else if (compression_ == cond::hdf5::Compression::kLZMA) {
0225 buffer = decompress_lzma(std::move(compressedBuffer), iMemSize);
0226 } else {
0227 buffer = std::move(compressedBuffer);
0228 }
0229
0230 std::stringbuf sBuffer;
0231 sBuffer.pubsetbuf(&buffer[0], buffer.size());
0232 data_ = helper_->deserialize(sBuffer, iTypeName);
0233 if (data_.get() == nullptr) {
0234 throw cms::Exception("H5CondFailedDeserialization")
0235 << "failed to deserialize: buffer size:" << buffer.size() << " type: '" << iTypeName << "'";
0236 }
0237 }
0238
0239 void HDF5ProductResolver::invalidateCache() {
0240 ESSourceProductResolverBase::invalidateCache();
0241 data_ = cond::serialization::unique_void_ptr();
0242 }
0243
0244
0245
0246
0247 void const* HDF5ProductResolver::getAfterPrefetchImpl() const { return data_.get(); }
0248
0249
0250
0251