1.初识线程池
问:线程池是什么?
答:维持管理一定数量的线程的池式结构。(维持:线程复用 。 管理:没有收到任务的线程处于阻塞休眠状态不参与cpu调度 。一定数量:数量太多的线程会给操作系统带来线程频繁切换的开销)
问:线程池解决的问题是什么?
答:异步执行耗时任务,充分利用多核,不过度占用核心业务线程。(耗时:耗时等待(io),耗时计算,不耗时的任务不会启用线程池解放核心线程的压力)
问:线程池是如何解决问题的?(工作原理)
答:生产者(核心线程)来使用线程,抛出任务到任务队列,消费者(线程池线程)从队列中取出任务进行执行。线程为空要阻塞线程池线程,避免轮询队列增加操作系统负担。
问:为什么线程池总是使用队列?
答:生产者对应push,消费者对应pop,操作队列时间为O(1),占用锁的时间短,可以使锁的使用更加灵活。
2.用C++代码实现:
1.线程池的类:
对外提供三个接口,内部两个核心数据结构与一个核心工作函数
#pragma once#include <thread>
#include <functional>
#include <vector>// blockingqueue 仅仅只能用作指针或引用 不直接引入头文件,避免循环依赖
template <typename T>
class BlockingQueue;class ThreadPool {
public:// 初始化线程池 explicit作用:避免ThreadPool tp = 3的赋值情况explicit ThreadPool(int threads_num);// 停止线程池~ThreadPool();// 发布任务到线程池void Post(std::function<void()> task); //function<void()>表示一个没有参数无返回值的可调用对象private:void Worker();std::unique_ptr<BlockingQueue<std::function<void()>>> task_queue_; //unique_ptr作用:线程池独占该任务队列,若调用者轻易浅拷贝,提供报错;在Pool离开作用域的时候,自动调用析构函数删除队列std::vector<std::thread> workers_;
};
2.对外三个接口以及核心工作函数的实现
#include "blockingqueue.h"
#include <memory>
#include "threadpool.h"ThreadPool::ThreadPool(int threads_num) {task_queue_ = std::make_unique<BlockingQueue<std::function<void()>>>();for (size_t i = 0; i < threads_num; ++i) {workers_.emplace_back([this] {Worker(); }); //this捕获当前 调用ThreadPool的pool对象 ;worker为线程执行函数}
}// 停止线程池
ThreadPool::~ThreadPool() {task_queue_->Cancel();for (auto& worker : workers_) {if (worker.joinable())worker.join();}
}void ThreadPool::Post(std::function<void()> task) {task_queue_->Push(task);
}void ThreadPool::Worker() {while (true) {std::function<void()> task;if (!task_queue_->Pop(task)) {break;}task();}
}
3.核心数据结构:自定义队列类的实现
class BlockingQueuePro {
public:BlockingQueuePro(bool nonblock = false) : nonblock_(nonblock) {}//提供给生产者(主)线程使用void Push(const T& value) {std::lock_guard<std::mutex> lock(prod_mutex_);prod_queue_.push(value);not_empty_.notify_one();}//提供给消费者线程使用bool Pop(T& value) {std::unique_lock<std::mutex> lock(cons_mutex_);if (cons_queue_.empty() && SwapQueue_() == 0) { return false;//队列已关闭}value = cons_queue_.front();cons_queue_.pop();return true;}//提供给生产者(主)线程使用 销毁线程池时调用void Cancel() {std::lock_guard<std::mutex> lock(prod_mutex_);nonblock_ = true;not_empty_.notify_all();}private:int SwapQueue_() {std::unique_lock<std::mutex> lock(prod_mutex_);//所有消费者进程阻塞在下面一行等待被唤醒 (正常唤醒 or 异常唤醒(线程池销毁))not_empty_.wait(lock, [this] {return !prod_queue_.empty() || nonblock_; });std::swap(prod_queue_, cons_queue_);return cons_queue_.size();}bool nonblock_;//设置阻塞队列的状态,构造置0表示默认有阻塞,cancel中置为1表示无阻塞std::queue<T> prod_queue_;//STL里的队列std::queue<T> cons_queue_;std::mutex prod_mutex_;std::mutex cons_mutex_;std::condition_variable not_empty_;//条件变量cond 用于线程间通信: 消费者wait 生产者唤醒
};
3.工作流程
初始化线程池,创建线程。线程在执行task()之前,阻塞在任务队列的Pop,等待主业务线程将task 通过post加入到任务队列后将某一线程唤醒(或者要销毁任务队列了唤醒所有线程)。所有任务执行完毕后,销毁线程池(可选)。