配置 EMQX 存储消息到 MySQL
EMQX 可以通过规则引擎和数据桥接功能将消息和事件存储到 MySQL 数据库。以下是具体实现方法:
创建 MySQL 数据表
在 MySQL 中创建用于存储消息的表结构:
CREATE TABLE `mqtt_messages` (`id` int(11) NOT NULL AUTO_INCREMENT,`client_id` varchar(100) DEFAULT NULL,`topic` varchar(100) DEFAULT NULL,`payload` text,`qos` tinyint(1) DEFAULT NULL,`retain` tinyint(1) DEFAULT NULL,`arrived` timestamp NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
配置 EMQX 规则引擎
在 EMQX Dashboard 中配置规则引擎: 登录 EMQX Dashboard,进入"规则引擎" -> "规则"页面
创建新规则,输入以下 SQL 语句作为规则条件:
SELECT clientid as client_id,topic as topic,payload as payload,qos as qos,retain as retain,timestamp as arrived
FROM "#"
配置 MySQL 数据桥接
在规则动作中添加 MySQL 数据桥接: 在规则动作中选择"添加动作",选择"数据桥接" -> "MySQL"
配置 MySQL 连接参数:
{"server": "mysql_server_ip:3306","database": "emqx_data","pool_size": 8,"username": "root","password": "password","auto_reconnect": true
}
设置 SQL 模板:
INSERT INTO mqtt_messages(client_id, topic, payload, qos, retain, arrived)
VALUES(${client_id},${topic},${payload},${qos},${retain},FROM_UNIXTIME(${arrived}/1000)
)
验证数据存储
发送测试消息到任何主题,检查 MySQL 数据库中的 mqtt_messages 表是否记录新消息
存储客户端事件
创建事件存储表:
CREATE TABLE `mqtt_events` (`id` int(11) NOT NULL AUTO_INCREMENT,`client_id` varchar(100) DEFAULT NULL,`event_type` varchar(50) DEFAULT NULL,`event_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,`details` text,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
配置事件规则:
SELECT clientid as client_id,event as event_type,created_at as event_time,json_encode(metadata) as details
FROM "$events/client_connected"
配置事件存储动作:
INSERT INTO mqtt_events(client_id, event_type, event_time, details)
VALUES(${client_id},${event_type},FROM_UNIXTIME(${event_time}/1000),${details}
)
性能优化建议
增加索引以提高查询性能:
ALTER TABLE `mqtt_messages` ADD INDEX `idx_topic` (`topic`);
ALTER TABLE `mqtt_messages` ADD INDEX `idx_client_id` (`client_id`);
ALTER TABLE `mqtt_messages` ADD INDEX `idx_arrived` (`arrived`);
定期归档旧数据:
CREATE EVENT archive_mqtt_data
ON SCHEDULE EVERY 1 DAY
DO
BEGININSERT INTO mqtt_messages_archiveSELECT * FROM mqtt_messages WHERE arrived < DATE_SUB(NOW(), INTERVAL 30 DAY);DELETE FROM mqtt_messages WHERE arrived < DATE_SUB(NOW(), INTERVAL 30 DAY);
END