File indexing completed on 2023-10-25 09:50:23
0001 #ifndef HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
0002 #define HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
0003
#include <cassert>
#include <cstddef>
#include <exception>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>

#include <alpaka/alpaka.hpp>

#include "HeterogeneousCore/AlpakaInterface/interface/traits.h"
#include "HeterogeneousCore/AlpakaInterface/interface/AlpakaServiceFwd.h"
0020
0021
0022
0023 namespace cms::alpakatools {
0024
0025 namespace detail {
0026
/// Raise `base` to the integer power `exponent` by repeated squaring.
///
/// All arithmetic is done in `unsigned int`, so results that do not fit wrap around
/// modulo 2^N (N being the width of `unsigned int`). `power(x, 0)` is 1 for every `x`.
inline constexpr unsigned int power(unsigned int base, unsigned int exponent) {
  unsigned int result = 1;
  // Walk the bits of the exponent from least to most significant; `base` holds
  // the original base raised to the weight of the bit currently being examined.
  for (; exponent != 0; exponent >>= 1, base *= base) {
    if ((exponent & 1u) != 0) {
      result *= base;
    }
  }
  return result;
}
0038
0039
/// Format a size in bytes as a human readable string.
///
/// The largest binary unit (GB, MB, kB) that divides `value` exactly is used;
/// anything that is not a whole multiple of 1 kB (including 0) is printed in bytes.
/// The maximum value representable by `size_t` is reported as "unlimited".
inline std::string as_bytes(size_t value) {
  if (value == std::numeric_limits<size_t>::max()) {
    return "unlimited";
  }

  struct Unit {
    unsigned int shift;  // log2 of the unit size in bytes
    const char* suffix;
  };
  // ordered from the largest unit to the smallest, so the first exact match wins
  constexpr Unit units[] = {{30, " GB"}, {20, " MB"}, {10, " kB"}};

  for (auto const& unit : units) {
    size_t const scale = size_t{1} << unit.shift;
    if (value >= scale and value % scale == 0) {
      return std::to_string(value >> unit.shift) + unit.suffix;
    }
  }
  return std::to_string(value) + " B";
}
0053
0054 }
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085 template <typename TDev, typename TQueue>
0086 class CachingAllocator {
0087 public:
0088 #ifdef ALPAKA_ACC_GPU_CUDA_ENABLED
0089 friend class alpaka_cuda_async::AlpakaService;
0090 #endif
0091 #ifdef ALPAKA_ACC_GPU_HIP_ENABLED
0092 friend class alpaka_rocm_async::AlpakaService;
0093 #endif
0094 #ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
0095 friend class alpaka_serial_sync::AlpakaService;
0096 #endif
0097 #ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_ENABLED
0098 friend class alpaka_tbb_async::AlpakaService;
0099 #endif
0100
0101 using Device = TDev;
0102 using Queue = TQueue;
0103 using Event = alpaka::Event<Queue>;
0104 using Buffer = alpaka::Buf<Device, std::byte, alpaka::DimInt<1u>, size_t>;
0105
0106
0107 static_assert(alpaka::isDevice<Device>, "TDev should be an alpaka Device type.");
0108 static_assert(alpaka::isQueue<Queue>, "TQueue should be an alpaka Queue type.");
0109 static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
0110 "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be the "
0111 "host CPU.");
0112
0113 struct CachedBytes {
0114 size_t free = 0;
0115 size_t live = 0;
0116 size_t requested = 0;
0117 };
0118
0119 explicit CachingAllocator(
0120 Device const& device,
0121 unsigned int binGrowth,
0122 unsigned int minBin,
0123
0124 unsigned int maxBin,
0125
0126 size_t maxCachedBytes,
0127 double maxCachedFraction,
0128
0129
0130 bool reuseSameQueueAllocations,
0131
0132 bool debug)
0133 : device_(device),
0134 binGrowth_(binGrowth),
0135 minBin_(minBin),
0136 maxBin_(maxBin),
0137 minBinBytes_(detail::power(binGrowth, minBin)),
0138 maxBinBytes_(detail::power(binGrowth, maxBin)),
0139 maxCachedBytes_(cacheSize(maxCachedBytes, maxCachedFraction)),
0140 reuseSameQueueAllocations_(reuseSameQueueAllocations),
0141 debug_(debug) {
0142 if (debug_) {
0143 std::ostringstream out;
0144 out << "CachingAllocator settings\n"
0145 << " bin growth " << binGrowth_ << "\n"
0146 << " min bin " << minBin_ << "\n"
0147 << " max bin " << maxBin_ << "\n"
0148 << " resulting bins:\n";
0149 for (auto bin = minBin_; bin <= maxBin_; ++bin) {
0150 auto binSize = detail::power(binGrowth, bin);
0151 out << " " << std::right << std::setw(12) << detail::as_bytes(binSize) << '\n';
0152 }
0153 out << " maximum amount of cached memory: " << detail::as_bytes(maxCachedBytes_);
0154 std::cout << out.str() << std::endl;
0155 }
0156 }
0157
0158 ~CachingAllocator() {
0159 {
0160
0161 std::scoped_lock lock(mutex_);
0162 assert(liveBlocks_.empty());
0163 assert(cachedBytes_.live == 0);
0164 }
0165
0166 freeAllCached();
0167 }
0168
0169
0170 CachedBytes cacheStatus() const {
0171 std::scoped_lock lock(mutex_);
0172 return cachedBytes_;
0173 }
0174
0175
0176 void* allocate(size_t bytes, Queue queue) {
0177
0178 BlockDescriptor block;
0179 block.queue = std::move(queue);
0180 block.requested = bytes;
0181 std::tie(block.bin, block.bytes) = findBin(bytes);
0182
0183
0184 if (not tryReuseCachedBlock(block)) {
0185 allocateNewBlock(block);
0186 }
0187
0188 return block.buffer->data();
0189 }
0190
0191
0192 void free(void* ptr) {
0193 std::scoped_lock lock(mutex_);
0194
0195 auto iBlock = liveBlocks_.find(ptr);
0196 if (iBlock == liveBlocks_.end()) {
0197 std::stringstream ss;
0198 ss << "Trying to free a non-live block at " << ptr;
0199 throw std::runtime_error(ss.str());
0200 }
0201
0202 BlockDescriptor block = std::move(iBlock->second);
0203 liveBlocks_.erase(iBlock);
0204 cachedBytes_.live -= block.bytes;
0205 cachedBytes_.requested -= block.requested;
0206
0207 bool recache = (cachedBytes_.free + block.bytes <= maxCachedBytes_);
0208 if (recache) {
0209 alpaka::enqueue(*(block.queue), *(block.event));
0210 cachedBytes_.free += block.bytes;
0211
0212
0213 cachedBlocks_.insert(std::make_pair(block.bin, block));
0214
0215 if (debug_) {
0216 std::ostringstream out;
0217 out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " returned " << block.bytes << " bytes at "
0218 << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << " , event "
0219 << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
0220 << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
0221 << " bytes) outstanding." << std::endl;
0222 std::cout << out.str() << std::endl;
0223 }
0224 } else {
0225
0226 if (debug_) {
0227 std::ostringstream out;
0228 out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
0229 << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
0230 << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
0231 << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
0232 << " bytes) outstanding." << std::endl;
0233 std::cout << out.str() << std::endl;
0234 }
0235 }
0236 }
0237
0238 private:
0239 struct BlockDescriptor {
0240 std::optional<Buffer> buffer;
0241 std::optional<Queue> queue;
0242 std::optional<Event> event;
0243 size_t bytes = 0;
0244 size_t requested = 0;
0245 unsigned int bin = 0;
0246
0247
0248 auto device() { return alpaka::getDev(*queue); }
0249 };
0250
0251 private:
0252
0253 size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const {
0254
0255 size_t totalMemory = alpaka::getMemBytes(device_);
0256 size_t memoryFraction = static_cast<size_t>(maxCachedFraction * totalMemory);
0257 size_t size = std::numeric_limits<size_t>::max();
0258 if (maxCachedBytes > 0 and maxCachedBytes < size) {
0259 size = maxCachedBytes;
0260 }
0261 if (memoryFraction > 0 and memoryFraction < size) {
0262 size = memoryFraction;
0263 }
0264 return size;
0265 }
0266
0267
0268 std::tuple<unsigned int, size_t> findBin(size_t bytes) const {
0269 if (bytes < minBinBytes_) {
0270 return std::make_tuple(minBin_, minBinBytes_);
0271 }
0272 if (bytes > maxBinBytes_) {
0273 throw std::runtime_error("Requested allocation size " + std::to_string(bytes) +
0274 " bytes is too large for the caching detail with maximum bin " +
0275 std::to_string(maxBinBytes_) +
0276 " bytes. You might want to increase the maximum bin size");
0277 }
0278 unsigned int bin = minBin_;
0279 size_t binBytes = minBinBytes_;
0280 while (binBytes < bytes) {
0281 ++bin;
0282 binBytes *= binGrowth_;
0283 }
0284 return std::make_tuple(bin, binBytes);
0285 }
0286
0287 bool tryReuseCachedBlock(BlockDescriptor& block) {
0288 std::scoped_lock lock(mutex_);
0289
0290
0291 const auto [begin, end] = cachedBlocks_.equal_range(block.bin);
0292 for (auto iBlock = begin; iBlock != end; ++iBlock) {
0293 if ((reuseSameQueueAllocations_ and (*block.queue == *(iBlock->second.queue))) or
0294 alpaka::isComplete(*(iBlock->second.event))) {
0295
0296 auto queue = std::move(*(block.queue));
0297
0298 block = iBlock->second;
0299 block.queue = std::move(queue);
0300
0301
0302 if (block.device() != alpaka::getDev(*(block.event))) {
0303 block.event = Event{block.device()};
0304 }
0305
0306
0307
0308 liveBlocks_[block.buffer->data()] = block;
0309
0310
0311 cachedBytes_.free -= block.bytes;
0312 cachedBytes_.live += block.bytes;
0313 cachedBytes_.requested += block.requested;
0314
0315 if (debug_) {
0316 std::ostringstream out;
0317 out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " reused cached block at "
0318 << block.buffer->data() << " (" << block.bytes << " bytes) for queue "
0319 << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get()
0320 << " (previously associated with queue " << iBlock->second.queue->m_spQueueImpl.get() << " , event "
0321 << iBlock->second.event->m_spEventImpl.get() << ")." << std::endl;
0322 std::cout << out.str() << std::endl;
0323 }
0324
0325
0326 cachedBlocks_.erase(iBlock);
0327 return true;
0328 }
0329 }
0330
0331 return false;
0332 }
0333
0334 Buffer allocateBuffer(size_t bytes, Queue const& queue) {
0335 if constexpr (std::is_same_v<Device, alpaka::Dev<Queue>>) {
0336
0337 return alpaka::allocBuf<std::byte, size_t>(device_, bytes);
0338 } else if constexpr (std::is_same_v<Device, alpaka::DevCpu>) {
0339
0340 return alpaka::allocMappedBuf<alpaka::Pltf<alpaka::Dev<Queue>>, std::byte, size_t>(device_, bytes);
0341 } else {
0342
0343 static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
0344 "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be "
0345 "the host CPU.");
0346 }
0347 }
0348
0349 void allocateNewBlock(BlockDescriptor& block) {
0350 try {
0351 block.buffer = allocateBuffer(block.bytes, *block.queue);
0352 } catch (std::runtime_error const& e) {
0353
0354 if (debug_) {
0355 std::ostringstream out;
0356 out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " failed to allocate " << block.bytes
0357 << " bytes for queue " << block.queue->m_spQueueImpl.get()
0358 << ", retrying after freeing cached allocations" << std::endl;
0359 std::cout << out.str() << std::endl;
0360 }
0361
0362 freeAllCached();
0363
0364
0365 block.buffer = allocateBuffer(block.bytes, *block.queue);
0366 }
0367
0368
0369 block.event = Event{block.device()};
0370
0371 {
0372 std::scoped_lock lock(mutex_);
0373 cachedBytes_.live += block.bytes;
0374 cachedBytes_.requested += block.requested;
0375
0376 liveBlocks_[block.buffer->data()] = block;
0377 }
0378
0379 if (debug_) {
0380 std::ostringstream out;
0381 out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " allocated new block at "
0382 << block.buffer->data() << " (" << block.bytes << " bytes associated with queue "
0383 << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get() << "." << std::endl;
0384 std::cout << out.str() << std::endl;
0385 }
0386 }
0387
0388 void freeAllCached() {
0389 std::scoped_lock lock(mutex_);
0390
0391 while (not cachedBlocks_.empty()) {
0392 auto iBlock = cachedBlocks_.begin();
0393 cachedBytes_.free -= iBlock->second.bytes;
0394
0395 if (debug_) {
0396 std::ostringstream out;
0397 out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << iBlock->second.bytes
0398 << " bytes.\n\t\t " << (cachedBlocks_.size() - 1) << " available blocks cached (" << cachedBytes_.free
0399 << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live << " bytes) outstanding."
0400 << std::endl;
0401 std::cout << out.str() << std::endl;
0402 }
0403
0404 cachedBlocks_.erase(iBlock);
0405 }
0406 }
0407
0408
0409 using CachedBlocks = std::multimap<unsigned int, BlockDescriptor>;
0410
0411 using BusyBlocks = std::map<void*, BlockDescriptor>;
0412
0413 inline static const std::string deviceType_ = alpaka::core::demangled<Device>;
0414
0415 mutable std::mutex mutex_;
0416 Device device_;
0417
0418 CachedBytes cachedBytes_;
0419 CachedBlocks cachedBlocks_;
0420 BusyBlocks liveBlocks_;
0421
0422 const unsigned int binGrowth_;
0423 const unsigned int minBin_;
0424 const unsigned int maxBin_;
0425
0426 const size_t minBinBytes_;
0427 const size_t maxBinBytes_;
0428 const size_t maxCachedBytes_;
0429
0430 const bool reuseSameQueueAllocations_;
0431 const bool debug_;
0432 };
0433
0434 }
0435
0436 #endif