1.基础BlockingQueue的生产者消费模型
1.1 BlockQueue
在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构,它与普通的队列区别在于,当队列为空时,从队列获取元素的操作将被阻塞,直到队列中放入了新的数据。当队列满的时候,往里面插入数据也会被阻塞直到有元素被取出
1.2 c++ queue模拟实现阻塞队列的生产消费模型
为了理解我们来看一下代码
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _block_queue.size() == _cap;
}
bool IsEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap) : _cap(cap)
{
_productor_wait_num = 0;
_consumer_wait_num = 0;
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_product_cond, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
}
void Enqueue(T &in) // ⽣产者⽤的接⼝
{
pthread_mutex_lock(&_mutex);
while(IsFull()) // 保证代码的健壮性
{
// ⽣产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调⽤是: a. 让调⽤线程等待 b. ⾃动释放曾经持有的
_mutex锁 c. 当条件满⾜,线程唤醒,pthread_cond_wait要求线性
// 必须重新竞争_mutex锁,竞争成功,⽅可返回!!!
// 之前:安全
_productor_wait_num++;
pthread_cond_wait(&_product_cond, &_mutex); // 只要等待,必定会有唤
醒,唤醒的时候,就要继续从这个位置向下运⾏!!
_productor_wait_num--;
// 之后:安全
}
// 进⾏⽣产
// _block_queue.push(std::move(in));
// std::cout << in << std::endl;
_block_queue.push(in);
// 通知消费者来消费
if(_consumer_wait_num > 0)
pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) // 消费者⽤的接⼝ --- 5个消费者
{
pthread_mutex_lock(&_mutex);
while(IsEmpty()) // 保证代码的健壮性
{
// 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调⽤是: a. 让调⽤进程等待 b. ⾃动释放曾经持有的
_mutex锁
_consumer_wait_num++;
pthread_cond_wait(&_consum_cond, &_mutex); // 伪唤醒
_consumer_wait_num--;
}// 进⾏消费
*out = _block_queue.front();
_block_queue.pop();
// 通知⽣产者来⽣产
if(_productor_wait_num > 0)
pthread_cond_signal(&_product_cond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_product_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_product_cond);
pthread_cond_destroy(&_consum_cond);
}
private:
std::queue<T> _block_queue; // 阻塞队列,是被整体使⽤的!!!
int _cap; // 总上限
pthread_mutex_t _mutex; // 保护_block_queue的锁
pthread_cond_t _product_cond; // 专⻔给⽣产者提供的条件变量
pthread_cond_t _consum_cond; // 专⻔给消费者提供的条件变量
int _productor_wait_num;
int _consumer_wait_num;
};📌 注意:这⾥采⽤模版,是想告诉我们,队列中不仅仅可以防⽌内置类型,⽐如int, 对象也可
以作为任务来参与⽣产消费的过程哦.
下⾯附上⼀张代码,⽅便课堂使⽤
#pragma once
#include <iostream>
#include <string>
#include <functional>
// 任务类型1
// class Task
// {
// public:
// Task() {}
// Task(int a, int b) : _a(a), _b(b), _result(0)
// {
// }
// void Excute()
// {
// _result = _a + _b;
// }
// std::string ResultToString()
// {
// return std::to_string(_a) + "+" + std::to_string(_b) + "=" +
std::to_string(_result);
// }
// std::string DebugToString()
// {
// return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
// }
// private:
// int _a;
// int _b;
// int _result;
// };
// 任务类型2
using Task = std::function<void()>;
2-3 为什么ptread_cond_wait需要互斥量
条件等待是线程同步的一种手段,如果只有一个线程,条件不满足,一直等待下去都不会被满足,所以必须要有一个线程通过某些手段,去改变共享变量,使原先的条件不满足变成满足,然后再去友好的通知等待在该条件变量上的线程
条件不会无缘无故的突然变得满足了,必然牵扯到共享数据的变化,所以一定要用互斥锁来保护
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
由于解锁和等待不是原子性,调用解锁之后pthread_cond_wait之前,如果有其他的线程获得互斥量摒弃条件满⾜,发送了信号,那么 pthread_cond_wait 将错过这个信 号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是⼀个原⼦操作。
2-4 条件变量的使用规范
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
给条件发现信号
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
2-5条件变量的封装
大家有兴趣的可以看一下
test_4_7_线程池/Cond.h · liu xi peng/linux---ubuntu系统 - 码云 - 开源中国
2-6 POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源,但是OIXIS可以用于线程间同步
初始化
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值
销毁
int sem_destroy(sem_t *sem);
等待