消息队列允许一个进程将一个消息发送到一个队列中,另一个进程从该队列中接收这个消息。
使用流程:
写端:
- 使用结构体 mq_attr 设置消息队列属性,有四个选项:
long mq_flags; // 队列属性: 0 表示阻塞
long mq_maxmsg; // 最大消息数
long mq_msgsize;// 每条消息最大字节数
long mq_curmsgs; // 当前消息数(只读)通常只用设置中间两个,其他设为0。
- 使用mq_open()创建或打开消息队列,其返回值为消息队列描述符(mqd_t)。
- 向消息队列中发送消息
- 设置发送超时等待时间,也就是当消息队列已满的情况下继续发送消息,超过等待时间后就丢弃这个消息。
- 定义时间规格结构体 timespec ,使用clock_gettime(CLOCK_REALTIME)将timespec设置为当前时间。
- 自己决定等待时间为多少。
- 使用mq_timedsend()发送消息。
- 清理:使用mq_close() 关闭消息队列描述符
读端:
- 使用mq_open()打开消息队列。
- 从消息队列中读消息。
- 设置接收超时等待时间,具体的超时处理自己决定,可以退出,也可以继续等待。设置方式同上。
- 使用mq_timedreceive()接收消息。
- 清理:使用mq_close()关闭消息队列;使用mq_unlink()删除消息队列。
代码示例:
mq_write.cpp
#include <iostream>
#include <mqueue.h>
#include <cstdio>
#include <cstdlib>
#include <fcntl.h>
#include <time.h>
#include <string>
#include <errno.h>const char *mq_name = "/mq_name";int main(int argc, char const *argv[])
{// 1.设置消息队列属性struct mq_attr attr = {0, 3, 128, 0};// 2.创建或打开消息队列mqd_t mqd = mq_open(mq_name, O_WRONLY | O_CREAT, 0664, &attr);if (mqd == (mqd_t)-1){perror("mq_open");exit(EXIT_FAILURE);}// 3.往消息队列中写数据while (true){std::string input;std::cout << "请输入要传输的数据(exit退出):" << std::endl;std::getline(std::cin, input);// 设置5s的超时等待struct timespec st;clock_gettime(CLOCK_REALTIME, &st);st.tv_sec += 5;// 发送数据if (mq_timedsend(mqd, input.c_str(), input.size(), 0, &st) == -1){perror("mq_timedsend");if (errno == ETIMEDOUT){std::cerr << "消息队列已满,等待中...\n";if (input == "exit")break;continue;}}if (input == "exit")break;}// 4.清除mq_close(mqd);return 0;
}
mq_read.cpp
#include <iostream>
#include <mqueue.h>
#include <cstdio>
#include <unistd.h>
#include <cstdlib>
#include <fcntl.h>
#include <time.h>
#include <string>
#include <errno.h>const char *mq_name = "/mq_name";int main(int argc, char const *argv[])
{// 1.打开消息队列mqd_t mqd = mq_open(mq_name, O_RDONLY);if (mqd == (mqd_t)-1){perror("mq_open");exit(EXIT_FAILURE);}// 2.从消息队列中读数据while (true){// 设置5s的超时等待struct timespec st;clock_gettime(CLOCK_REALTIME, &st);st.tv_sec += 5;// 接收数据char buffer[128] = {0};if (mq_timedreceive(mqd, buffer, 128, nullptr, &st) == -1){perror("mq_timedreceive");if (errno == ETIMEDOUT){std::cerr << "接收超时,继续等待...\n";continue;}elsebreak;}std::string output(buffer);if (output == "exit"){break;}std::cout << "收到消息:" << output << std::endl;}// 4.清除mq_close(mqd);mq_unlink(mq_name);return 0;
}
编译后同时执行两个程序,效果如下: