以下是一个实现,可以发送和接收任意类型的结构体消息,而不仅限于特定的CustomMsg
类型:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>// 通用消息结构体模板
// 要求: 所有消息结构体的第一个字段必须是long类型的mtype
#define MSG_HEADER long mtype// 消息队列配置
#define MSG_QUEUE_KEY 0x1234 // 自定义消息队列键值// 错误处理宏
#define ERROR_EXIT(msg) do { perror(msg); exit(EXIT_FAILURE); } while(0)// 创建消息队列
int create_message_queue(key_t key) {int msgid;// 创建消息队列 (IPC_CREAT | IPC_EXCL | 0666)if ((msgid = msgget(key, IPC_CREAT | IPC_EXCL | 0666)) == -1) {if (errno == EEXIST) {// 如果已存在,则直接获取msgid = msgget(key, 0666);if (msgid == -1) {ERROR_EXIT("msgget failed to get existing queue");}} else {ERROR_EXIT("msgget failed to create new queue");}}return msgid;
}// 发送任意结构体消息
// msg: 指向包含MSG_HEADER的结构体指针
// msg_size: 结构体总大小
void send_struct_message(int msgid, void *msg, size_t msg_size) {// 计算实际数据大小 (减去mtype的大小)size_t data_size = msg_size - sizeof(long);if (msgsnd(msgid, msg, data_size, IPC_NOWAIT) == -1) {ERROR_EXIT("msgsnd failed");}printf("Sent %zu byte message (type %ld)\n", msg_size, ((long*)msg)[0]);
}// 接收任意结构体消息
// msg: 指向包含MSG_HEADER的结构体指针
// msg_size: 结构体总大小
// msg_type: 期望接收的消息类型
void receive_struct_message(int msgid, void *msg, size_t msg_size, long msg_type) {// 计算实际数据大小 (减去mtype的大小)size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, 0);if (bytes == -1) {ERROR_EXIT("msgrcv failed");}printf("Received %zd byte message (type %ld)\n", bytes + sizeof(long), ((long*)msg)[0]);
}// 删除消息队列
void remove_message_queue(int msgid) {if (msgctl(msgid, IPC_RMID, NULL) == -1) {if (errno != EIDRM) { // 忽略已删除的错误perror("msgctl IPC_RMID failed");}}
}// ===================== 使用示例 =====================// 示例消息结构体1
typedef struct {MSG_HEADER; // 必须作为第一个字段int sensor_id;float temperature;float humidity;unsigned long timestamp;
} SensorData;// 示例消息结构体2
typedef struct {MSG_HEADER; // 必须作为第一个字段char device_name[16];int state;int error_code;float voltage;float current;
} DeviceStatus;// 示例消息结构体3
typedef struct {MSG_HEADER; // 必须作为第一个字段short x;short y;short z;unsigned char accuracy;
} MotionData;int main() {int msgid = create_message_queue(MSG_QUEUE_KEY);pid_t pid = fork();if (pid < 0) {ERROR_EXIT("fork failed");}if (pid > 0) { // 父进程 - 发送者// 给接收者时间启动sleep(1);// 发送SensorData消息SensorData sensor_msg = {.mtype = 1,.sensor_id = 101,.temperature = 25.6f,.humidity = 45.7f,.timestamp = 1234567890};send_struct_message(msgid, &sensor_msg, sizeof(SensorData));// 发送DeviceStatus消息DeviceStatus device_msg = {.mtype = 2,.device_name = "Main Controller",.state = 1,.error_code = 0,.voltage = 3.3f,.current = 0.75f};send_struct_message(msgid, &device_msg, sizeof(DeviceStatus));// 发送MotionData消息MotionData motion_msg = {.mtype = 3,.x = 1024,.y = -512,.z = 256,.accuracy = 95};send_struct_message(msgid, &motion_msg, sizeof(MotionData));// 等待接收者处理sleep(1);// 删除消息队列remove_message_queue(msgid);} else { // 子进程 - 接收者// 接收SensorData消息SensorData recv_sensor;receive_struct_message(msgid, &recv_sensor, sizeof(SensorData), 1);printf("SensorData: ID=%d, Temp=%.1f°C, Hum=%.1f%%, Time=%lu\n",recv_sensor.sensor_id, recv_sensor.temperature,recv_sensor.humidity, recv_sensor.timestamp);// 接收DeviceStatus消息DeviceStatus recv_device;receive_struct_message(msgid, &recv_device, sizeof(DeviceStatus), 2);printf("DeviceStatus: Name='%s', State=%d, Error=%d, V=%.2fV, I=%.2fA\n",recv_device.device_name, recv_device.state,recv_device.error_code, recv_device.voltage, recv_device.current);// 接收MotionData消息MotionData recv_motion;receive_struct_message(msgid, &recv_motion, sizeof(MotionData), 3);printf("MotionData: X=%d, Y=%d, Z=%d, Accuracy=%d%%\n",recv_motion.x, recv_motion.y, recv_motion.z, recv_motion.accuracy);}return EXIT_SUCCESS;
}
消息队列限制:
使用
msgctl(IPC_STAT)
检查队列状态监控队列使用情况,避免溢出
这个实现提供了高度灵活的消息传递机制,适用于各种嵌入式场景,从简单的传感器数据采集到复杂的设备控制命令,都可以通过定义适当的结构体来实现高效通信。
=========================阻塞和非阻塞接收方式===============================
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <stdbool.h>// 通用消息头要求
#define MSG_HEADER long mtype// 消息队列配置
#define MSG_QUEUE_KEY 0x1234// 错误处理宏
#define ERROR_EXIT(msg) do { perror(msg); exit(EXIT_FAILURE); } while(0)// 创建消息队列
int create_message_queue(key_t key) {int msgid = msgget(key, IPC_CREAT | 0666);if (msgid == -1) {ERROR_EXIT("msgget failed");}return msgid;
}// 发送任意结构体消息(阻塞)
void send_struct_message(int msgid, void *msg, size_t msg_size) {size_t data_size = msg_size - sizeof(long);if (msgsnd(msgid, msg, data_size, 0) == -1) { // 阻塞发送ERROR_EXIT("msgsnd failed");}printf("Sent %zu byte message (type %ld)\n", msg_size, ((long*)msg)[0]);
}// 阻塞接收任意结构体消息
bool receive_struct_message_blocking(int msgid, void *msg, size_t msg_size, long msg_type) {size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, 0);if (bytes == -1) {return false;}printf("Received %zd byte message (type %ld) [blocking]\n", bytes + sizeof(long), ((long*)msg)[0]);return true;
}// 非阻塞接收任意结构体消息
bool receive_struct_message_nonblocking(int msgid, void *msg, size_t msg_size, long msg_type) {size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, IPC_NOWAIT);if (bytes == -1) {if (errno == ENOMSG) {// 没有消息不是错误,只是需要重试return false;}ERROR_EXIT("msgrcv failed");}printf("Received %zd byte message (type %ld) [non-blocking]\n", bytes + sizeof(long), ((long*)msg)[0]);return true;
}// 带超时的接收(混合模式)
bool receive_struct_message_timeout(int msgid, void *msg, size_t msg_size, long msg_type, int timeout_sec) {for (int i = 0; i < timeout_sec * 10; i++) {if (receive_struct_message_nonblocking(msgid, msg, msg_size, msg_type)) {return true;}// 等待100ms后重试usleep(100 * 1000);}return false;
}// 删除消息队列
void remove_message_queue(int msgid) {if (msgctl(msgid, IPC_RMID, NULL) == -1 && errno != EIDRM) {perror("msgctl IPC_RMID failed");}
}// ===================== 使用示例 =====================typedef struct {MSG_HEADER;int counter;char data[64];
} TestMessage;int main() {int msgid = create_message_queue(MSG_QUEUE_KEY);pid_t pid = fork();if (pid < 0) {ERROR_EXIT("fork failed");}if (pid > 0) { // 父进程 - 发送者sleep(1); // 等待接收者准备// 发送3条消息for (int i = 1; i <= 3; i++) {TestMessage msg = {.mtype = 1,.counter = i,.data = "Blocking test"};send_struct_message(msgid, &msg, sizeof(TestMessage));sleep(1);}// 发送快速连续消息for (int i = 4; i <= 6; i++) {TestMessage msg = {.mtype = 2,.counter = i,.data = "Non-blocking test"};send_struct_message(msgid, &msg, sizeof(TestMessage));}// 等待接收者处理sleep(2);remove_message_queue(msgid);} else { // 子进程 - 接收者// 1. 阻塞接收演示printf("=== Blocking Receive Test ===\n");for (int i = 0; i < 3; i++) {TestMessage msg;if (receive_struct_message_blocking(msgid, &msg, sizeof(TestMessage), 1)) {printf("Blocking received: counter=%d, data=%s\n", msg.counter, msg.data);}}// 2. 非阻塞接收演示printf("\n=== Non-blocking Receive Test ===\n");int received = 0;while (received < 3) {TestMessage msg;if (receive_struct_message_nonblocking(msgid, &msg, sizeof(TestMessage), 2)) {printf("Non-blocking received: counter=%d, data=%s\n", msg.counter, msg.data);received++;} else {printf("No message available, doing other work...\n");sleep(1); // 模拟其他工作}}// 3. 超时接收演示printf("\n=== Timeout Receive Test ===\n");TestMessage msg;if (receive_struct_message_timeout(msgid, &msg, sizeof(TestMessage), 3, 2)) {printf("Received message within timeout\n");} else {printf("Timeout waiting for message (type 3)\n");}}return EXIT_SUCCESS;
}