目录

  • 1. 生产者消费者模型的相关概念
    • 1.1 什么是生产者消费者模型
    • 1.2 生产者消费者模型的优势作用
  • 2. 多线程简单实现生产者消费者模型
    • 2.1 设计方案
    • 2.2 代码实现
      • 2.2.1 线程类
      • 2.2.2 BlockQueue类
      • 2.2.3 任务类
      • 2.2.4 主干代码

1. 生产者消费者模型的相关概念

1.1 什么是生产者消费者模型

在这里插入图片描述
  生产者消费者模型是一种经典的并发编程的设计模式。其由三部分组成分别为生产者消费者共享资源缓冲区

  • 生产者: 生产任务、数据的线程或进程
  • 消费者: 处理任务、数据的线程或进程
  • 共享资源缓冲区: 任务与数据的暂存区,生产者向其中存储任务与数据,消费者从中获取任务与数据。简单来说,共享资源缓冲区,是一段临时保存数据的内存空间,一般使用某种数据结构对象充当(阻塞队列)

  生产者消费者模型中,其充当生产、消费角色的线程/进程,它们之间需要满足特定的关系,具体如下:

角色关系
生产者 vs 生产者互斥 || 同步(互斥,可能同步)
消费者 vs 消费者互斥 || 同步(互斥,可能同步)
生产者 vs 消费者互斥 && 同步

1.2 生产者消费者模型的优势作用

  生产者消费者模型是为了协调生产者与消费者之间协作。具体设计为,生产者生产的数据不再直接交给消费者,而是直接存入共享资源缓冲区。而消费者也不再从生产者手中获取数据,则是转为从共享资源缓冲区中获取存入的历史数据。通过这样的设计方式,让生产与消费的操作解耦合,提高更好的并发度,并且支持生产者、消费者之间的忙先不均
  生产者消费者模型高效与并发度好的原因为,支持生产任务与处理任务或数据的并发。当生产者竞争锁或是生产任务、数据时,消费者可执行自己的任务,或是直接从缓冲区中获取存储的历史任务、数据。消费者执行任务不影响生产者获取、生产任务。

2. 多线程简单实现生产者消费者模型

2.1 设计方案

1. 生产者消费者模型的实体选择:
在这里插入图片描述

  • 生产者:创建一批线程向共享资源缓冲区中生产任务
  • 消费者:创建一批线程从共享资源缓冲区获取任务并处理
  • 共享资源缓冲区:此处使用自定义的阻塞队列(BlockQueue)实现,保证生产者、消费者访问其时互斥且同步

2.阻塞队列(BlockQueue)的实现:

成员变量作用
queue<T>用于存储任务、数据的队列
int _cap队列的容量大小
pthread_mutex_t _mutex访问阻塞队列时控制互斥的锁
pthread_cond_t _productor_cond控制生产者同步的条件变量
pthread_cond_t _consumer_cond控制消费者同步的条件变量
int _productor_wait_num在条件变量处阻塞等待的生产者线程数量
int _consumer_wait_num在条件变量处阻塞等待的消费者线程数量
成员函数作用
void Equeue(T& data)将生产者生产的数据入队列
void Pop(T* data)从队列中获取历史的数据,采用输出型参数的方式
bool IsFull()检测队列是否满了
bool IsEmpty()检测队列是否为空
  • 互斥: 阻塞队列的入队与出队操作都必须是互斥的,即保证无论何时,无论是生产者还是消费者线程都只能有一个线程在访问阻塞队列。
  • 同步: 除此之外,还要保证生产者与消费者之间的同步,即队列中数据存储已慢,则阻塞生产者,队列中没有数据,则阻塞消费者,确保生产与消费的整个过程可以正常进行。
  • 条件变量的优化: 当没有生产者或消费者在条件变量下阻塞等待时,就可以选择不需要再去将对应的条件变量唤醒。

3. 程序的主干逻辑与函数

在这里插入图片描述

2.2 代码实现

2.2.1 线程类

#ifndef THREAD_MODULE
#define THREAD_MODULE
#include <pthread.h>
#include <iostream>
using namespace std;
#include <functional>namespace ThreadModule
{template<typename T>using func_t = function<void(T&)>;template<typename T>class Thread{public:Thread(func_t<T> func, T& data, string name = "none-thread"):_func(func), _data(data), _name(name), _stop(true){}~Thread(){}void Execute(){_func(_data);}static void* threadroutine(void* arg){Thread<T>* ptd = static_cast<Thread<T>*>(arg);ptd->Execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(n){return false; }_stop = false;return true;}void join(){if(!_stop){pthread_join(_tid, nullptr);}}void detach(){if(!_stop){pthread_detach(_tid);}}string name(){return _name;}void stop(){_stop = true;}private:pthread_t _tid;string _name;func_t<T> _func;T& _data;bool _stop;};}#endif

2.2.2 BlockQueue类

#ifndef BLOCK_QUEUE_HPP
#define BLOCK_QUEUE_HPP#include <queue>
#include <pthread.h>template<typename T>
class BlockQueue
{
public:BlockQueue(int cap){_cap = cap;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_consumer_cond, nullptr);pthread_cond_init(&_productor_cond, nullptr);_consumer_wait_num = 0;_productor_wait_num = 0;}bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}void Enqueue(T& data){pthread_mutex_lock(&_mutex);while(IsFull()){_productor_wait_num++;pthread_cond_wait(&_productor_cond, &_mutex);_productor_wait_num--;}_q.push(data);if(_consumer_wait_num > 0)//当有正在等待的消费者时pthread_cond_signal(&_consumer_cond);//生产了继续消费pthread_mutex_unlock(&_mutex);}void Pop(T* data){pthread_mutex_lock(&_mutex);while(IsEmpty()){_consumer_wait_num++;pthread_cond_wait(&_consumer_cond, &_mutex);_consumer_wait_num--;}*data = _q.front();_q.pop();if(_productor_wait_num > 0)//当有正在阻塞等待的生产者时pthread_cond_signal(&_productor_cond);//消费了继续生产pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumer_cond);pthread_cond_destroy(&_productor_cond);}private:std::queue<T> _q;    //队列存储数据int _cap;            //阻塞队列的容量pthread_mutex_t _mutex;pthread_cond_t _consumer_cond;pthread_cond_t _productor_cond;int _consumer_wait_num;int _productor_wait_num;
};#endif

  条件变量的阻塞判断条件应设为为while,不能设置为if,这是因为pthread_cond_wait可能会出错返回导致继续执行后续代码。可此时唤醒条件并未满足,条件变量并没有被真的唤醒,此种情况被称为伪唤醒if条件判断语句并不能预防此种伪唤醒的错误情况,所以,一般条件变量的阻塞判断条件会被设置为while检测,保证代码的健壮性

2.2.3 任务类

#ifndef TASK_HPP
#define TASK_HPP
#include <functional>
#include <iostream>
using namespace std;//使用仿函数
class Task
{
public://无参构造,用于消费者创建接收数据Task(){}Task(int x, int y):_x(x), _y(y){}~Task(){}void toDebugQuestion(){cout << _x << " + " << _y << " =?" << endl;}void toDebugAnswer(){cout << _x << " + " << _y << " = " << _x + _y << endl;}private:int _x;int _y;int _sum;
};#endif

2.2.4 主干代码

#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
using namespace ThreadModule;
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"using blockqueue_t = BlockQueue<Task>;void ProductorRun(blockqueue_t& bq)
{while(true){sleep(1);//生产慢Task t(rand() % 10, rand() % 10);bq.Enqueue(t);t.toDebugQuestion();}
}void ConsumerRun(blockqueue_t& bq)
{Task t;while(true){//sleep(1);//消费慢bq.Pop(&t);t.toDebugAnswer();}
}void StartComm(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, func_t<blockqueue_t> func, int num, string who)
{for(int i = 0; i < num; i++){string name = who + '-' + to_string(i + 1);threads.emplace_back(func, bq, name);threads.back().start();cout << name << " create success..." << endl;}
}void StartProductor(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ProductorRun, num, "Productor");
}void StartConsumer(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ConsumerRun, num, "Consumer");
}void WaitAllThreads(vector<Thread<blockqueue_t> >& threads)
{for(auto& thread : threads){thread.join();}
}int main()
{srand((size_t)time(nullptr));vector<Thread<blockqueue_t> > threads;blockqueue_t bq(5);StartProductor(threads, bq, 3);StartConsumer(threads, bq, 5);WaitAllThreads(threads);return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/97860.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/97860.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/97860.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《深度学习》卷积神经网络:数据增强与保存最优模型解析及实现

目录 一、数据增强 1. 核心概念 2. 核心目的 3. 常用方法 4. 实现示例&#xff08;基于 PyTorch&#xff09; 5. 自定义数据集加载 二、保存最优模型 1. 核心概念 2. 实现步骤 &#xff08;1&#xff09;定义 CNN 模型 &#xff08;2&#xff09;定义训练与测试函数…

tcpdump用法

tcpdump用法tcpdump一、什么是tcpdump二、命令格式与参数三、参数列表四、过滤规则组合逻辑运算符过滤器关键字理解 Flag 标识符五、常用例子tcpdump 一、什么是tcpdump 二、命令格式与参数 option 可选参数&#xff1a;将在后边一一解释。 proto 类过滤器&#xff1a;根据协…

平衡车 - 电机调速

&#x1f308;个人主页&#xff1a;羽晨同学 &#x1f4ab;个人格言:“成为自己未来的主人~” 在我们的这篇文章当中&#xff0c;我们主要想要实现的功能的是电机调速功能。在我们的这篇文章中&#xff0c;主要实现的是开环的功能&#xff0c;而非闭环&#xff0c;也就是不加…

从利润率看价值:哪些公司值得长期持有?

&#x1f4a1; 为什么盯紧利润率&#xff1f; 投资者常常盯着营收增长&#xff0c;却忽略了一个更关键的指标——利润率。 收入可以靠规模“堆”出来&#xff0c;但利润率却是企业护城河的真实体现。心理学研究表明&#xff1a;当一个产品或服务被消费者认定为“不可替代”&a…

小迪web自用笔记25

传统文件上传&#xff1a;上传至服务器本身硬盘。云存储&#xff1a;借助云存储oss对象存储&#xff08;只能被访问&#xff0c;不可解析&#xff09;Oss云存储Access key与Access ID&#xff1a;有了这两个东西之后就可以操作云存储&#xff0c;可以向里面发数据了。这玩意儿泄…

分发饼干——很好的解释模板

好的&#xff0c;孩子&#xff0c;我们来玩一个“喂饼干”的游戏。 0. 问题的本质是什么&#xff1f; 想象一下&#xff0c;你就是个超棒的家长&#xff0c;手里有几块大小不一的饼干&#xff0c;而面前有几个饿着肚子的小朋友。每个小朋友都有一个最小的“胃口”值&#xff0c…

场景题:如果一个大型项目,某一个时间所有的CPU的已经被占用了,导致服务不可用,我们开发人员应该如何使服务器尽快恢复正常

问&#xff1a;如果一个大型项目,某一个时间所有的CPU的 已经被占用了&#xff0c;导致服务不可用&#xff0c;我们开发人员 应该如何使服务器尽快恢复正常答&#xff1a;应对CPU 100%导致服务不可用的紧急恢复流程面试官&#xff0c;如果遇到这种情况&#xff0c;我会立即按照…

Docker 安装 RAGFlow保姆教程

前提条件 Ubuntu 服务器(20.04 或 22.04 LTS 推荐) 已安装 Docker 和 Docker Compose 如果尚未安装,请先运行以下命令:# 安装 Docker curl -fsSL https://get.docker.com -o get-docker.sh sudo sh get-docker.sh # 将当前用户加入 docker 组,避免每次都要 sudo sudo user…

为什么实际工程里 C++ 部署深度学习模型更常见?为什么大家更爱用 TensorRT?

很多人刚接触深度学习模型部署的时候&#xff0c;都会习惯用 Python&#xff0c;因为训练的时候就是 PyTorch、TensorFlow 啊&#xff0c;写起来方便。但一到 实际工程&#xff0c;特别是工业设备、医疗影像、上位机系统这种场景&#xff0c;你会发现大多数人都转向了 C 部署。…

深入理解 Java 集合框架:底层原理与实战应用

在日常开发中&#xff0c;集合是 Java 中使用频率最高的工具之一。从最常见的 ArrayList、HashMap 到更复杂的并发集合&#xff0c;几乎每一个 Java 程序员都离不开集合框架。集合框架不仅提供了丰富的数据结构实现&#xff0c;还封装了底层复杂的逻辑&#xff0c;让开发者能够…

爬取m3u8视频完整教程

爬取步骤&#xff1a;1.先找到网页源代码2.从网页源代码中拿到m3u83.下载m3u84.读取m3u8文件&#xff0c;下载视频5.合并视频首先我们来爬取一个星辰影院的电影&#xff1a;下面我以这个为例&#xff1a;我们需要在源代码中找到m3u8这个url&#xff1a;紧接着我们利用下面的方法…

Python爬虫实战: 基于Scrapy的Amazon跨境电商选品数据爬虫方案

概述与设计思路 利用Python的Scrapy框架进行大规模页面抓取和结构化数据提取,配合aiohttp实现高并发请求,从而高效获取Amazon平台上的商品列表、详情、评论等公开信息。通过对这些数据进行清洗与分析,可以识别出有潜力的商品,评估市场竞争程度,并跟踪竞争对手的动态,为跨…

稳定版IM即时通讯 仿默往APP即时通讯im源码聊天社交源码支持二开原生开发独立部署 含搭建教程

内容目录一、详细介绍二、效果展示1.部分代码2.效果图展示三、学习资料下载一、详细介绍 技术开发语言&#xff1a; 后台管理端&#xff1a;Java GO Mysql数据库 安卓端&#xff1a;Java iOS端&#xff1a;ob PC端&#xff1a;c 功能简单介绍&#xff1a; 单聊&#xff…

封装一个redis获取并解析数据的工具类

redis获取并解析数据工具类实现代码使用示例实现代码 import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import lom…

23种设计模式——策略模式 (Strategy Pattern)​详解

✅作者简介&#xff1a;大家好&#xff0c;我是 Meteors., 向往着更加简洁高效的代码写法与编程方式&#xff0c;持续分享Java技术内容。 &#x1f34e;个人主页&#xff1a;Meteors.的博客 &#x1f49e;当前专栏&#xff1a;设计模式 ✨特色专栏&#xff1a;知识分享 &#x…

CI(持续集成)、CD(持续交付/部署)、CT(持续测试)、CICD、CICT

目录 **CI、CD、CT 详解与关系** **1. CI(Continuous Integration,持续集成)** **2. CD(Continuous Delivery/Deployment,持续交付/部署)** **持续交付(Continuous Delivery)** **持续部署(Continuous Deployment)** **3. CT(Continuous Testing,持续测试)** **4.…

【音视频】WebRTC ICE 模块深度剖析

原文链接&#xff1a; https://mp.weixin.qq.com/s?__bizMzIzMjY3MjYyOA&mid2247498075&idx2&sn6021a2f60b1e7c71ce4d7af6df0b9b89&chksme893e540dfe46c56323322e780d41aec1f851925cfce8b76b3f4d5cfddaa9c7cbb03a7ae4c25&scene178&cur_album_id314699…

linux0.12 head.s代码解析

重新设置IDT和GDT&#xff0c;为256个中断门设置默认的中断处理函数检查A20地址线是否启用设置数学协处理器将main函数相关的参数压栈设置分页机制&#xff0c;将页表映射到0~16MB的物理内存上返回main函数执行 源码详细注释如下: /** linux/boot/head.s** (C) 1991 Linus T…

Maven动态控制版本号秘籍:高效发包部署,版本管理不再头疼!

作者&#xff1a;唐叔在学习 专栏&#xff1a;唐叔的Java实践 关键词&#xff1a;Maven版本控制、versions插件、动态版本号、持续集成、自动化部署、Java项目管理 摘要&#xff1a;本文介绍如何使用Maven Versions插件动态控制项目版本号和依赖组件版本号&#xff0c;实现无需…

简述:普瑞时空数据建库软件(国土变更建库)之一(变更预检查部分规则)

简述&#xff1a;普瑞时空数据建库软件&#xff08;国土变更建库&#xff09;之一(变更预检查部分规则) 主要包括三种类型&#xff1a;常规检查、行政区范围检查、20X异常灭失检查 本blog地址&#xff1a;https://blog.csdn.net/hsg77