内存池核心介绍

废话不多说,show you code.

我实现了两套内存池,一个是固定大小的内存池,一个是多重不同大小的内存池。

Fixed size memory pool

在这里插入图片描述

设计思路:

我们一个个看,首先我们定义了一个chunk, chunk 里面包含了n个block, 每个block都存放着一个data数据和一个next的指针,这个指针指向下一个block,形成一个链表。我们分配的时候是以一个个chunk来分配的,我们会把空闲的block 存放在freelist 里面。所以每次分配的时候都会先从freelist 中查看是否有空闲的block, 如果没有,再分配一个chunk。 但是呢,我们内存池考虑到thread-safe, 又要考虑到性能,所以使用了无锁操作,也就是原子操作(atomic)。 但是实际下来还是性能比较差,所以进一步优化使用threadcache, 也就是每个thread_local。这样就保证了每个thread都有自己的一份cache, 每次先分配的内存的时候就先从cache 分配,分配完再从freelist 中取。最后我们会把每个chunk数据push到LockFreeStack 里面管理,避免数据泄露。

LockFreeStack

无锁栈代码比较简单:

  • 我们首先定义链表的节点Node,这个类是个模板类型的template, T就是数据的格式,next是指向下个节点的指针。Node的构造函数声明了一个可变参数模板,允许接受任意数量和类型的参数,并使用std::forward 完美转发到data成员的构造函数。
