#ifndef HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
#define HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h

#include <cassert>
#include <exception>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>

#include <alpaka/alpaka.hpp>

#include "HeterogeneousCore/AlpakaInterface/interface/traits.h"
#include "HeterogeneousCore/AlpakaInterface/interface/AlpakaServiceFwd.h"

// Inspired by cub::CachingDeviceAllocator

namespace cms::alpakatools {

  namespace detail {

    // integer exponentiation by squaring: returns base^exponent
    inline constexpr unsigned int power(unsigned int base, unsigned int exponent) {
      unsigned int power = 1;
      while (exponent > 0) {
        if (exponent & 1) {
          power = power * base;
        }
        base = base * base;
        exponent = exponent >> 1;
      }
      return power;
    }

    // format a memory size in B/kB/MB/GB
    inline std::string as_bytes(size_t value) {
      if (value == std::numeric_limits<size_t>::max()) {
        return "unlimited";
      } else if (value >= (1 << 30) and value % (1 << 30) == 0) {
        return std::to_string(value >> 30) + " GB";
      } else if (value >= (1 << 20) and value % (1 << 20) == 0) {
        return std::to_string(value >> 20) + " MB";
      } else if (value >= (1 << 10) and value % (1 << 10) == 0) {
        return std::to_string(value >> 10) + " kB";
      } else {
        return std::to_string(value) + "  B";
      }
    }

  }  // namespace detail
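
  // For illustration: detail::power(2, 20) == 1048576, and detail::as_bytes(1048576) returns "1 MB";
  // a value such as 1000, which is not an exact multiple of 1 kB, falls through to "1000  B".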

  /*
   * The "memory device" identifies the memory space, i.e. the device where the memory is allocated.
   * A caching allocator object is associated to a single memory `Device`, set at construction time, and unchanged for
   * the lifetime of the allocator.
   *
   * Each allocation is associated to an event on a queue, which identifies the "synchronisation device" according to
   * which the synchronisation occurs.
   * The `Event` type depends only on the synchronisation `Device` type.
   * The `Queue` type depends on the synchronisation `Device` type and the queue properties, either `Sync` or `Async`.
   *
   * **Note**: how should different queue and event types be handled in a single allocator? Store and access
   * type-punned queues and events, or template the internal structures on them with a common base class?
   * alpaka relies on the compile-time type for dispatch.
   *
   * Common use case #1: accelerator's memory allocations
   *   - the "memory device" is the accelerator device (e.g. a GPU);
   *   - the "synchronisation device" is the same accelerator device;
   *   - the `Queue` type is usually the same (either `Sync` or `Async`).
   *
   * Common use case #2: pinned host memory allocations
   *   - the "memory device" is the host device (e.g. system memory);
   *   - the "synchronisation device" is the accelerator device (e.g. a GPU) whose work queue will access the host
   *     memory (direct memory access from the accelerator, or scheduling `alpaka::memcpy`/`alpaka::memset`), and can
   *     be different for each allocation;
   *   - the synchronisation `Device` _type_ could potentially be different, but memory pinning is currently tied to
   *     the accelerator's platform (CUDA, HIP, etc.), so the device type needs to be fixed to benefit from caching;
   *   - the `Queue` type can be either `Sync` _or_ `Async` on any allocation.
   */
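
  /*
   * A minimal usage sketch (illustrative only: the concrete parameter values and the `device`/`queue` objects are
   * assumptions; in CMSSW they are provided by the AlpakaService and the framework). With binGrowth = 2, minBin = 16
   * and maxBin = 30 the allocator manages bins from 64 kB up to 1 GB:
   *
   *   CachingAllocator<Device, Queue> allocator(device,
   *                                             2,       // binGrowth
   *                                             16,      // minBin: 2^16 bytes = 64 kB
   *                                             30,      // maxBin: 2^30 bytes = 1 GB
   *                                             0,       // maxCachedBytes: no explicit limit
   *                                             0.8,     // maxCachedFraction: cache at most 80% of the device memory
   *                                             true,    // reuseSameQueueAllocations
   *                                             false);  // debug
   *
   *   void* ptr = allocator.allocate(100000, queue);  // served from the 128 kB bin
   *   // ... use the memory in work submitted to `queue` ...
   *   allocator.free(ptr);  // the buffer is cached for reuse, or freed if the cache is full
   */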

  template <typename TDev, typename TQueue>
  class CachingAllocator {
  public:
#ifdef ALPAKA_ACC_GPU_CUDA_ENABLED
    friend class alpaka_cuda_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_GPU_HIP_ENABLED
    friend class alpaka_rocm_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
    friend class alpaka_serial_sync::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_ENABLED
    friend class alpaka_tbb_async::AlpakaService;
#endif

    using Device = TDev;                 // the "memory device", where the memory will be allocated
    using Queue = TQueue;                // the queue used to submit the memory operations
    using Event = alpaka::Event<Queue>;  // the events used to synchronise the operations
    using Buffer = alpaka::Buf<Device, std::byte, alpaka::DimInt<1u>, size_t>;

    // The "memory device" type can either be the same as the "synchronisation device" type, or be the host CPU.
    static_assert(alpaka::isDevice<Device>, "TDev should be an alpaka Device type.");
    static_assert(alpaka::isQueue<Queue>, "TQueue should be an alpaka Queue type.");
    static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                  "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be the "
                  "host CPU.");

    struct CachedBytes {
      size_t free = 0;       // total bytes freed and cached on this device
      size_t live = 0;       // total bytes currently in use on this device
      size_t requested = 0;  // total bytes requested and currently in use on this device
    };

    explicit CachingAllocator(
        Device const& device,
        unsigned int binGrowth,          // bin growth factor;
        unsigned int minBin,             // smallest bin, corresponds to binGrowth^minBin bytes;
                                         // smaller allocations are rounded to this value;
        unsigned int maxBin,             // largest bin, corresponds to binGrowth^maxBin bytes;
                                         // larger allocations will fail;
        size_t maxCachedBytes,           // total storage for the allocator (0 means no limit);
        double maxCachedFraction,        // fraction of total device memory taken for the allocator (0 means no limit);
                                         // if both maxCachedBytes and maxCachedFraction are non-zero,
                                         // the smallest resulting value is used.
        bool reuseSameQueueAllocations,  // reuse non-ready allocations if they are in the same queue as the new one;
                                         // this is safe only if all memory operations are scheduled in the same queue
        bool debug)
        : device_(device),
          binGrowth_(binGrowth),
          minBin_(minBin),
          maxBin_(maxBin),
          minBinBytes_(detail::power(binGrowth, minBin)),
          maxBinBytes_(detail::power(binGrowth, maxBin)),
          maxCachedBytes_(cacheSize(maxCachedBytes, maxCachedFraction)),
          reuseSameQueueAllocations_(reuseSameQueueAllocations),
          debug_(debug) {
      if (debug_) {
        std::ostringstream out;
        out << "CachingAllocator settings\n"
            << "  bin growth " << binGrowth_ << "\n"
            << "  min bin    " << minBin_ << "\n"
            << "  max bin    " << maxBin_ << "\n"
            << "  resulting bins:\n";
        for (auto bin = minBin_; bin <= maxBin_; ++bin) {
          auto binSize = detail::power(binGrowth, bin);
          out << "    " << std::right << std::setw(12) << detail::as_bytes(binSize) << '\n';
        }
        out << "  maximum amount of cached memory: " << detail::as_bytes(maxCachedBytes_);
        std::cout << out.str() << std::endl;
      }
    }

    ~CachingAllocator() {
      {
        // this should never be called while some memory blocks are still live
        std::scoped_lock lock(mutex_);
        assert(liveBlocks_.empty());
        assert(cachedBytes_.live == 0);
      }

      freeAllCached();
    }

    // return a copy of the cache allocation status, for monitoring purposes
    CachedBytes cacheStatus() const {
      std::scoped_lock lock(mutex_);
      return cachedBytes_;
    }

    // allocate the given number of bytes on this allocator's device, associated to the given queue
    void* allocate(size_t bytes, Queue queue) {
      // create a block descriptor for the requested allocation
      BlockDescriptor block;
      block.queue = std::move(queue);
      block.requested = bytes;
      std::tie(block.bin, block.bytes) = findBin(bytes);

      // try to re-use a cached block, or allocate a new buffer
      if (not tryReuseCachedBlock(block)) {
        allocateNewBlock(block);
      }

      return block.buffer->data();
    }

    // frees an allocation
    void free(void* ptr) {
      std::scoped_lock lock(mutex_);

      auto iBlock = liveBlocks_.find(ptr);
      if (iBlock == liveBlocks_.end()) {
        std::stringstream ss;
        ss << "Trying to free a non-live block at " << ptr;
        throw std::runtime_error(ss.str());
      }
      // remove the block from the list of live blocks
      BlockDescriptor block = std::move(iBlock->second);
      liveBlocks_.erase(iBlock);
      cachedBytes_.live -= block.bytes;
      cachedBytes_.requested -= block.requested;

      bool recache = (cachedBytes_.free + block.bytes <= maxCachedBytes_);
      if (recache) {
        // record the block's event in its queue: the block becomes reusable once all the work
        // submitted to the queue up to this point has completed
        alpaka::enqueue(*(block.queue), *(block.event));
        cachedBytes_.free += block.bytes;
        // after the call to insert(), cachedBlocks_ shares ownership of the buffer
        // TODO use std::move ?
        cachedBlocks_.insert(std::make_pair(block.bin, block));

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " returned " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << " , event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      } else {
        // if the buffer is not recached, it is automatically freed when block goes out of scope
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      }
    }

  private:
    struct BlockDescriptor {
      std::optional<Buffer> buffer;
      std::optional<Queue> queue;
      std::optional<Event> event;
      size_t bytes = 0;
      size_t requested = 0;  // for monitoring only
      unsigned int bin = 0;

      // the "synchronisation device" for this block
      auto device() { return alpaka::getDev(*queue); }
    };

  private:
    // return the maximum amount of memory that should be cached on this device
    size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const {
      // note that getMemBytes() returns 0 if the platform does not support querying the device memory
      size_t totalMemory = alpaka::getMemBytes(device_);
      size_t memoryFraction = static_cast<size_t>(maxCachedFraction * totalMemory);
      size_t size = std::numeric_limits<size_t>::max();
      if (maxCachedBytes > 0 and maxCachedBytes < size) {
        size = maxCachedBytes;
      }
      if (memoryFraction > 0 and memoryFraction < size) {
        size = memoryFraction;
      }
      return size;
    }
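
    // For illustration: with maxCachedBytes == 0 and maxCachedFraction == 0.5 on a device reporting 16 GB of
    // memory, cacheSize() returns 8 GB; if both limits are non-zero the smaller one wins, and if both are zero
    // the cache size is effectively unlimited.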

    // return (bin, bin size)
    std::tuple<unsigned int, size_t> findBin(size_t bytes) const {
      if (bytes < minBinBytes_) {
        return std::make_tuple(minBin_, minBinBytes_);
      }
      if (bytes > maxBinBytes_) {
        throw std::runtime_error("Requested allocation size " + std::to_string(bytes) +
                                 " bytes is too large for the caching allocator with maximum bin " +
                                 std::to_string(maxBinBytes_) +
                                 " bytes. You might want to increase the maximum bin size");
      }
      unsigned int bin = minBin_;
      size_t binBytes = minBinBytes_;
      while (binBytes < bytes) {
        ++bin;
        binBytes *= binGrowth_;
      }
      return std::make_tuple(bin, binBytes);
    }
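
    // For illustration: assuming binGrowth_ == 2 and minBin_ == 16 (so the smallest bin is 64 kB), findBin(1000)
    // returns the 64 kB minimum bin, while findBin(100000) returns bin 17, i.e. a 131072-byte (128 kB) buffer.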

    bool tryReuseCachedBlock(BlockDescriptor& block) {
      std::scoped_lock lock(mutex_);

      // iterate through the range of cached blocks in the same bin
      const auto [begin, end] = cachedBlocks_.equal_range(block.bin);
      for (auto iBlock = begin; iBlock != end; ++iBlock) {
        if ((reuseSameQueueAllocations_ and (*block.queue == *(iBlock->second.queue))) or
            alpaka::isComplete(*(iBlock->second.event))) {
          // associate the cached buffer to the new queue
          auto queue = std::move(*(block.queue));
          // TODO cache (or remove) the debug information and use std::move()
          block = iBlock->second;
          block.queue = std::move(queue);

          // if the new queue is on a different device than the old event, create a new event
          if (block.device() != alpaka::getDev(*(block.event))) {
            block.event = Event{block.device()};
          }

          // insert the cached block into the live blocks
          // TODO cache (or remove) the debug information and use std::move()
          liveBlocks_[block.buffer->data()] = block;

          // update the accounting information
          cachedBytes_.free -= block.bytes;
          cachedBytes_.live += block.bytes;
          cachedBytes_.requested += block.requested;

          if (debug_) {
            std::ostringstream out;
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " reused cached block at "
                << block.buffer->data() << " (" << block.bytes << " bytes) for queue "
                << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get()
                << " (previously associated with queue " << iBlock->second.queue->m_spQueueImpl.get() << " , event "
                << iBlock->second.event->m_spEventImpl.get() << ")." << std::endl;
            std::cout << out.str() << std::endl;
          }

          // remove the reused block from the list of cached blocks
          cachedBlocks_.erase(iBlock);
          return true;
        }
      }

      return false;
    }

    Buffer allocateBuffer(size_t bytes, Queue const& queue) {
      if constexpr (std::is_same_v<Device, alpaka::Dev<Queue>>) {
        // allocate device memory
        return alpaka::allocBuf<std::byte, size_t>(device_, bytes);
      } else if constexpr (std::is_same_v<Device, alpaka::DevCpu>) {
        // allocate pinned host memory accessible by the queue's platform
        return alpaka::allocMappedBuf<alpaka::Pltf<alpaka::Dev<Queue>>, std::byte, size_t>(device_, bytes);
      } else {
        // unsupported combination
        static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                      "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be "
                      "the host CPU.");
      }
    }

    void allocateNewBlock(BlockDescriptor& block) {
      try {
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      } catch (std::runtime_error const& e) {
        // the allocation attempt failed: free all cached blocks on the device and retry
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " failed to allocate " << block.bytes
              << " bytes for queue " << block.queue->m_spQueueImpl.get()
              << ", retrying after freeing cached allocations" << std::endl;
          std::cout << out.str() << std::endl;
        }
        // TODO implement a method that frees only up to block.bytes bytes
        freeAllCached();

        // throw an exception if it fails again
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      }

      // create a new event associated to the "synchronisation device"
      block.event = Event{block.device()};

      {
        std::scoped_lock lock(mutex_);
        cachedBytes_.live += block.bytes;
        cachedBytes_.requested += block.requested;
        // TODO use std::move() ?
        liveBlocks_[block.buffer->data()] = block;
      }

      if (debug_) {
        std::ostringstream out;
        out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " allocated new block at "
            << block.buffer->data() << " (" << block.bytes << " bytes) associated with queue "
            << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get() << "." << std::endl;
        std::cout << out.str() << std::endl;
      }
    }

    void freeAllCached() {
      std::scoped_lock lock(mutex_);

      while (not cachedBlocks_.empty()) {
        auto iBlock = cachedBlocks_.begin();
        cachedBytes_.free -= iBlock->second.bytes;

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << iBlock->second.bytes
              << " bytes.\n\t\t  " << (cachedBlocks_.size() - 1) << " available blocks cached (" << cachedBytes_.free
              << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live << " bytes) outstanding."
              << std::endl;
          std::cout << out.str() << std::endl;
        }

        cachedBlocks_.erase(iBlock);
      }
    }

    // TODO replace with a tbb::concurrent_multimap ?
    using CachedBlocks = std::multimap<unsigned int, BlockDescriptor>;  // ordered by the allocation bin
    // TODO replace with a tbb::concurrent_map ?
    using BusyBlocks = std::map<void*, BlockDescriptor>;  // ordered by the address of the allocated memory

    inline static const std::string deviceType_ = alpaka::core::demangled<Device>;

    mutable std::mutex mutex_;
    Device device_;  // the device where the memory is allocated

    CachedBytes cachedBytes_;
    CachedBlocks cachedBlocks_;  // set of cached device allocations available for reuse
    BusyBlocks liveBlocks_;      // map of pointers to the live device allocations currently in use

    const unsigned int binGrowth_;  // geometric growth factor for bin sizes
    const unsigned int minBin_;
    const unsigned int maxBin_;

    const size_t minBinBytes_;
    const size_t maxBinBytes_;
    const size_t maxCachedBytes_;  // maximum aggregate cached bytes per device

    const bool reuseSameQueueAllocations_;
    const bool debug_;
  };

}  // namespace cms::alpakatools

#endif  // HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h