在 Go 语言中,goroutine 的轻量特性使得高并发编程变得异常简单。然而,随着并发量的增加,频繁创建对象和无限制启动 goroutine 也可能带来内存浪费、GC 压力和资源抢占等问题。为了解决这些隐患,协程池成为常用的优化手段。用于控制并发数量、避免系统过载。本文将简要介绍golang 中大名鼎鼎的 ants 协程池库的实现原理。


ants包仓库 : https://github.com/panjf2000/ants

为什么用协程池?

  • 提升性能:主要面向一类场景:大批量轻量级并发任务,任务执行成本与协程创建/销毁成本量级接近;
  • 动态调配并发资源 : 能够动态调整所需的协程数量以及各个模块的并发度上限;
  • 协程生命周期控制:实时查看当前全局并发的协程数量;有一个统一的紧急入口释放全局协程.

1. 使用方法

安装ants

go get -u github.com/panjf2000/ants/v2

1.1 创建协程池 NewPool(size int)

用于创建一个容量为 size 的协程池。默认情况下,协程池不会自动扩容,因此超出容量限制的任务会等待空闲 worker。

import "github.com/panjf2000/ants/v2"var pool *ants.Poolfunc init() {var err errorpool, err = ants.NewPool(10) // 创建容量为10的协程池if err != nil {log.Fatalf("Failed to create goroutine pool: %v", err)}
}
  • NewPool() 返回的是一个可复用的固定容量协程池,内部通过任务队列与 worker 协同处理。

1.2 提交任务 Submit(task func())

协程池的核心方法

// Submit submits a task to the pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error 

使用 Submit() 提交一个函数类型任务给协程池异步执行

示例 :

err := pool.Submit(func() {fmt.Println("Task executed by goroutine:", runtime.NumGoroutine())
})
if err != nil {log.Println("Failed to submit task:", err)
}
  • 每次调用 Submit() 不会阻塞主线程。

  • 如果当前运行的 goroutine 已达到上限,任务将等待空闲 worker。

1.3 释放协程池 Release()

释放协程池资源,释放后协程池不再接受新的任务提交。

pool.Release()

⚠️ 注意:一旦调用 Release(),协程池将被永久关闭,不能再次使用。再次提交任务将 panic。

1.4 查询当前运行数 Running()

适合用于实时监控协程池负载状态。

fmt.Printf("Running goroutines: %d\n", pool.Running())

适合用于实时监控协程池负载状态。

1.5 池容量

获取池容量 Cap()
返回协程池的最大容量(即最大 goroutine 数量)。可用于与 Running() 搭配分析使用率。

fmt.Printf("Pool capacity: %d\n", pool.Cap())

动态调整容量 Tune(newSize int)
在运行时动态调整协程池容量,适应系统负载变化。

pool.Tune(20) // 将容量调整为20
  • 扩容会立即生效。
  • 缩容后,多余的 worker 会在任务完成后自动回收。
  • Tune() 不会中断正在执行的任务。

流程

请添加图片描述

2. 底层实现

原理篇前置知识
详细请看以往文章 : Go语言底层(三): sync 包锁与对象池

2.1 核心数据结构

2.1.1 goWorker

type goWorker struct {pool *Pooltask chan func()recycleTime time.Time
}

goWorker 就是我们协程池里的实例 , 简单理解为一个长时间运行而不回收的协程,用于反复处理用户提交的异步任务

  • pool:goWorker 所属的协程池;

  • task:goWorker 用于接收异步任务包的管道;

  • recycleTime:goWorker 回收到协程池的时间.

2.1.2 Pool

type Pool struct {capacity int32running int32lock sync.Lockerworkers workerArraystate int32cond *sync.CondworkerCache sync.Poolwaiting int32heartbeatDone int32stopHeartbeat context.CancelFuncoptions *Options
}
  • capacity:池子的容量
  • running:出于运行中的协程数量
  • lock:自制的自旋锁,保证取 goWorker 时并发安全
  • workers:goWorker 列表,即“真正意义上的协程池”
  • state:池子状态标识,0-打开;1-关闭
  • cond:并发协调器,用于阻塞模式下,挂起和唤醒等待资源的协程
  • waiting:标识出于等待状态的协程数量;
  • heartbeatDone:标识回收协程是否关闭;
  • stopHeartbeat:用于关闭回收协程的控制器函数;
  • options:一些定制化的配置.
  • workerCache:存放 goWorker 的对象池,用于缓存释放的 goworker 资源用于复用. 对象池需要区别于协程池,协程池中的
    goWorker 仍存活,进入对象池的 goWorker 逻辑意义已经销毁;

请添加图片描述

2.1.3 workerArray

type workerArray interface {len() intisEmpty() boolinsert(worker *goWorker) errordetach() *goWorkerretrieveExpiry(duration time.Duration) []*goWorkerreset()
}

该 interface 主要定义了作为数据集合的几个通用 api,以及用于回收过期 goWorker 的 api.

  • insert 插入一个 goWorker
  • detach 取出一个 goWorker
  • retrieveExpiry 获取池中空闲时间超过 duration 的 已经过期的 goWorker 集合 ,其中 goWorker 的回收时间与入栈先后顺序相关,因此可以借助 binarySearch 方法基于二分法快速获取到目标集合.

2.2 核心方法的实现

2.2.1 NewPool 创建协程池

func NewPool(size int, options ...Option) (*Pool, error) {// 读取用户配置,做一些前置校验,默认值赋值等前处理动作...opts := loadOptions(options...)// 构造好 Pool 数据结构;p := &Pool{capacity: int32(size),lock:     internal.NewSpinLock(),options:  opts,}// 构造对象池p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}// 构造好 goWorker 对象池 workerCache,声明好工厂函数;p.workers = newWorkerArray(stackType, 0)//  golang 标准库提供的并发协调器,用于实现指定条件下阻塞和唤醒协程的操作.p.cond = sync.NewCond(p.lock)// 异步启动 goWorker 过期销毁协程.var ctx context.Contextctx, p.stopHeartbeat = context.WithCancel(context.Background())go p.purgePeriodically(ctx)return p, nil
}

2.2.2 pool.Submit 提交任务

func (p *Pool) Submit(task func()) error {// 从 Pool 中取出一个可用的 goWorker;var w *goWorkerif w = p.retrieveWorker(); w == nil {return ErrPoolOverload}// 将用户提交的任务包添加到 goWorker 的 channel 中.w.task <- taskreturn nil
}

取出goWorker 的实现:

func (p *Pool) retrieveWorker() (w *goWorker) {// 声明了一个构造 goWorker 的函数 spawnWorker 用于兜底,从对象池 workerCache 中获取 goWorker;spawnWorker := func() {w = p.workerCache.Get().(*goWorker)w.run()}p.lock.Lock()// 尝试从池中取出一个空闲的 goWorker;w = p.workers.detach()if w != nil { p.lock.Unlock()// 倘若池子容量未超过上限, 从对象池中取出一个 goWorker } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()spawnWorker()} else { // 倘若池子容量超限,且池子为非阻塞模式,直接抛回错误;if p.options.Nonblocking {p.lock.Unlock()return}// 倘若池子容量超限,且池子为阻塞模式,则基于并发协调器 cond 挂起等待有空闲 worker;retry:// 若阻塞任务已达最大限制,也直接返回;if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {p.lock.Unlock()return}// 增加等待数并使用 cond 条件变量挂起当前协程;p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)// 被唤醒后(可能是因为 scavenger 清理协程),判断是否还有运行中的 worker;var nw intif nw = p.Running(); nw == 0 { // awakened by the scavengerp.lock.Unlock()spawnWorker()return}// 再次尝试重新获取一个空闲 worker;if w = p.workers.detach(); w == nil {if nw < p.Cap() {p.lock.Unlock()spawnWorker()return}goto retry}// 获取到了可用 worker,解锁并返回;p.lock.Unlock()}return
}

2.2.3 goWorker 运行