struct Node {T data;std::atomic<Node*> next;template<typename... Args>Node(Args&&... args): data(std::forward<Args>(args)...), next(nullptr) {}
};
std::atomic<Node*> head;
  • push 操作:

    • 使用std::move(item)将传入的item值移动到新节点,避免不必要的复制, 并且利用了Node类中的完美转发构造函数
    • 使用std::memory_order_relaxed原子加载当前的头节点, 目前没有同步需求。
    • 使用头插法把新节点插入到当前节点的前一个,这里循环条件使用CAS(Compare-And-Swap)操作尝试更新头节点
      • compare_exchange_weak会比较headoldNode是否相等
      • 如果相等,则将head设为newNode并返回true,循环结束
      • 如果不相等(可能其他线程修改了head),则将oldNode更新为当前的head值并返回false,循环继续
    void push(T item) {auto* newNode = new Node(std::move(item));Node* oldNode = head.load(std::memory_order_relaxed);do{newNode->next.store(oldNode, std::memory_order_relaxed);} while(!head.compare_exchange_weak(oldNode, newNode, std::memory_order_release, std::memory_order_relaxed));
    }
    
  • pop操作:

    • 读取当前栈顶节点指针,使用宽松内存序
    • 如果头节点为nullptr,表示栈为空,返回false
    • 否则:使用CAS操作原子地更新头节点:
      • 比较当前头节点是否仍为oldNode
      • 若相同,将头节点更新为oldNode->next并返回true
      • 若不同(其他线程已修改头节点),则更新oldNode为当前头节点值并返回false
    • 成功更新头节点后,将原头节点的数据移动到结果变量
    • 释放原头节点内存
    • 返回操作成功
    bool pop(T& result) {Node* oldNode = head.load(std::memory_order_relaxed);if (oldNode) {if (head.compare_exchange_weak(oldNode, oldNode->next.load(std::memory_order_relaxed), std::memory_order_release, std::memory_order_relaxed)) {result = std::move(oldNode->data);delete oldNode;return true;}}return false;
    }
    

    这里我们为什么使用compare_exchange_weak 而不是compare_exchange_strong 呢?

    • compare_exchange_weak允许在某些平台上"假失败"(spurious failure),即使比较的值实际相等,但在实现上可能会返回失败。这种设计让它能够:
      • 在某些CPU架构上有更高的性能
      • 避免昂贵的内存栅栏(memory barrier)操作
      • 特别在ARM等架构上实现更高效
    • compare_exchange_strong: 保证只在实际值不等于期望值时返回失败, 如果失败需要重试,性能开销比较大。
    • compare_exchange_weak: 可能出现假失败,但在循环中使用时,这种差异不影响结果正确性

    LockFreeFixedSizePool

    无锁的固定大小内存池。

    • 首先看下我们的block的定义,这个结构体包含三个主要部分:

      1. alignas(T)指令:确保Block结构体与T类型有相同的对齐要求,防止潜在的内存对齐问题
      2. data数组
        • 类型:std::byte数组,大小为sizeof(T)
        • 作用:存储T类型对象的原始内存
        • 布局:位于Block结构体起始位置
      3. next指针
        • 类型:std::atomic<Block*>
        • 作用:指向链表中的下一个Block,用于构建无锁链表
        • 初始值:nullptr
      4. as_object()方法
        • 功能:将data数组转换为T类型指针
        • 实现:使用reinterpret_cast进行类型转换
        • 返回:指向data内存区域的T*指针
      5. 当对象被释放时,Block不会被系统回收,而是返回到池中供后续使用。每个Block作为无锁链表的节点,通过next原子指针链接。 将data数组放在结构体起始位置,使得T对象指针可直接转换为Block指针。 当用户从内存池获取内存时,用户拿到的是指向data数组的T*指针。因为data位于Block结构体的起始位置,并且通过alignas(T)保证了对齐要求,所以可以安全地在deallocate时,将用户的T*指针转回Block*指针。
      struct alignas(T) Block {std::byte data[sizeof(T)]; // 每个block的大小为Tstd::atomic<Block*> next{nullptr};T* as_object() { return reinterpret_cast<T*>(data);}
      };
      
    • chunk的结构体有两个成员:

      1. blocks: 一个指向 Block 数组的智能指针,使用 std::unique_ptr<Block[]> 管理内存
      2. count: 表示这个 Chunk 中包含的 Block 数量.
      3. Chunk 实现了批量内存分配策略:
        • 一次性分配多个 Block 对象
        • 默认情况下,N = 4096,单个 Block 大小为 sizeof(Block)
        • 实际分配的 Block 数量由 BLOCK_PER_CHUNK = N / sizeof(Block) 决定
      4. std::unique_ptr<Block[]>Chunk 提供了自动内存管理:
        • Chunk 被销毁时,blocks 指向的内存会被自动释放
        • 避免了手动内存管理的复杂性和潜在的内存泄漏
      5. 构造函数接收一个 block_count 参数,并完成两个操作:
        1. 分配指定数量的 Block 对象:std::make_unique<Block[]>(block_count)
        2. 存储块数量:count(block_count)
      struct Chunk { // 每个chunk包含N个blockstd::unique_ptr<Block[]> blocks;size_t count;Chunk(size_t block_count):blocks(std::make_unique<Block[]>(block_count)), count(block_count) {}
      };
      
    • 再看下我们的ThreadCache的结构体的实现。ThreadCache是一个线程局部内的缓存系统,用于优化无锁内存池的性能。它主要为每个线程提供本地内存缓存,减少全局竞争,提高分配效率。

      • 这里为什么把缓存的block块设置为32, 并且每次取的大小设置为16。
        • 减少竞争:每次获取多个块,减少对全局空闲列表的访问频率。
        • 适中开销: 16个块是内存和性能的平衡点,既不会一次占用过多内存,又能有效减少同步操作
        • 批量效率:填充缓存时批量获取更高效,如在后面的fill_local_cache()中使用。
      • 这里我们为什么需要一个pool_instance 的指针。因为我们在内存归还给全局freelist, 所以这里我们传递一个指向对象的指针。
      struct ThreadCache {Block* blocks[32];int count = 0;static constexpr size_t BATCH_SIZE = 16; // 批量获取块的数量LockFreeFixedSizePool<T, N>* pool_instance = nullptr;~ThreadCache() {if (count > 0 && pool_instance) {return_thread_cache();}}// 将线程本地缓存归还全局链表void return_thread_cache() {if (count == 0 || !pool_instance) return;//将本地缓存链接成链表for (int i = 0; i < count - 1; ++i) {blocks[i]->next.store(blocks[i+1], std::memory_order_relaxed);}Block* old_head = pool_instance->free_list.load(std::memory_order_relaxed);Block* new_head = blocks[0];Block* new_tail = blocks[count - 1];do {new_tail->next.store(old_head, std::memory_order_relaxed);} while (!pool_instance->free_list.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed));count = 0;}
      };
      
    • 继续看下我们的类成员的定义:

      • 我们要把ThreadCache 声明为thread_local 类型,这样每个线程都会存在一份缓存
      • free_list 为全局缓存,所有空闲的block块都会链接到这个list 里面。
      • chunks 分配的chunk由LockFreeStack 来管理,避免内存泄露
      • allocate_count 为分配一个block的计数, deallocated_count 为归还一个block的计数。
      static inline thread_local ThreadCache local_cache;static inline constexpr size_t BLOCK_PER_CHUNK = N / sizeof(Block);std::atomic<Block*> free_list{nullptr};
      LockFreeStack<std::unique_ptr<Chunk>> chunks; 
      std::atomic<size_t> allocate_count{0};
      std::atomic<size_t> deallocated_count{0};
      
    • 分配一个新的chunk.

      • 先使用make_unique分配一个chunk的大小,其中包含BLOCK_PER_CHUNK的block。
      • 获取block的指针。
      • 这里我们先把block串联起一个链表,然后使用一次原子操作即可。
      • 第一次分配的都是空的,所有把所有的block都放进free_list 里面去。
      • 最后把chunk放到LockFreeStack里面管理。
      void allocate_new_chunk() {auto chunk = std::make_unique<Chunk>(BLOCK_PER_CHUNK);Block* blocks = chunk->blocks.get();// 准备链表,只在最后一步进行一次原子操作auto* first_block = &blocks[0];for (size_t i = 0; i < chunk->count - 1; ++i) {blocks[i].next.store(&blocks[i+1], std::memory_order_relaxed);}Block* old_head = free_list.load(std::memory_order_relaxed);do {blocks[chunk->count-1].next.store(old_head, std::memory_order_relaxed);} while(!free_list.compare_exchange_weak(old_head, first_block, std::memory_order_release, std::memory_order_relaxed));//将所有块链接到空闲列表chunks.push(std::move(chunk));
      }
      
    • 然后我们再看看怎么把free_list 里面的数据批量取出来放到thread_cache里面去。

      • 首先获取这个对象的instance, 如果缓存还有剩余,那么不需要从free_list里面取。
      • 否则检查free_list 有没有空闲的块了,如果没有分配一个新的chunk.
      • 这里我们直接从free_list 里面取ThreadCache::BATCH_SIZE 大小的block的链表,把获取的块放到chche的数组里面。
    // 批量获取块到本地缓存
    void fill_local_cache(ThreadCache& cache) {cache.pool_instance = this;// 如果本地缓存还有剩余,则直接返回if (cache.count > 0) return;// 从全局空闲列表中获取块int batch_size = ThreadCache::BATCH_SIZE;Block* head = nullptr;Block* tail = nullptr;Block* new_head = nullptr;Block* old_head = free_list.load(std::memory_order_acquire);do {// 如果全局链表为空,分配新的块if (!old_head) {allocate_new_chunk();old_head = free_list.load(std::memory_order_acquire);if (!old_head)  return;  // 如果还是仍然为空,返回}head = old_head;Block* current = old_head;int count = 0;while (count < batch_size - 1 && current->next.load(std::memory_order_relaxed)) {current = current->next.load(std::memory_order_relaxed);++count;}tail = current;new_head = tail->next.load(std::memory_order_relaxed);} while(!free_list.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed));// 将获取到的块存储到本地缓存Block* current = head;int i = 0;while (current != new_head) {cache.blocks[i++] = current;current = current->next.load(std::memory_order_relaxed);}cache.count = i;
    }
    
    • 分配函数
      • allocate 是参数是一个可变参数,允许n个参数原封不动传给T对象。
      • 我们首先获取数据从cache里面获取,如果没有,调用fill_local_cache 缓存cache数据。
      • 如果本地缓存失败了(一般不会出现,除非没内存了), 从free_list 分配一个block, 如果free_list 也没有了,则分配一个新的chunk。 最后返回指向数据的指针,也就是Block的首地址。
    template<typename... Args>
    T* allocate(Args&&... args) {if (local_cache.count == 0) {fill_local_cache(local_cache);}Block* block = nullptr;// 尝试从本地缓存获取块if (local_cache.count > 0) {block = local_cache.blocks[--local_cache.count];} else {// 本地缓存填充失败,直接从全局获取单个块Block* old_block = free_list.load(std::memory_order_acquire);// 先尝试从空闲列表中获取一个块while(old_block) {if (free_list.compare_exchange_weak(old_block, old_block->next.load())) {block = old_block;break;}}// 如果空闲列表为空,则分配一个新的块if (!block) {allocate_new_chunk();old_block = free_list.load(std::memory_order_acquire);while(old_block) {if (free_list.compare_exchange_weak(old_block, old_block->next.load())) {block = old_block;break;}}}}allocate_count.fetch_add(1);if (block) {// 构造新对象return new (block->as_object()) T(std::forward<Args>(args)...);}return nullptr;
    }
    
    • 最后看下deallocate的函数
      • 释放的时候先会去调用对象的析构函数
      • 把数据的指针转为block的指针
      • 这里我们有个策略就是如果本地缓存的数量,有可能归还超过总的大小,所以我们会预先检测本地缓存的是否已经大于了容量的80%, 如果已经超过的话,就先把一半的缓存还给free_list 里面,这样确保cache里面有足够的空间。
      • 最后再把block 放到cache里面去。
    void deallocate(T* ptr) {if (!ptr) return;ptr->~T();/*Block 结构的第一个成员就是 data 数组as_object() 方法返回的是 data 数组的起始地址通过 alignas(T) 确保 Block 的对齐要求满足 T 的需求因此,指向 T 对象的指针实际上就是指向 Block 结构体中 data 数组的指针,而 data 数组又在 Block 的起始位置,所以可以安全地将 T 指针转回 Block 指针。内存布局:Block 对象:+----------------------+| data[sizeof(T)]      | <-- 用户获得的 T* 指针指向这里+----------------------+| next                 |+----------------------+*/Block* block = reinterpret_cast<Block*>(ptr);local_cache.pool_instance = this;// 确定本地缓存容量const int cache_capacity = static_cast<int>(sizeof(local_cache.blocks) / sizeof(local_cache.blocks[0]));// 如果本地缓存已近容量的80%,预先归还一半if (local_cache.count >= cache_capacity * 0.8) {int half_count = local_cache.count / 2;// 构建链表for (int i = 0; i < half_count - 1; ++i) {local_cache.blocks[i]->next.store(local_cache.blocks[i + 1], std::memory_order_relaxed);}// 归还到全局链表Block* old_head = free_list.load(std::memory_order_relaxed);Block* cache_head = local_cache.blocks[0];Block* cache_tail = local_cache.blocks[half_count - 1];do {cache_tail->next.store(old_head, std::memory_order_relaxed);} while (!free_list.compare_exchange_weak(old_head, cache_head, std::memory_order_release, std::memory_order_relaxed));// 压缩剩余缓存for (int i = 0; i < local_cache.count - half_count; ++i) {local_cache.blocks[i] = local_cache.blocks[i + half_count];}local_cache.count -= half_count;}// 现在将新块添加到本地缓存local_cache.blocks[local_cache.count++] = block;deallocated_count.fetch_add(1, std::memory_order_relaxed);
    }
    
    • 测试结果(和标准的分配器)
      • 可以看到比标准的分配器还是快了一点。PS: 后面有时间可以继续优化下。

    Standard allocator time: 2450 microseconds
    Fixed size pool time: 1790 microseconds

源代码: MemoryPool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/86787.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/86787.shtml
英文地址,请注明出处:http://en.pswp.cn/web/86787.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu安装docker遇到权限问题

问题现象&#xff1a; 使用snap安装的docker&#xff0c;执行docker build命令构建景象时报错&#xff1a; [] Building 0.1s (1/1) FINISHED docker:default > [internal] load build definition from Dockerfile 0.0s > > transferring dockerfile: 2B 0.0s ERROR:…

在Linux系统中部署Java项目

1.在Linux中启动mysql的服务: systemctl start mysql可以采用以下代码查看状态: systemctl status mysql如下图展示绿色代表启动成功 2.之后进入mysql mysql -uroot -p输入自己的密码&#xff0c;这里的密码不会显示,直接输入即可 3.在DG中连接Linux的数据库 4.修改配置文件…

C++洛谷P1002 过河卒

题目 链接&#xff1a;https://www.luogu.com.cn/problem/P1002 解析 这道题适用于了解动态规划的同学。 变量初始化 初始化B点坐标&#xff08;n, m&#xff09;和马的坐标&#xff08;a, b&#xff09; 初始化方向数组和动态规划数组 long long dp[30][30]; int dx[8] …

BlogX项目Go-gin--根据IP获取地理位置

先定义一个函数来判断IP地址是否为内网&#xff0c;归为工具类 // utils/ip/enter.go package ipimport "net"func HasLocalIPAddr(ip string) bool {return HasLocalIP(net.ParseIP(ip)) }// HasLocalIP 检测 IP 地址是否是内网地址 // 通过直接对比ip段范围效率更…

鸿蒙系统(HarmonyOS)应用开发之实现瀑布流图片展示效果

项目概述 科技图库是一款基于鸿蒙系统&#xff08;HarmonyOS&#xff09;开发的高品质图片浏览应用&#xff0c;专注于展示精选科技主题图片。应用采用现代化的瀑布流布局&#xff0c;为用户提供流畅、直观的浏览体验&#xff0c;让科技之美尽收眼底。 主要功能 1. 瀑布流布…

【fish-speech】新模型openaudio-s1-mini尝鲜

一、配置 显卡&#xff1a;v100&#xff08;测试简短语句&#xff0c;显存实际占用不足6G&#xff09; 二、安装测试 1. 安装 1.1 下载源码 git clone https://github.com/fishaudio/fish-speech.git1.2 安装系统组件 apt install portaudio19-dev libsox-dev ffmpeg1.3 …

介绍Windows下的由Sysinternals开发的一些小工具

Sysinternals是一个开发了很多Windows下系统工具的公司&#xff0c;这些工具能极大地提高对Windows系统的深入认知。就像它的名字Sys(tem)internals&#xff0c;深入系统里面。这些工具都放在微软的网站上可以下载到。https://learn.microsoft.com/en-us/sysinternals/ 下载网…

云服务器环境下Linux系统epoll机制与高并发服务器优化实践

在当今云计算时代&#xff0c;云已成为企业部署高并发服务的首选平台。本文将深入探讨Linux系统核心的epoll机制如何赋能云环境下的高并发服务器&#xff0c;解析其底层工作原理与性能优势&#xff0c;并对比传统IO复用模型的差异&#xff0c;帮助开发者构建更高效的云端服务架…

Java爬虫实战指南:按关键字搜索京东商品

在电商领域&#xff0c;快速获取商品信息对于市场分析、选品上架、库存管理和价格策略制定等方面至关重要。京东作为国内领先的电商平台之一&#xff0c;提供了丰富的商品数据。虽然京东开放平台提供了官方API来获取商品信息&#xff0c;但有时使用爬虫技术来抓取数据也是一种有…

aspose.word在IIS后端DLL中高并发运行,线程安全隔离

aspose.word在IIS后端DLL中运行,加载很慢,如何为全部用户加载,再每个用户访问时在各自线程中直接可以打开WORD文件处理 Aspose.Words 在 IIS 中优化加载性能方案 针对 Aspose.Words 在 IIS 后端 DLL 中加载缓慢的问题&#xff0c;我们可以通过单例模式预加载组件并结合线程安…

链表题解——回文链表【LeetCode】

一、算法逻辑&#xff08;通顺讲解每一步思路&#xff09; 我们从 isPalindrome 这个主函数入手&#xff1a; 步骤 1&#xff1a;找到链表的中间节点 middleNode 使用 快慢指针法&#xff08;slow 和 fast&#xff09; 快指针一次走两步&#xff0c;慢指针一次走一步。 当快…

allegro 铜皮的直角边怎么快速变成多边形?

像这种&#xff1a; 变成这种&#xff1a; 解决方案&#xff1a; shape edit boundary 点击铺铜边缘就能裁剪

从厨房到代码台:用做菜思维理解iOS开发 - Swift入门篇②

从厨房到代码台&#xff1a;用做菜思维理解iOS开发 - Swift入门篇② 本章重点​ 理解App开发的整体流程熟悉Xcode主界面结构与常用分区跟着步骤动手创建第一个App项目&#xff0c;认识模拟器掌握"打扫厨房"高频快捷键&#xff0c;解决常见疑难杂症 1、目标 像一个专…

EloqCloud for KV 初体验:兼容redis的云原生KV数据库

最近在做一些AI应用的时候&#xff0c;我在想尝试利用redis的能力缓存一些信息&#xff0c;这使我想去找一个免费的redis来进行使用&#xff0c;在调研的过程中我发现了一款产品EloqCloud for KV可以提供类似的能力&#xff0c;于是尝试使用了一下&#xff0c;本文记录了这次体…

企业级路由器技术全解析:从基础原理到实战开发

简介 在当今数字化时代,路由器作为网络的核心设备,其技术深度与应用广度直接影响着企业网络的性能与安全性。本文将全面解析路由器的基础原理、工作机制以及企业级开发技术,从网络层寻址到路由协议算法,从安全配置到QoS实现,再到多厂商API开发实战,旨在帮助网络工程师和…

day041-web集群架构搭建

文章目录 0. 老男孩思想-高薪四板斧1. web集群架构图2. 搭建异地备份服务2.1 服务端-阿里云服务器2.1.1 查看rsync软件包2.1.2 添加rsync配置文件2.1.3 添加虚拟用户2.1.4 创建校验用户密码文件2.1.5 创建备份目录2.1.6 启动服务2.1.7 开放安全组端口2.1.8 发送检查邮件 2.2 客…

day44-Django RestFramework(drf)下

1.5 Django RestFramework(下) drf 内置了很多便捷的功能,在接下来的课程中会给大家依次讲解下面的内容: 快速上手请求的封装版本管理认证权限限流序列化视图条件搜索分页路由解析器10. 分页 在查看数据列表的API中,如果 数据量 比较大,肯定不能把所有的数据都展示给用…

机器学习基础 线性回归与 Softmax 回归

机器学习基础 线性回归与 Softmax 回归 文章目录 机器学习基础 线性回归与 Softmax 回归1. 最小二乘法1.1 数据集定义1.2 最小二乘的矩阵推导1.3 最小二乘的几何解释1.4 概率视角下的最小二乘估计 2. 正则化2.1 L1 范数与 L2 范数2.2 正则化的作用2.3 Lasso 回归的求解2.3.1 L-…

6.27_JAVA_面试(被抽到了)

1.MYSQL支持的存储引擎有哪些, 有什么区别 ? In-no-DB&#xff08;默认&#xff09;&#xff1a;支持事务安全&#xff08;数据库运行时&#xff0c;能保证数据的一致性、完整性&#xff09;&#xff0c;支持表行锁&#xff0c;支持物理和逻辑外键。占用磁盘空间大。 MEMORY&…

YOLOv13震撼发布:超图增强引领目标检测新纪元

YOLOV13最近发布了&#xff0c;速速来看。 论文标题&#xff1a;YOLOv13&#xff1a;融合超图增强的自适应视觉感知的实时目标检测 论文链接&#xff1a;https://arxiv.org/pdf/2506.17733 代码链接&#xff1a;https://github.com/iMoonLab/yolov13 话不多说&#xff0c;直…