Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-11 03:33:55

0001 // -*- C++ -*-
0002 //
0003 // Package:     CondCore/HDF5ESSource
0004 // Class  :     HDF5ProductResolver
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Christopher Jones
0010 //         Created:  Tue, 20 Jun 2023 13:52:59 GMT
0011 //
0012 
0013 // system include files
0014 #include <iostream>
0015 #include <fstream>
0016 #include <cassert>
0017 #include "zlib.h"
0018 #include "lzma.h"
0019 
0020 // user include files
0021 #include "HDF5ProductResolver.h"
0022 #include "convertSyncValue.h"
0023 #include "FWCore/Framework/interface/EventSetupRecordImpl.h"
0024 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0025 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0026 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0027 #include "FWCore/Utilities/interface/Exception.h"
0028 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0029 
0030 #include "h5_DataSet.h"
0031 #include "h5_Attribute.h"
0032 
0033 //
0034 // constants, enums and typedefs
0035 //
0036 
0037 //
0038 // static data member definitions
0039 //
0040 
0041 //
0042 // constructors and destructor
0043 //
0044 HDF5ProductResolver::HDF5ProductResolver(edm::SerialTaskQueue* iQueue,
0045                                          std::unique_ptr<cond::serialization::SerializationHelperBase> iHelper,
0046                                          cms::h5::File const* iFile,
0047                                          std::string const& iFileName,
0048                                          cond::hdf5::Compression iCompression,
0049                                          cond::hdf5::Record const* iRecord,
0050                                          cond::hdf5::DataProduct const* iDataProduct)
0051     : edm::eventsetup::ESSourceProductResolverBase(),
0052       queue_(iQueue),
0053       helper_(std::move(iHelper)),
0054       file_(iFile),
0055       fileName_(iFileName),
0056       record_(iRecord),
0057       dataProduct_(iDataProduct),
0058       compression_(iCompression) {}
0059 
0060 // HDF5ProductResolver::HDF5ProductResolver(const HDF5ProductResolver& rhs)
0061 // {
0062 //    // do actual copying here;
0063 // }
0064 
0065 HDF5ProductResolver::~HDF5ProductResolver() {}
0066 
0067 //
0068 // member functions
0069 //
0070 
0071 void HDF5ProductResolver::prefetchAsyncImpl(edm::WaitingTaskHolder iTask,
0072                                             edm::eventsetup::EventSetupRecordImpl const& iRecord,
0073                                             edm::eventsetup::DataKey const& iKey,
0074                                             edm::EventSetupImpl const*,
0075                                             edm::ServiceToken const&,
0076                                             edm::ESParentContext const& iParent) noexcept {
0077   prefetchAsyncImplTemplate(
0078       [this, iov = iRecord.validityInterval(), iParent, &iRecord](auto& iGroup, auto iActivity) {
0079         queue_->push(iGroup, [this, &iGroup, act = std::move(iActivity), iov, iParent, &iRecord] {
0080           CMS_SA_ALLOW try {
0081             edm::ESModuleCallingContext context(providerDescription(),
0082                                                 reinterpret_cast<std::uintptr_t>(this),
0083                                                 edm::ESModuleCallingContext::State::kRunning,
0084                                                 iParent);
0085             iRecord.activityRegistry()->preESModuleSignal_.emit(iRecord.key(), context);
0086             struct EndGuard {
0087               EndGuard(edm::eventsetup::EventSetupRecordImpl const& iRecord,
0088                        edm::ESModuleCallingContext const& iContext)
0089                   : record_{iRecord}, context_{iContext} {}
0090               ~EndGuard() { record_.activityRegistry()->postESModuleSignal_.emit(record_.key(), context_); }
0091               edm::eventsetup::EventSetupRecordImpl const& record_;
0092               edm::ESModuleCallingContext const& context_;
0093             } guardAR(iRecord, context);
0094 
0095             auto index = indexForInterval(iov);
0096 
0097             readFromHDF5api(index);
0098             iGroup.run(std::move(act));
0099             exceptPtr_ = {};
0100           } catch (...) {
0101             exceptPtr_ = std::current_exception();
0102           }
0103         });
0104       },
0105       []() { return true; },
0106       std::move(iTask),
0107       iRecord,
0108       iKey,
0109       iParent);
0110 }
0111 
0112 std::ptrdiff_t HDF5ProductResolver::indexForInterval(edm::ValidityInterval const& iIOV) const {
0113   using namespace cond::hdf5;
0114   auto firstSync = convertSyncValue(iIOV.first(), record_->iovIsRunLumi_);
0115 
0116   auto itFound = findMatchingFirst(record_->iovFirsts_, firstSync);
0117   assert(itFound != record_->iovFirsts_.end());
0118 
0119   return itFound - record_->iovFirsts_.begin();
0120 }
0121 
0122 void HDF5ProductResolver::readFromHDF5api(std::ptrdiff_t iIndex) {
0123   auto payloadRef = dataProduct_->payloadForIOVs_[iIndex];
0124   auto ds = file_->derefDataSet(payloadRef);
0125   storageSize_ = ds->storageSize();
0126   if (storageSize_ == 0) {
0127     return;
0128   }
0129 
0130   fileOffset_ = ds->fileOffset();
0131   memSize_ = ds->findAttribute("memsize")->readUInt32();
0132   type_ = ds->findAttribute("type")->readString();
0133 }
0134 
0135 void HDF5ProductResolver::prefetch(edm::eventsetup::DataKey const& iKey, edm::EventSetupRecordDetails iRecord) {
0136   if (exceptPtr_) {
0137     rethrow_exception(exceptPtr_);
0138   }
0139   if (storageSize_ == 0) {
0140     return;
0141   }
0142   threadFriendlyPrefetch(fileOffset_, storageSize_, memSize_, type_);
0143 }
0144 
0145 std::vector<char> HDF5ProductResolver::decompress_zlib(std::vector<char> compressedBuffer, std::size_t iMemSize) const {
0146   std::vector<char> buffer;
0147   if (iMemSize == compressedBuffer.size()) {
0148     //memory was not compressed
0149     //std::cout <<"NOT COMPRESSED"<<std::endl;
0150     buffer = std::move(compressedBuffer);
0151   } else {
0152     //zlib compression was used
0153     z_stream strm;
0154     strm.zalloc = Z_NULL;
0155     strm.zfree = Z_NULL;
0156     strm.opaque = Z_NULL;
0157     strm.avail_in = 0;
0158     strm.next_in = Z_NULL;
0159     auto ret = inflateInit(&strm);
0160     assert(ret == Z_OK);
0161 
0162     strm.avail_in = compressedBuffer.size();
0163     strm.next_in = reinterpret_cast<unsigned char*>(compressedBuffer.data());
0164 
0165     buffer = std::vector<char>(iMemSize);
0166     strm.avail_out = buffer.size();
0167     strm.next_out = reinterpret_cast<unsigned char*>(buffer.data());
0168     ret = inflate(&strm, Z_FINISH);
0169     assert(ret != Z_STREAM_ERROR);
0170     //if(ret != Z_STREAM_END) {std::cout <<"mem "<<memSize<<" "<<ret<<" out "<<strm.avail_out<<std::endl;}
0171     assert(ret == Z_STREAM_END);
0172 
0173     (void)inflateEnd(&strm);
0174   }
0175   return buffer;
0176 }
0177 
0178 std::vector<char> HDF5ProductResolver::decompress_lzma(std::vector<char> compressedBuffer, std::size_t iMemSize) const {
0179   std::vector<char> buffer;
0180   if (iMemSize == compressedBuffer.size()) {
0181     //memory was not compressed
0182     //std::cout <<"NOT COMPRESSED"<<std::endl;
0183     buffer = std::move(compressedBuffer);
0184   } else {
0185     // code 'cribbed' from ROOT
0186     lzma_stream stream = LZMA_STREAM_INIT;
0187 
0188     auto returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
0189     if (returnStatus != LZMA_OK) {
0190       throw cms::Exception("H5CondFailedDecompress") << "failed to setup lzma";
0191     }
0192 
0193     stream.next_in = reinterpret_cast<uint8_t*>(compressedBuffer.data());
0194     stream.avail_in = compressedBuffer.size();
0195 
0196     buffer = std::vector<char>(iMemSize);
0197     stream.next_out = reinterpret_cast<uint8_t*>(buffer.data());
0198     stream.avail_out = buffer.size();
0199 
0200     returnStatus = lzma_code(&stream, LZMA_FINISH);
0201     lzma_end(&stream);
0202     if (returnStatus != LZMA_STREAM_END) {
0203       throw cms::Exception("H5CondFailedDecompress") << "failed to decompress buffer using lzma";
0204     }
0205   }
0206   return buffer;
0207 }
0208 
0209 void HDF5ProductResolver::threadFriendlyPrefetch(uint64_t iFileOffset,
0210                                                  std::size_t iStorageSize,
0211                                                  std::size_t iMemSize,
0212                                                  const std::string& iTypeName) {
0213   //Done interacting with the hdf5 API
0214 
0215   //std::cout <<" prefetch "<<dataProduct_->fileOffsets_[index]<<" "<<dataProduct_->storageSizes_[index]<<" "<<memSize<<std::endl;
0216   std::vector<char> compressedBuffer(iStorageSize);
0217   std::fstream file(fileName_.c_str());
0218   file.seekg(iFileOffset);
0219   file.read(compressedBuffer.data(), compressedBuffer.size());
0220 
0221   std::vector<char> buffer;
0222   if (compression_ == cond::hdf5::Compression::kZLIB) {
0223     buffer = decompress_zlib(std::move(compressedBuffer), iMemSize);
0224   } else if (compression_ == cond::hdf5::Compression::kLZMA) {
0225     buffer = decompress_lzma(std::move(compressedBuffer), iMemSize);
0226   } else {
0227     buffer = std::move(compressedBuffer);
0228   }
0229 
0230   std::stringbuf sBuffer;
0231   sBuffer.pubsetbuf(&buffer[0], buffer.size());
0232   data_ = helper_->deserialize(sBuffer, iTypeName);
0233   if (data_.get() == nullptr) {
0234     throw cms::Exception("H5CondFailedDeserialization")
0235         << "failed to deserialize: buffer size:" << buffer.size() << " type: '" << iTypeName << "'";
0236   }
0237 }
0238 
0239 void HDF5ProductResolver::invalidateCache() {
0240   ESSourceProductResolverBase::invalidateCache();
0241   data_ = cond::serialization::unique_void_ptr();
0242 }
0243 
0244 //
0245 // const member functions
0246 //
0247 void const* HDF5ProductResolver::getAfterPrefetchImpl() const { return data_.get(); }
0248 
0249 //
0250 // static member functions
0251 //