1. 什么是线程同步
为什么会有线程同步,那一定是有了新问题。互斥可以解决临界资源被同时访问的问题,但是纯互斥也会带来新的问题。由于当前被执行的线程离cpu最近【其他线程被阻塞挂起还要被唤醒】,所以,当前进程对于竞争锁天然就有极大的优势。这势必会导致当前线程重复申请和释放锁,其他线程很难拿到锁,也就会造成线程饥饿的问题。纯粹的互斥,不高效,也不公平!
因此,我们提出新的要求,当前线程一旦释放锁就不能立即申请锁,外面的线程也不能乱作一团的竞争锁,必须排队申请锁。而刚释放锁的线程要想再次申请锁,就必须排到队列的末尾!!
我们把多个执行流在临界区安全的前提下,按照顺序依次访问临界资源叫做线程同步!
2. 条件变量
> 理解条件变量
• 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
• 例如⼀个线程访问队列时,发现队列为空,它只能等待,直到其它线程将⼀个节点添加到队列 中。这种情况就需要用到条件变量。
为什么需要用到条件变量呢??因为线程a需要访问队列,线程b添加节点。这两个线程互相竞争锁,但是,只有队列中添加节点后【即满足一定的条件】,线程a拿到锁才能做有效动作。因此,我们引入了条件变量。
当每个线程a访问队列时发现并不满足条件,那么就挂起到队列中等待。当线程b将节点添加之后,满足了条件变量,此时再唤醒等待队列中的一个或全部线程让他们来访问临界资源。这样做就高效多了!
> 条件变量接口
定义和初始化条件变量和互斥锁的使用和互斥锁一样。
这一批接口就是线程在指定条件变量下等待,很好理解。
唤醒在指定条件变量下等待的一个或所有线程。
> 使用条件变量接口demon
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <string>const int num = 5; // 创建5个线程
int cnt = 0;// 定义初始化锁和条件变量
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;void *routine(void *args)
{std::string name = static_cast<char *>(args);while (true){// 加锁pthread_mutex_lock(&glock);// 使用条件变量等待pthread_cond_wait(&gcond, &glock);std::cout << name << "计算" << cnt << std::endl;cnt++;// 解锁pthread_mutex_unlock(&glock);}return nullptr;
}int main()
{std::vector<pthread_t> tids;// 创建5个线程for (int i = 0; i < num; i++){char *name = new char[64];snprintf(name, 64, "线程%d", i);pthread_t tid;pthread_create(&tid, nullptr, routine, name);tids.push_back(tid);// 每个1秒创建一个线程sleep(1);}// 主线程每隔一秒主动唤醒线程while (true){std::cout << "唤醒线程" << std::endl;// pthread_cond_signal(&gcond);//每隔一秒主动唤醒一个线程pthread_cond_broadcast(&gcond); // 每隔一秒主动唤醒所有线程sleep(1);}for (auto &e : tids){pthread_join(e, nullptr);}return 0;
}
每次唤醒一个线程: 线程依次被唤醒在队列中同步串行运行。
每次唤醒所有线程:所有线程竞争锁。
3. 生产者消费者模型
在现实生活中,我们不是直接去工厂购物,而是在超市中购物,因为我们直接去工厂购物成本高,效率低。
在生产者消费者模型中,一共有3种关系,2种角色,1个交易场所。【321原则】
三种关系:消费者和消费者之间【互斥竞争关系】,生产者和生产者之间【互斥竞争关系】,生产者和消费者之间【互斥和同步关系】。
两种角色:生产者和消费者角色【由线程承担】。
一个交易场所:以特定结构构成的一种”内存“空间。
为什么要有生产者消费者模型呢??
1. 生产过程和消费过程解耦。【由于互斥和同步机制,生产和消费之间是不互相干扰的】
2. 支持忙闲不均。【即便消费者线程不工作,生产者线程也可以不断执行。反之也是如此】
3. 提高效率。【获取任务和处理任务是并发的!!】
4. 基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是⼀种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
block_queue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>const int defaultcap = 5;template <typename T>
class block_queue
{bool is_empty(){return _q.size() <= 0;}bool is_full(){return _q.size() >= _cap;}public:block_queue(int num = defaultcap): _cap(num), _consum_sleep_num(0), _product_sleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}// 消费T &consum(){pthread_mutex_lock(&_mutex);while (is_empty()){// 空了就在阻塞队列等待被唤醒_consum_sleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_consum_sleep_num--; // 被唤醒}T &out = _q.front();_q.pop();// 队列一定不满,唤醒生产者if (_product_sleep_num != 0){std::cout << "唤醒生产者……" << std::endl;pthread_cond_signal(&_full_cond);}// 解锁pthread_mutex_unlock(&_mutex);return out;}// 生产void product(const T &in){pthread_mutex_lock(&_mutex);while (is_full()){// 满了就在阻塞队列等待被唤醒_product_sleep_num++;pthread_cond_wait(&_full_cond, &_mutex);_product_sleep_num--; // 被唤醒}_q.push(in);// 队列一定不空,唤醒消费者if (_consum_sleep_num != 0){std::cout << "唤醒消费者……" << std::endl;pthread_cond_signal(&_empty_cond);}// 解锁pthread_mutex_unlock(&_mutex);}~block_queue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q;int _cap; // 队列容量上限pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _consum_sleep_num; // 消费者休眠个数int _product_sleep_num; // 生产者休眠个数
};
消费者这里为什么要写成循环呢?? 假设生产者只生产了一个商品,但是生产者却唤醒了一批消费者!!那么消费者就全部来竞争锁,一个消费者消费完一个商品后释放锁。其他消费者并不在阻塞队列中等待,而是在锁上等待。如果消费者立即拿到锁,生产者还没有来得及生产,那么消费者就没有商品可以消费了,但是此时消费者已近持有锁区消费空队列了,也就是_q.pop()。这样在代码层面就出问题了!!我们把这种情况称为伪唤醒!所以我们要在判断上加上循环,即便被唤醒了,也要再次检查队列中商品是否为空!
main.cc
#include "block_queue.hpp"
#include <unistd.h>// 生产者
void *product(void *args)
{int cnt = 1;block_queue<int> *bq = static_cast<block_queue<int> *>(args);while (true){sleep(1);bq->product(cnt);std::cout << "生产者生产了:" << cnt << std::endl;cnt++;}
}// 消费者
void *consum(void *args)
{block_queue<int> *bq = static_cast<block_queue<int> *>(args);while (true){//sleep(1);int t = bq->consum();std::cout << "生产者消费了:" << t << std::endl;}
}int main()
{// 申请阻塞队列block_queue<int> *bq = new block_queue<int>();// 构建生产者和消费者pthread_t p, c;pthread_create(&p, nullptr, product, bq);pthread_create(&p, nullptr, consum, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}
5. 封装条件变量
mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace mutex_module
{class mutex{public:mutex(){int n = pthread_mutex_init(&_lock, nullptr);(void)n;}void lock(){pthread_mutex_lock(&_lock);}void unlock(){pthread_mutex_unlock(&_lock);}~mutex(){int n = pthread_mutex_destroy(&_lock);}pthread_mutex_t* get(){return &_lock;}private:pthread_mutex_t _lock;};// 采⽤RAII⻛格,进⾏锁管理class lock_guard{public:lock_guard(mutex &m) : _m(m){_m.lock();}~lock_guard(){_m.unlock();}private:mutex &_m;};
}
cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"using namespace mutex_module;namespace cond_module
{class cond{public:cond(){pthread_cond_init(&_cond, nullptr);}void wait(mutex &lock){pthread_cond_wait(&_cond, lock.get());}void signal(){pthread_cond_signal(&_cond);}void broadcast(){pthread_cond_broadcast(&_cond);}~cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
}