func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {w.pool.addRunning(-1)w.pool.workerCache.Put(w)if p := recover(); p != nil {// panic 后处理}w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
  • 循环 + 阻塞等待,直到获取到用户提交的异步任务包 task 并执行;
  • 执行完成 task 后,会将自己交还给协程池;
  • 倘若回归协程池失败,或者用户提交了一个空的任务包,则该 goWorker 会被销毁,销毁方式是将自身放回协程池的对象池 workerCache. 并且会调用协调器 cond 唤醒一个阻塞等待的协程.

参考文章 : 小徐的编程世界

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/84937.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/84937.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/84937.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React Native【实战范例】网格导航 FlatList

import React from "react"; import {FlatList,Image,SafeAreaView,StyleSheet,Text,View, } from "react-native"; interface GridItem {id: string;title: string;imageUrl: string; } // 网格布局数据 const gridData Array.from({ length: 30 }, (_, …

KJY0047-J1阶段测试

KJY0047 - J1阶段测试题解 题目1&#xff1a;SYAP0001. 闯关 解题思路&#xff1a; 暴力思路&#xff1a;每次碰到奇数都使用一次 f o r for for 循环将后续的数值 1 1 1, 时间复杂度 O ( n 2 ) O(n^2) O(n2) 优化思路&#xff1a;可以用一个计数器 c n t cnt cnt 来存…

键盘按键枚举 Key 说明文档

键盘按键枚举 Key 说明文档 该文档介绍了 Key 枚举中定义的键盘按键常量及其对应编号&#xff0c;适用于标准 105 键的美式键盘布局。常用于浏览器或桌面端的键盘事件监听、游戏开发、快捷键映射等场景。 electron-jest ⌨️ 功能键&#xff08;Function Keys&#xff09; …

函数调用过程中的栈帧变化

int add(int a, int b) {int c a b;return c; }int main() {int result add(1, 2);return 0; }生成汇编代码&#xff1a;g -S Cplus.cpp -o Cplus.s .file "Cplus.cpp".text.globl _Z3addii.def _Z3addii; .scl 2; .type 32; .endef.seh_proc _Z3addii _Z3addii:p…

【Java面试笔记:实战】41、Java面试核心考点!AQS原理及应用生态全解析

引言:AQS在Java并发体系中的核心地位 AQS(AbstractQueuedSynchronizer)作为Java并发包的底层基石,是理解ReentrantLock、Semaphore等同步工具的关键。 在Java架构师面试中,AQS的原理与应用是高频考点,掌握其核心机制对理解JUC包和构建高并发系统至关重要。 本文将从原…

硕士课题常用命令

ros常用命令&#xff1a; 1.环境变量刷新 source devel/setup.bash2.ROS_INFO的信息在终端显示为乱码或者问号&#xff0c;则在main函数中加入&#xff1a; setlocale(LC_ALL, "");3.刷新bashrc文件 source ~/.bashrcPX4 roslaunch px4 mavros_posix_sitl.launc…

2.6 激光雷达消息格式

新建终端&#xff0c;执行命令 roslaunch wpr_simulation wpb_simple.launch 在新建终端&#xff0c;执行命令 roslaunch wpr_simulation wpb_rviz.launch 显示/Scan话题消息&#xff0c;后面的参数是noarr无数组&#xff0c;防止刷屏 rostopic echo /scan --noarr 参考官…

常见的网络协议有哪些

1.应用层 1.1 HTTP/HTTPS 前端与服务器通信的基础协议&#xff0c;用于传输 HTML、CSS、JS、图片等资源。 1.2WebSocket&#xff08;如社交聊天、股票实时报价、视频会议、在线教育等&#xff09; WebSocket协议建立在TCP协议之上&#xff0c;实现了浏览器与服务器之间的实时…

Prometheus + Grafana 监控 RabbitMQ 实践指南

文章目录 Prometheus Grafana 监控 RabbitMQ 实践教程一、前言二、环境搭建2.1 环境准备2.2 安装 Prometheus2.3 安装 Grafana 三、集成 RabbitMQ Exporter3.1 下载 RabbitMQ Exporter3.2 解压文件3.3 配置环境变量3.4 启动 RabbitMQ Exporter3.6 验证 Exporter 状态 四、Prom…

Babylon.js场景加载器(Scene Loader)使用指南

在3D开发中&#xff0c;Babylon.js的场景加载器(Scene Loader)是加载各种3D模型格式的核心工具。本文将详细介绍如何高效使用Scene Loader加载多种格式的3D模型文件。 一、基本概念与支持格式 要加载特定类型的文件&#xff0c;Babylon.js需要先注册对应的文件类型插件。目前…

编程学习网站大全(C++/OpenCV/QT方向)—— 资源导航与深度评测

工欲善其事&#xff0c;必先利其器 本文系统整理了C、OpenCV、QT三大方向的优质学习网站&#xff0c;结合技术特点与平台优势&#xff0c;助你精准选择学习资源&#xff0c;少走弯路&#xff01; 一、C 学习网站精选 &#x1f4da; 1. cppreference.com 权威性最高&#xff1a…

逆向入门(5)程序逆向篇-AD_CM#2

打开程序 常规注册界面&#xff0c;打开OD&#xff0c;随便找找就看到关键字了 没有壳逻辑也挺简单的 获取输入框&#xff0c;用5比较输入内容的长度&#xff0c;小于则跳转提示密码长度不够 否则就进入下一个流程&#xff0c;去获取序列号&#xff0c;其实可以直接将jnz换…

OD 算法题 B卷【路灯照明II】

文章目录 路灯照明II 路灯照明II 在一条笔直的公路上安装了N个路灯&#xff0c;从位置0开始安装&#xff0c;间距固定为100米&#xff1b;每个路灯都有自己的照明半径&#xff0c;计算第一个路灯和最后一个路灯之间&#xff0c;无法照明的区间长度和&#xff1b; 输入描述: 第…

JUC核心解析系列(四)——同步工具类 (Synchronizers)深度解析

在多线程开发中&#xff0c;死锁、资源竞争、线程协调等问题如同暗礁&#xff0c;稍有不慎就会导致程序崩溃。而JUC同步工具类正是解决这些问题的瑞士军刀&#xff01; 一、同步工具类核心价值&#xff1a;线程协作的艺术 在高并发系统中&#xff0c;线程协作是保证数据一致性…

板凳-------Mysql cookbook学习 (十--6)

第7章&#xff1a;排序查询结果 7.0 引言 mysql> use cookbook Database changed mysql> select * from driver_log; ---------------------------------- | rec_id | name | trav_date | miles | ---------------------------------- | 1 | Ben | 2014-07-30 …

从入门到精通:C# 中 AutoMapper 的深度解析与实战应用

在 C# 开发领域&#xff0c;尤其是企业级应用开发过程中&#xff0c;不同层次和模块之间的数据传递与对象转换是常见需求。例如&#xff0c;从数据库读取的实体类&#xff0c;在传递到前端时&#xff0c;往往需要转换为更简洁、安全的数据传输对象&#xff08;DTO&#xff09; …

【热更新知识】学习一 Lua语法学习

1、注释 1.1 单行注释 --注释内容 --单行注释 print打印函数 1.2 多行注释&#xff0c;三种方式 --[[注释内容]] --[[注释内容]]-- --[[注释内容--]] --[[ 多行 注释 ]]--[[ 第二种多行注释 1 2 ]]----[[ 第三种 多行 注释 --]] 2、简单变量 2.1 声明变量&#xff0c…

React 第三方状态管理库的比较与选择

在现代前端开发中,状态管理是一个重要的环节。选择合适的状态管理库可以极大地提高项目的可维护性和开发效率。本文将对几种流行的状态管理库进行比较,包括Valtio、XState、MobX、Recoil和Zustand,帮助开发者在实际项目中做出明智的选择。 1. Valtio 1.1. 设计理念 Valti…

《Kafka 在实时消息系统中的高可用架构设计》

Kafka 在实时消息系统中的高可用架构设计 引言 在当今互联网社交应用中&#xff0c;实时消息系统已成为核心基础设施。以中性互联网公司为例&#xff0c;其每天需要处理数十亿条消息&#xff0c;涵盖一对一聊天、群组互动、直播弹幕等多种场景。特别是在大型直播活动中&#…

SKUA-GOCAD入门教程-第八节 线的创建与编辑3

8.1.4根据面对象创建曲线 (1)从曲面生成曲线 从曲面边界生成曲线您可以从选定的曲面边界创建一条单段曲线。 1、选择 Curve commands > New > Borders > One 打开从曲面的一条边界创建曲线对话框。 图1 在“Name名称”框中,输入要创建的曲线的名称。