1 进程间通信的必要性
首先要明确进程间是相互独立的(独享一份虚拟地址空间,页表,资源),那怎么样才能使得两个进程间实现资源的发送?所以,两个进程一定需要看到同一份资源,并且⼀个进程需要向另⼀个或⼀组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程间通信的目的:
1.数据传输
2.资源共享
3.通知事件(也就是⼀个进程向另⼀个或⼀组进程发送消息)
4.进程控制(一些进程希望完全控制另⼀个进程的执行)
1.1 进程间通信分类
管道:(基于文件的通信方法)
1.匿名管道pipe
2.命名管道
System V IPC:(单独设计的通信模块)
System V 消息队列
System V 共享内存
System V 信号量
POSIX IPC:(网络间进程通信)
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
2 管道
管道是从⼀个进程连接到另⼀个进程的⼀个数据流。
管道的本质是一个基于文件系统的一个内存级的单向通信的文件,主要用于进程间通信(IPC)。
所以管道其实也是文件,在前面讲的文件系统中,那个管道文件是不是也要创建,要打开,要有对应的路径解析,要有对应的inode,那么文件使用的接口(write,read等管道也是可以直接使用的),但是这个文件不需要和磁盘进行IO,只需要存在内核中,所以就没有路径,没有文件名,是内核中模拟的一个文件(所以叫匿名管道),创建一个缓冲区,利用缓冲区实现两个进程看到同一份资源。
3 匿名管道
系统调用:
int pipe(int fd[2]);
这里fd是一个输出型参数,fd所对应的也就是之前讲的文件描述符,fd[0]表示读端, fd[1]表示写端,
成功返回0,失败返回错误代码。
3.1 用fork来共享管道原理
父进程创建管道后,创建子进程,子进程会拷贝父进程的资源,但是文件本身并未拷贝,而是访问该文件的文件描述符,所对应的地址是同一个文件,这时父进程关闭读端,子进程关闭写端,(这里关闭读写端的操作可以不做也能使用,但是还是最好做,以防误操作,毕竟是要实现单向通信)这样父进程就可以向管道中写入,子进程就可以从管道中读取文件。
大致逻辑:
给出一个代码例子:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>int main(int argc, char *argv[])
{int pipefd[2];if (pipe(pipefd) == -1){perror("pipe");}pid_t pid;pid = fork();if (pid == -1){perror("fork");} if (pid == 0) {close(pipefd[0]);write(pipefd[1], "hello", 5);close(pipefd[1]);exit(0);}close(pipefd[1]);char buf[10] = {0};read(pipefd[0], buf, 10);printf("buf=%s\n", buf);return 0;
}
这里就是通过子进程向管道写入“hello”,父进程读取并打印。
3.2 管道读写规则
当没有数据可读时:
O_NONBLOCK disable:read调⽤阻塞,即进程暂停执行,⼀直等到有数据来到为止。
O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
当管道满的时候 :
O_NONBLOCK disable:write调⽤阻塞,直到有进程读⾛数据
O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
如果所有管道写端对应的文件描述符被关闭,则read返回0
如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致 write进程退出。
3.3 管道读写的几种情况
情况一:管道里没有数据,读端正常
这时读端就会阻塞,等待写端写入。
情况二:读端不读取,写端一直写
缓冲区写满了,就不会再写入了。
情况三:写端关闭(不写),读端正常
read之后,就会返回0,表示读到文件结尾。
情况四:读端关闭,写端正常
OS直接杀掉该进程。(读端关闭了,写入就没有意义了,OS不会做浪费时间和空间的事,所以直接杀掉该进程)
3.4 管道的特点
只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,⼀个管道由⼀个进 程创建,然后该进程调用fork,此后父,子进程之间就可应用该管道。
管道提供流式服务
进程退出,管道释放,所以管道的生命周期随进程
内核会对管道操作进行同步与互斥
管道是半双工的,数据只能向⼀个方向流动;需要双方通信时,需要建立起两个管道
3.5 创建进程池处理任务
进程池是用父进程创建一批子进程,通过父进程向管道里面写入信息通知对应的进程完成对应的任务。
创建函数:
using callback_t =std::function<void(int fd)>;
bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0){perror("pipe");return false;}pid_t id = fork();if (id < 0){perror("fork");return false;}else if (id == 0){close(pipefd[1]);cb(pipefd[0]);exit(0);}close(pipefd[0]);std::string name = "channel-" + std::to_string(i);_channels.emplace_back(pipefd[1], name, id);}return true;}
通过一个数组将这一批子进程(管道写端)进行(channel)类的管理,这里的cb是父进程通知子进程后的处理对应任务的函数。
channel类:
class channel
{
public:channel(){}channel(int wfd, std::string name, pid_t id): _wfd(wfd), _name(name), _sub_target(id){}int fd() { return _wfd; }std::string name() { return _name; }pid_t target() { return _sub_target; }void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_sub_target, nullptr, 0);(void)rid;}~channel(){}
private:int _wfd;std::string _name;pid_t _sub_target;
};
成员包括写端的文件描述符,管道名称,对应pid。以及关闭写端的函数,回收子进程的函数。
任务处理函数:
这里通过一批打印函数来模拟任务的调用
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>
// 4种任务
// task_t[4];using task_t = std::function<void()>;void Download()
{std::cout << "我是一个downlowd任务" << std::endl;
}void MySql()
{std::cout << "我是一个 MySQL 任务" << std::endl;
}void Sync()
{std::cout << "我是一个数据刷新同步的任务" << std::endl;
}void Log()
{std::cout << "我是一个日志保存任务" << std::endl;
}std::vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(Download);tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};Init ginit;
while(true){int code = 0;//std::cout << "子进程阻塞: " << getpid() << std::endl;ssize_t n = read(fd, &code, sizeof(code));if(n == sizeof(code)) // 任务码{std::cout << "子进程被唤醒: " << getpid() << std::endl;if(code >= 0 && code < tasks.size()){tasks[code]();}else{std::cerr << "父进程给我的任务码是不对的: " << code << std::endl;}}else if(n == 0){std::cout << "子进程应该退出了: " << getpid() << std::endl;break;}else{std::cerr << "read fd: " << fd << ", error" << std::endl;break;}}
读取成功进行对应任务处理,为0说明写端关闭了,子进程可以退出了。
控制发送任务的函数:
void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}void CtrlSubProcessHelper(int &index){// 1. 选择一个通道(进程)int who = index;index+=rand();index %= _channels.size();// 2. 选择一个任务,随机int x = rand() % tasks.size(); // [0, 3]// 3. 任务推送给子进程std::cout << "选择信道: " << _channels[who].name() << ", subtarget : " << _channels[who].target() << std::endl;write(_channels[who].fd(), &x, sizeof(x));sleep(1);}
count表示需要处理的任务个数。
回收函数和回收子进程:
void WaitSubProcesses(){for (auto &c : _channels){c.Close();c.Wait();}}
注意这里关闭管道是并没有关闭完的,是有问题的?
注意创建子进程时,会拷贝父进程的资源,创建第一个子进程时,父进程关闭读端,第一个子进程会关闭写端,再创建一个新的子进程,会继承父进程的写端,子进程关闭对应的写端(但是关闭的是第二次创建管道的写端,而第一次的写端被这个新的子进程继承下来了),所以往后再创建子进程,也会发送同样的问题,所以需要再每次子进程关闭写端时,需要将之前继承下来的写端也关闭掉。
加入这段代码:
for(auto&e:_channels)
{std::cout << e.fd() << " ";e.Close();
}
这里也是对进程池进行了封装:
processpool.hpp
#pragma once
#include"task.hpp"
#include <iostream>
#include<functional>
#include <cstdio>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <sys/wait.h>
#include <vector>const int gprocess_num = 5;
using callback_t =std::function<void(int fd)>;class channel
{
public:channel(){}channel(int wfd, std::string name, pid_t id): _wfd(wfd), _name(name), _sub_target(id){}int fd() { return _wfd; }std::string name() { return _name; }pid_t target() { return _sub_target; }void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_sub_target, nullptr, 0);(void)rid;}~channel(){}
private:int _wfd;std::string _name;pid_t _sub_target;
};class Processpool
{
private:void CtrlSubProcessHelper(int &index){// 1. 选择一个通道(进程)int who = index;index++;index %= _channels.size();// 2. 选择一个任务,随机int x = rand() % tasks.size(); // [0, 3]// 3. 任务推送给子进程std::cout << "选择信道: " << _channels[who].name() << ", subtarget : " << _channels[who].target() << std::endl;write(_channels[who].fd(), &x, sizeof(x));sleep(1);}
public:Processpool(int num=gprocess_num) : _processnum(num){srand(time(0));}bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0){perror("pipe");return false;}pid_t id = fork();if (id < 0){perror("fork");return false;}else if (id == 0){std::cout << "进程:" << getpid() << ", 关闭了: ";for(auto&e:_channels)//处理关闭之前的读端{std::cout << e.fd() << " ";e.Close();}std::cout<<std::endl;close(pipefd[1]);cb(pipefd[0]);exit(0);}sleep(1);close(pipefd[0]);std::string name = "channel-" + std::to_string(i);_channels.emplace_back(pipefd[1], name, id);}return true;}void PollingCtrlSubProcess(){int index = 0;while (true){CtrlSubProcessHelper(index);}}void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}void WaitSubProcesses(){for (auto &c : _channels){c.Close();c.Wait();}}
private:std::vector<channel> _channels;int _processnum;
};
main.cc
#include"processpool.hpp"
#include<vector>int main()
{Processpool pool(5);pool.InitProcessPool([](int fd){while(true){int code = 0;//std::cout << "子进程阻塞: " << getpid() << std::endl;ssize_t n = read(fd, &code, sizeof(code));if(n == sizeof(code)) // 任务码{std::cout << "子进程被唤醒: " << getpid() << std::endl;if(code >= 0 && code < tasks.size()){tasks[code]();}else{std::cerr << "父进程给我的任务码是不对的: " << code << std::endl;}}else if(n == 0){std::cout << "子进程应该退出了: " << getpid() << std::endl;break;}else{std::cerr << "read fd: " << fd << ", error" << std::endl;break;}}});sleep(5);// 3. 控制进程池pool.PollingCtrlSubProcess(10);// 4. 结束线程池pool.WaitSubProcesses();std::cout << "父进程控制子进程完成,父进程结束" << std::endl;}
task.hpp
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>
// 4种任务
// task_t[4];using task_t = std::function<void()>;void Download()
{std::cout << "我是一个downlowd任务" << std::endl;
}void MySql()
{std::cout << "我是一个 MySQL 任务" << std::endl;
}void Sync()
{std::cout << "我是一个数据刷新同步的任务" << std::endl;
}void Log()
{std::cout << "我是一个日志保存任务" << std::endl;
}std::vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(Download);tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};Init ginit;
运行结果: