事件循环线程池的理解
- 前置知识
- reactor模型
- thread::start()方法的理解
- 创建线程池
- 子线程被唤醒的几种情况
- 子线程被主线程唤醒
- 新连接到来
- 有消息需要发送时(多reactor情况时)
- 关闭连接时
- 子线程被唤醒执行任务
在 上一篇中,我们讨论了关于简单的线程池的实现,此次我们就基于简单的线程池实现,深入剖析muduo网络库的事件循环线程池的代码:
前置知识
reactor模型
thread::start()方法的理解
#include "Thread.h"
#include "CurrentThread.h"#include <semaphore.h>std::atomic_int Thread::numCreated_(0);Thread::Thread(ThreadFunc func, const std::string &name): started_(false), joined_(false), tid_(0), func_(std::move(func)), name_(name)
{setDefaultName();
}Thread::~Thread()
{if (started_ && !joined_){ // thread类提供了设置分离线程的方法 线程运行后后台自动销毁(非阻塞)thread_->detach(); }
}void Thread::start() // 一个Thread对象 记录的就是一个新线程的详细信息
{started_ = true;//定义一个信号量sem_t sem;//设置信号量sem,是用于线程间,初值为0sem_init(&sem, false, 0); // false指的是 不设置进程间共享// 开启线程thread_ = std::shared_ptr<std::thread>(new std::thread([&]() {tid_ = CurrentThread::tid(); // 获取线程的tid值sem_post(&sem);//sem值+1func_(); // 开启一个新线程 专门执行该线程函数}));// 这里必须等待获取上面新创建的线程的tid值sem_wait(&sem);
}void Thread::join()
{joined_ = true;thread_->join();//阻塞等待当前线程执行完毕
}void Thread::setDefaultName()
{int num = ++numCreated_;if (name_.empty()){char buf[32] = {0};snprintf(buf, sizeof buf, "Thread%d", num);name_ = buf;}
}
这个类是对std::thread进行了一个封装,主要关注Thread::start
方法,核心实现是定义了一个信号量sem,以确保子线程能够成功被创建,具体的创建流程如下图。
创建线程池
- TcpServer启动线程池
- 在线程池中根据传入的线程数量创建事件循环线程EventLoopThread,并且将创建的事件循环线程加到线程池threads中去,并且调用EventLoopThread->loop函数,随后绑定返回的已经创建好事件循环EventLoop
- 创建事件循环EventLoopThread的时候绑定函数threadFunc
- 调用thread_.start()函数,thread_创建了一个子线程,该子线程执行绑定的threadFunc函数。在threadFunc函数中,创建事件循环EventLoop,创建成功了通知主线程,并在开始事件循环EventLoop.loop();与此同时,主线程在循环等待子线程成功创建EventLoop,并接收创建好的EventLoop对象,并返回。
上述过程可以总结为:主线程随着函数调用开辟子线程,子线程成功创建事件循环,并在在子线程中事件事件循环,实现了one loop per thread,主线程返回到创建事件循环线程池中,继续执行其他任务。如下图所示。
关于主线程的生命周期的理解:
EventLoop *EventLoopThread::startLoop()
{thread_.start(); // 启用底层线程Thread类对象thread_中通过start()创建的线程EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(mutex_);cond_.wait(lock, [this](){return loop_ != nullptr;});loop = loop_;}return loop;
}
这里是存在开辟了一个新的线程,可以称为子线程,子线程中是执行eventloop.loop()事件循环的,在程序的运行过程中不会被回收;但是这个主线程的,随着return loop
;的返回,主线程是不是会被释放呢?其实不是的,这里的主线程可以看成是负责整个项目的实现的核心线程,当主线程执行startLoop()函数后创建了子线程并返回,主线程就继续执行其他的函数,并不是说返回了就被销毁了。
比如说主线程执行完startLoop()的返回顺序是:EventLoopThread::startLoop()—>>> EventLoopThreadPool::start—>>>然后执行TcpServer::start()中的loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get())),跟调用顺序是相反的。见上图
感悟
:关于这一点理解起来还是花了挺久的,这个项目前前后后也是花了挺多时间看了,但是总是存在这里那里的小问题,不影响理解整体项目,但是这些小点是真的才能感受到项目的高明;也是花时间整理这些小知识点,才可以有更深的感悟。
子线程被唤醒的几种情况
首先,我们需要强调一下,这里存在mainloop(一个)和subloop(多个),mainloop所在的线程为主线程,subloop所在的线程为子线程(可以理解为,subloop1对应为子线程1,subloop2对应为子线程2等)
子线程被主线程唤醒
新连接到来
当一切都初始化好后,MainLoop的Acceptor开始监听,当有新的连接到来时,触发新连接到来的回调函数,在回调函数中处理新连接的相关操作。
- 调用getNextLoop()函数
- 执行轮询算法返回一个事件循环EventLoop
- 对得到的事件循环loop绑定TCP连接,并且设置回调函数(通过TcpConnection设置,但最终是设置到EventLoop中的Channel回调函数中),见下图(图片来源万字长文梳理Muduo库核心代码及优秀编程细节思想剖析,更多可以参考这篇文章)
- 调用runInLoop,在对应的EventLoop中执行相应的回调函数。但是这里存在一个问题,即我是通过MainLoop上所在的线程选择了一个子Loop,假如说为subLoop1,并且需要执行在subLoop1上绑定的函数,而当前的线程是MainLoop对应的线程。如果之间运行,这就不满足每一个线程运行一个EventLoop的条件了(one loop per thread)。所以在这里,就需要判断一下,是否是在当前loop中对应的线程执行的任务。如果不是,需要通过wakeup函数唤醒当前loop对应的线程。
- 然后loop对应的线程就被唤醒起来工作(相当于消费者来消费任务了),执行绑定的connectEstablished()函数,在connectEstablished()函数实现tie()函数绑定,并且设置监听读事件。
建立Tcp连接后,以后发生在这个连接上的所有事件都交由这个SubLoop1来负责了。
有消息需要发送时(多reactor情况时)
在这里插入代码片
关闭连接时
当连接断开或者关闭连接,执行TcpServer::removeConnection()
回调函数。
- 上层调用
TcpServer::removeConnection
函数,调用runInLoop
函数,并绑定执行TcpServer::removeConnectionInLoop
- 因为
removeConnectionInLoop
是执行在mainloop对应的线程(执行TcpServer类下的函数都在mainloop中对应的线程,也可以说在主线程中运行),所以这里之间是在当前的mainloop中执行回调,执行removeConnectionInLoop
函数 - 首先移除TCP连接,并且得到将要移除的连接对应的ioloop;因为当前是处于mainloop对应的主线程,所以ioloop执行调用
queueInLoop
- 在
queueInLoop
函数中,核心是异步唤醒自己对应的线程,并执行等待执行的任务(在这里是connectDestroyed) - 在
connectDestroyed
中,将待移除的TCP连接中的channel中的所有感兴趣的事件从poller中移除掉,并将channel从poller中移除。
子线程被唤醒执行任务
我们知道,在线程池初始化过程中,每一个线程就对应一个事件循环(eventloop)已开始循环,并且当新连接到来时,mainloop按照轮询方法将新连接分发给一个事件循环,以后这个事件循环就负责这个新连接的所有操作了(包括接收消息、发送消息等)。
void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_INFO("EventLoop %p start looping\n", this);while (!quit_){activeChannels_.clear();pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){// Poller监听哪些channel发生了事件 然后上报给EventLoop 通知channel处理相应的事件channel->handleEvent(pollRetureTime_);}doPendingFunctors();//执行等待处理的函数,一般是主线程添加的。}LOG_INFO("EventLoop %p stop looping.\n", this);looping_ = false;
}
可以看到,初始化的事件循环eventloop在自己的线程内循环等待任务的来临。
pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
这句话是监听对于每一个连接注册的事件是否发生了(如果发生了,注册到activeChannels_并依序执行);否则超时返回。