三. CachedThreadPool 的实现

3.1 需求:

动态调整线程数量:与 FixedThreadPool 不同,CachedThreadPool 的线程数量是动态调整的。当有新任务提交时,如果线程池中有空闲的线程,则会立即使用空闲线程执行任务;如果线程池中没有空闲线程,则会创建一个新的线程来执行任务。当线程空闲一段时间后,超过一定的时间(默认为 60 秒),会被回收销毁。

3.2 SyncQueue 同步队列的设计和实现

#ifndef SYNCQUEUE3_HPP
#define SYNCQUEUE3_HPP#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>using namespace std;template<class T>
class SyncQueue
{
private:std::list<T> m_queue; // 任务缓冲区mutable std::mutex m_mutex;std::condition_variable m_notEmpty; // Cstd::condition_variable m_notFull;  // Psize_t m_waitTime;                  //任务队列满等待时间sint m_maxSize;                      // bool m_needStop;    // true 同步队列不在接受新的任务//bool IsFull() const{bool full = m_queue.size() >= m_maxSize;if (full){printf("m_queue 已经满了,需要等待....\n");}return full;}bool IsEmpty() const{bool empty = m_queue.empty(); //if (empty){printf("m_queue 已经空了,需要等待....\n");}return empty;}// return 0; 成功// 1 ;// full// 2 ;// stop;template<class F>int Add(F&& x){std::unique_lock<std::mutex> locker(m_mutex);if (!m_notFull.wait_for(locker, std::chrono::seconds(m_waitTime),[this] { return m_needStop || !IsFull(); })){cout << "task queue full return 1" << endl;return 1;}if (m_needStop){cout << "同步队列停止工作..." << endl;return 2;}m_queue.push_back(std::forward<F>(x));m_notEmpty.notify_one();return 0;}public:SyncQueue(int maxsize = 100, int timeout = 1):m_maxSize(maxsize),m_waitTime(timeout),m_needStop(false) // 同步队列开始工作{}int Put(const T& x){return Add(x);}int Put(T&& x){return Add(std::forward<T>(x));}int notTask(){std::unique_lock<std::mutex> locker(m_mutex);if (!m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime))== std::cv_status::timeout){return 1;}return 0;}void Take(std::list<T>& list) //{std::unique_lock<std::mutex> locker(m_mutex);while (!m_needStop && IsEmpty()){m_notEmpty.wait(locker);}if (m_needStop){cout << "同步队列停止工作..." << endl;return;}list = std::move(m_queue);m_notFull.notify_one();}//T& GetTake();// return 0; 成功// 1 ;// empty// 2 ;// stop;int Take(T& t) // 1{std::unique_lock<std::mutex> locker(m_mutex);if (!m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime),[this] { return m_needStop || !IsEmpty(); })){return 1;   // 队列空}if (m_needStop){cout << "同步队列停止工作..." << endl;return 2;}t = m_queue.front();m_queue.pop_front();m_notFull.notify_one();return 0;}void Stop(){std::unique_lock<std::mutex> locker(m_mutex);while (!IsEmpty()){m_notFull.wait(locker);}m_needStop = true;m_notFull.notify_all();m_notEmpty.notify_all();}bool Empty() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.empty();}bool Full() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.size() >= m_maxSize;}size_t size() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.size();}
};#endif

3.3 CachedThreadPool 线程池的设计和实现

#ifndef CACHEDTHREADPOOL_HPP
#define CACHEDTHREADPOOL_HPP#include"SyncQueue3.hpp"
#include<functional>
#include<unordered_map>
#include<map>
#include<future>using namespace std;int MaxTaskCount = 2;
const int KeepAliveTime = 10;   //线程最大存活时间 60 ,为测试改为10class CachedThreadPool
{
public:using Task = std::function<void(void)>;private:std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;int m_coreThreadSize;                 // 核心的线程数量,下限阈值 2int m_maxThreadSize;                  // 最大的线程数量,上限阈值std::atomic_int m_idleThreadSize;     // 空闲线程的数量std::atomic_int m_curThreadSize;      // 当前线程池里面的线程总数量mutable std::mutex m_mutex;           // SyncQueue<Task> m_queue;std::atomic_bool m_running; // true ; false stop;std::once_flag m_flag;void Start(int numthreads){m_running = true;m_curThreadSize = numthreads;for (int i = 0; i < numthreads; ++i){auto tha = std::make_shared<std::thread>(std::thread(&CachedThreadPool::RunInThread, this));std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_idleThreadSize++;}}void RunInThread(){auto tid = std::this_thread::get_id();auto startTime = std::chrono::high_resolution_clock().now();while (m_running){Task task;if (m_queue.size() == 0 && m_queue.notTask()){auto now = std::chrono::high_resolution_clock().now();auto intervalTime = std::chrono::duration_cast<std::chrono::seconds>(now - startTime);std::lock_guard<std::mutex> lock(m_mutex);if (intervalTime.count() >= KeepAliveTime &&m_curThreadSize > m_coreThreadSize){m_threadgroup.find(tid)->second->detach();m_threadgroup.erase(tid);m_curThreadSize--;m_idleThreadSize--;cout << "空闲线程销毁" << m_curThreadSize << " " << m_coreThreadSize << endl;return;}}if (!m_queue.Take(task) && m_running){m_idleThreadSize--;task();m_idleThreadSize++;startTime = std::chrono::high_resolution_clock().now();}}}void StopThreadGroup(){m_queue.Stop();m_running = false;for (auto& thread : m_threadgroup){thread.second->join();}m_threadgroup.clear();}public:CachedThreadPool(int initNumThreads = 8, int taskPoolSize = MaxTaskCount):m_coreThreadSize(initNumThreads),m_maxThreadSize(2 * std::thread::hardware_concurrency() + 1),m_idleThreadSize(0),m_curThreadSize(0),m_queue(taskPoolSize),m_running(false){Start(m_coreThreadSize);}~CachedThreadPool(){StopThreadGroup();}template<class Func, class... Args>auto submit(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>{auto task = std::make_shared<std::packaged_task<decltype(func(args...))()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));std::future<decltype(func(args...))> result = task->get_future();if (m_queue.Put([task]() { (*task)(); }) != 0){cout << "调用者运行策略" << endl;(*task)();}if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize){std::lock_guard<std::mutex> lock(m_mutex);auto tha = std::make_shared<std::thread>(std::thread(&CachedThreadPool::RunInThread, this));std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_curThreadSize++;m_idleThreadSize++;}return result;}template<class Func, class... Args>void execute(Func&& func, Args&&... args){submit(std::forward<Func>(func), std::forward<Args>(args)...);}
};#endif

3.4 测试

///
void func(int index)
{static int num = 0;cout << "func_" << index << " num: " << ++num << endl;
}int add(int a, int b)
{return a + b;
}int main()
{CachedThreadPool mypool;for (int i = 0; i < 1000; ++i){if (i % 2 == 0){auto pa = mypool.submit(add, i, i + 1);cout << pa.get() << endl;}else{mypool.execute(func, i);}}CachedThreadPool pool(2);int add(int a, int b, int s){std::this_thread::sleep_for(std::chrono::seconds(s));int c = a + b;cout << "add begin ..." << endl;return c;}void add_a(){auto r = pool.submit(add, 10, 20, 4);cout << "add_a: " << r.get() << endl;}void add_b(){auto r = pool.submit(add, 20, 30, 6);cout << "add_b: " << r.get() << endl;}void add_c(){auto r = pool.submit(add, 30, 40, 1);cout << "add_c: " << r.get() << endl;}void add_d(){auto r = pool.submit(add, 10, 40, 9);cout << "add_d: " << r.get() << endl;}int main(){std::thread tha(add_a);std::thread thb(add_b);std::thread thc(add_c);std::thread thd(add_d);tha.join();thb.join();thc.join();thd.join();std::this_thread::sleep_for(std::chrono::seconds(20));std::thread the(add_a);std::thread thf(add_b);the.join();thf.join();return 0;}
}

3.5 FixedThreadPool 与 CachedThreadPool 特性对比

特性FixedThreadPoolCachedThreadPool
重用能 reuse 就用,但不能随时建新的线程先查看池中有无以前建立的线程,有就 reuse;没有就建新线程加入池中
池大小可指定 nThreads,固定数量可增长,最大值 Integer.MAX_VALUE
队列大小无限制无限制
超时无 IDLE默认 60 秒 IDLE
使用场景针对稳定固定的正规并发线程,用于服务器,执行负载重、CPU 使用率高的任务,防止线程频繁切换得不偿失处理大量短生命周期异步任务,执行大量并发短期异步任务,任务负载要轻
结束不会自动销毁放入的线程超过 TIMEOUT 不活动会自动被终止

3.6 最佳实践

FixedThreadPool 和 CachedThreadPool 对高负载应用都不特别友好,CachedThreadPool 更危险。若应用要求高负载、低延迟,最好不选这两种,推荐使用 ThreadPoolExecutor ,可进行细粒度控制:

  1. 将任务队列设置成有边界的队列
  2. 使用合适的 RejectionHandler 拒绝处理程序
  3. 若任务完成前后需执行操作,可重载 beforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable)
  4. 重载 ThreadFactory ,若有线程定制化需求
  5. 运行时动态控制线程池大小(Dynamic Thread Pool)

3.7 使用场景

适用于以下场景:

  1. 大量短期任务:适合处理大量短期任务,任务到来时尽可能创建新线程执行,有空闲线程则复用,避免频繁创建销毁线程的额外开销。
  2. 任务响应快速:可根据任务到来快速创建启动新线程执行,减少任务等待时间。
  3. 不需要限制线程数量:最大线程数不限,只要内存足够,可根据任务动态创建新线程。
  4. 短期性任务的高并发性:可动态创建线程,适合处理需高并发性的短期任务,任务处理完后保持一定空闲线程用于下一批任务。

需注意,CachedThreadPool 线程数量不受限,任务过多可能导致线程数量过多、系统资源过度消耗,使用时需灵活调整线程数量或用其他线程池控制资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/94516.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/94516.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/94516.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WMS+自动化立库:无人仓的现在进行时

传统仓库正面临严峻挑战&#xff1a;效率瓶颈日益凸显&#xff0c;人力成本持续攀升&#xff0c;空间利用率逼近极限&#xff0c;而订单响应速度却难以满足市场需求。如何破局&#xff1f;WMS&#xff08;仓库管理系统&#xff09;与自动化立体库&#xff08;AS/RS&#xff09;…

多模态大模型研究每日简报【2025-08-05】

训练数据相关 EditGarment: An Instruction-Based Garment Editing Dataset Constructed with Automated MLLM Synthesis and Semantic-Aware Evaluation (https://arxiv.org/abs/2508.03497)&#xff1a;提出了一种自动化的流程&#xff0c;用于构建服装编辑数据集EditGarmen…

4、docker数据卷管理命令 | docker volume

1、命令总览命令作用出现频率备注★ docker volume create新建卷高-d 指定驱动&#xff0c;-o 指定驱动选项★ docker volume ls列出卷高--filter danglingtrue 查孤儿卷★ docker volume inspect查看卷详情高输出 JSON&#xff0c;可加 --format★ docker volume rm删除卷高只…

计数组合学7.14(对偶 RSK 算法)

7.14 对偶 RSK 算法 存在 RSK 算法的一种变体&#xff0c;其与乘积 ∏i,j(1xiyj)\prod_{i,j}(1 x_{i}y_{j})∏i,j​(1xi​yj​) 的关系类似于 RSK 算法本身与 ∏i,j(1−xiyj)−1\prod_{i,j}(1 - x_{i}y_{j})^{-1}∏i,j​(1−xi​yj​)−1 的关系。我们称此变体为对偶 RSK 算法…

C语言中的进程、线程与进程间通信详解

目录 引言 基本概念 1. 进程&#xff08;Process&#xff09; 2. 线程&#xff08;Thread&#xff09; 线程编程实战 1. 常见线程库 2. 合理设置线程数 3. pthread 创建线程 线程同步机制 1. 互斥锁 pthread_mutex_t 2. 条件变量 pthread_cond_t 3. 读写锁 pthread…

[假面骑士] 555浅谈

假面骑士555(faiz)是我最先接触的一部平成系列的假面骑士&#xff0c;同时也是我个人最喜欢的一部假面骑士。一、大纲简介震惊&#xff0c;人类最新的进化形态——奥菲一诺&#xff0c;横空出世&#xff01;日本的顶级财团&#xff0c;Smart Brain&#xff0c;的前任社长&#…

Vue Router 路由的创建和基本使用(超详细)

一、路由的基本概念 你是否好奇单页应用&#xff08;SPA&#xff09;是如何在不刷新页面的情况下实现页面切换的&#xff1f;这就离不开路由的功劳。 路由&#xff1a;本质是一组 key-value 的对应关系&#xff0c;在前端领域中&#xff0c;key 通常是路径&#xff0c;value …

深入理解设计模式:策略模式的艺术与实践

在软件开发中&#xff0c;我们经常会遇到需要根据不同情况选择不同算法或行为的场景。传统的做法可能是使用大量的条件语句&#xff08;if-else或switch-case&#xff09;&#xff0c;但随着需求的增加和变化&#xff0c;这种硬编码的方式会导致代码难以维护和扩展。策略模式&a…

概率/期望 DP llya and Escalator

题目链接&#xff1a;Problem - D - Codeforces 看了这篇文章来的&#xff1a;【算法学习笔记】概率与期望DP - RioTian - 博客园 这篇博客写得挺好的&#xff0c;讲了一些常见方法&#xff0c;概率 / 期望的题多练练就上手了。 题目大意&#xff1a; n 个人排队上电梯&…

大陆电子MBDS开发平台转到其他国产控制器平台产生的问题记录

u8_StComLowSpdGearSwt变量为例&#xff0c;之前用的时候只有输入&#xff0c;没什么实际意义&#xff0c;导致新环境下编译报错&#xff0c;缺少声明&#xff0c;解决办法&#xff1a;注释掉输入模块。今天解决的另一个比较大的问题&#xff0c;不同模型函数公用函数模块生成代…

机器学习模型调优实战指南

文章目录模型选择与调优&#xff1a;从理论到实战1. 引言2. 模型评估&#xff1a;为选择提供依据2.1 偏差-方差权衡2.2 数据集划分与分层抽样2.3 交叉验证&#xff08;Cross-Validation&#xff09;2.4 信息准则&#xff08;AIC / BIC&#xff09;3. 超参数调优&#xff1a;让模…

【教程】Unity CI/CD流程

测试机&#xff1a;红帽 Linux8 源码仓库&#xff1a;Gitee - MrRiver/Unity Example   系统环境准备 1&#xff09;yum 源 sudo curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-8.repo sudo sed -i s/\$releasever/8/g /etc/yum.repos…

文献阅读 | Briefings in Bioinformatics | Hiplot:全面且易于使用的生物医学可视化分析平台

文献介绍文献题目&#xff1a; Hiplot&#xff1a;一个综合且易于使用的 Web 服务&#xff0c;用于增强出版物准备的生物医学数据可视化 研究团队&#xff1a; Openbiox/Hiplot 社区 发表时间&#xff1a; 2022-07-05 发表期刊&#xff1a; Briefings in Bioinformatics 影响因…

【数字图像处理系列笔记】Ch04:灰度变换与空间域图像增强(2)

目录 一、空域滤波基础 一、空域滤波的基本概念 二、空域滤波的数学原理 三、空域滤波器的分类与典型示例 &#xff08;一&#xff09;线性滤波器&#xff08;Linear Filter&#xff09; &#xff08;二&#xff09;非线性滤波器&#xff08;Non-linear Filter&#xff0…

AI浪潮下,FPGA如何实现自我重塑与行业变革

引言&#xff1a;AI 与 FPGA&#xff0c;新时代的碰撞 2025 年&#xff0c;人工智能技术迎来爆发式增长&#xff0c;大模型、生成式 AI 和多模态技术持续突破&#xff0c;人形机器人量产元年正式开启&#xff0c;自动驾驶商业化进程加速&#xff0c;工业数字化转型全面铺开(1)…

系统集成项目管理工程师【第十一章 规划过程组】定义范围、创建WBS、规划进度管理和定义活动篇

系统集成项目管理工程师【第十一章 规划过程组】定义范围、创建WBS、规划进度管理和定义活动篇 一、定义范围&#xff1a;给项目画好"边界线" 定义范围是明确项目和产品"做什么、不做什么"的过程&#xff0c;直接影响后续所有工作的方向。 1. 核心概念与作…

Spring Boot 参数校验全指南

Spring Boot 参数校验全指南 在 Web 开发中&#xff0c;参数校验是保障接口安全性和数据合法性的关键环节。手动编写校验逻辑不仅繁琐&#xff0c;还容易遗漏边界情况。Spring Boot 整合了 validation 工具&#xff0c;提供了一套简洁高效的参数校验方案&#xff0c;可快速实现…

常用技术资料链接

1.team技术 https://zhuanlan.zhihu.com/p/11389323664 https://blog.csdn.net/Lucky_Lu0/article/details/121697151 2.bond切换主备 https://www.xgss.net/3306.html 3.ssh详解&#xff1a; https://cloud.tencent.com/developer/news/105165 https://blog.huochengrm.c…

【Spring Cloud】-- 注册中心

文章目录1. 什么是注册中心2. CPA理论1. 什么是注册中心 注册中心有三种角色&#xff1a; 服务提供者&#xff08;Server&#xff09; &#xff1a;提供接口给其他微服务的程序。服务消费者&#xff08;Client&#xff09;&#xff1a;调用其他微服务提供的接口。**服务注册中…

go-zero 详解

go-zero 详解 go-zero 是一个基于 Go 语言的微服务框架&#xff0c;由字节跳动团队开发并开源&#xff0c;旨在帮助开发者快速构建高可用、高性能的微服务架构。它集成了丰富的组件&#xff0c;简化了微服务开发中的常见问题&#xff08;如服务注册发现、配置管理、限流熔断等&…