配置 EMQX 存储消息到 MySQL

EMQX 可以通过规则引擎和数据桥接功能将消息和事件存储到 MySQL 数据库。以下是具体实现方法:

创建 MySQL 数据表

在 MySQL 中创建用于存储消息的表结构:

CREATE TABLE `mqtt_messages` (`id` int(11) NOT NULL AUTO_INCREMENT,`client_id` varchar(100) DEFAULT NULL,`topic` varchar(100) DEFAULT NULL,`payload` text,`qos` tinyint(1) DEFAULT NULL,`retain` tinyint(1) DEFAULT NULL,`arrived` timestamp NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

配置 EMQX 规则引擎

在 EMQX Dashboard 中配置规则引擎: 登录 EMQX Dashboard,进入"规则引擎" -> "规则"页面

创建新规则,输入以下 SQL 语句作为规则条件:

SELECT clientid as client_id,topic as topic,payload as payload,qos as qos,retain as retain,timestamp as arrived
FROM "#"

配置 MySQL 数据桥接

在规则动作中添加 MySQL 数据桥接: 在规则动作中选择"添加动作",选择"数据桥接" -> "MySQL"

配置 MySQL 连接参数:

{"server": "mysql_server_ip:3306","database": "emqx_data","pool_size": 8,"username": "root","password": "password","auto_reconnect": true
}

设置 SQL 模板:

INSERT INTO mqtt_messages(client_id, topic, payload, qos, retain, arrived)
VALUES(${client_id},${topic},${payload},${qos},${retain},FROM_UNIXTIME(${arrived}/1000)
)

验证数据存储

发送测试消息到任何主题,检查 MySQL 数据库中的 mqtt_messages 表是否记录新消息

存储客户端事件

创建事件存储表:

CREATE TABLE `mqtt_events` (`id` int(11) NOT NULL AUTO_INCREMENT,`client_id` varchar(100) DEFAULT NULL,`event_type` varchar(50) DEFAULT NULL,`event_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,`details` text,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

配置事件规则:

SELECT clientid as client_id,event as event_type,created_at as event_time,json_encode(metadata) as details
FROM "$events/client_connected"

配置事件存储动作:

INSERT INTO mqtt_events(client_id, event_type, event_time, details)
VALUES(${client_id},${event_type},FROM_UNIXTIME(${event_time}/1000),${details}
)

性能优化建议

增加索引以提高查询性能:

ALTER TABLE `mqtt_messages` ADD INDEX `idx_topic` (`topic`);
ALTER TABLE `mqtt_messages` ADD INDEX `idx_client_id` (`client_id`);
ALTER TABLE `mqtt_messages` ADD INDEX `idx_arrived` (`arrived`);

定期归档旧数据:

CREATE EVENT archive_mqtt_data
ON SCHEDULE EVERY 1 DAY
DO
BEGININSERT INTO mqtt_messages_archiveSELECT * FROM mqtt_messages WHERE arrived < DATE_SUB(NOW(), INTERVAL 30 DAY);DELETE FROM mqtt_messages WHERE arrived < DATE_SUB(NOW(), INTERVAL 30 DAY);
END

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/909813.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/909813.shtml
英文地址,请注明出处:http://en.pswp.cn/news/909813.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot项目,利用docker打包部署

Windows WSL2 Docker Desktop 部署 SpringBoot 项目指南 &#xff08;没有安装docker的&#xff0c;如果是windows家庭中文版的&#xff0c;可以看我上一篇帖子&#xff1a;windows家庭版安装docker和redis-CSDN博客&#xff09; 本教程将说明如何在 Windows 系统 下&#…

MO+内核32位普冉单片机PY32F003开发板

PY32F003开发板是基于普冉半导体PY32F003微控制器设计的低成本入门级开发工具&#xff0c; PY32F003系列微控制器采用高性能的 32 位ARM Cortex-M0内核&#xff0c;宽电压工作范围的 MCU。嵌入高达32Kbytes flash 和 4Kbytes SRAM 存储器&#xff0c;最高工作频率 32MHz。PY32…

MySql 用存储过程删除所有用户表

用拼接语句总是会出问题 -- 1. 禁用外键约束&#xff08;防止级联删除失败&#xff09;[1]() SET SESSION FOREIGN_KEY_CHECKS 0; -- 2. 生成并执行删除语句&#xff08;替换 your_database_name&#xff09; SELECT CONCAT(DROP TABLE IF EXISTS , TABLE_NAME, ;) -- 预览语…

Java八股文——MySQL「锁篇」

讲一下MySQL里有哪些锁&#xff1f; 面试官您好&#xff0c;MySQL中的锁机制非常丰富&#xff0c;它是保证数据一致性和并发安全的核心。我通常会从锁的粒度&#xff08;加锁范围&#xff09; 和锁的模式&#xff08;功能&#xff09; 这两个维度来理解它们。 第一维度&#…

B站精品课程

【Python并发编程】线程&#xff0c;进程&#xff0c;协程&#xff0c;线程安全&#xff0c;多线程&#xff0c;死锁&#xff0c;线程池等与案例解析&#xff0c;从入门到精通 https://www.bilibili.com/video/BV1EfdcYmEff/?spm_id_from333.337.search-card.all.click&v…

# ubuntu中安装使用五笔输入法

先 清理旧输入法并重新安装 fcitx5 五笔输入法&#x1f447; &#x1f4e6; 第一步&#xff1a;清理旧的 Fcitx5 及相关输入法组件 sudo apt purge fcitx* mozc* ibus* -y sudo apt autoremove --purge -y&#x1f4dd; 说明&#xff1a; fcitx* 会清除旧版本 Fcitx/Fcitx5。…

LSM树与B+树优缺点分析

1. LSM树优化了顺序写&#xff0c;因此写性能很好&#xff0c;但在查询上&#xff1a; 需要从Level 0到Level n一直顺序查下去。极端情况是LSM树中不存在该数据&#xff0c;则需要遍历L0->Ln&#xff0c;最后返回空集。 解决方法是用布隆过滤器优化查询。 2. B树范围查询性…

【成都鱼住未来身份证】 身份证读取与解析———未来之窗行业应用跨平台架构——智能编程——仙盟创梦IDE

读取身份证开发配置 function readerid鱼住未来科技(){const webUrl http:// "127.0.0.1:30004" /api/info$.ajax({url: webUrl,type: GET,dataType: json,success: function (result) {// processContent.text(web api接口&#xff1a; webUrl 读取身份证信息…

开启并连接redis以及端口占用问题

开启命令行 redis-server.exe redis.windows.conf 端口占用问题 查看端口&#xff1a; 输入命令行&#xff1a; netstat -ano | findstr :6379 找到并停止pid号&#xff1a; 这个要用到cmd管理员身份打开 taskkill /f /pid 你的端口号 重新开启就行了 再用另一个cmd进行连…

MCP数据可视化服务器配置依赖

# requirements.txt # MCP数据可视化服务器依赖包# 核心MCP包 mcp>=0.1.0# 数据处理 pandas>=2.0.0 numpy>=1.24.0# 可视化 matplotlib>=3.7.0 seaborn>=0.12.0# 异步支持 asyncio-mqtt>=0.13.0# JSON处理 jsonschema>=4.17.0# 图像处理 Pillow>=9.5.0…

量化面试绿皮书:14. 钟表零件

文中内容仅限技术学习与代码实践参考&#xff0c;市场存在不确定性&#xff0c;技术分析需谨慎验证&#xff0c;不构成任何投资建议。 14. 钟表零件 一个时钟(顺时针编号为 1-12)从墙上掉了下来&#xff0c;摔成三块你发现每块上的数字之和是相等的。 Q: 每块上的数字是多少&…

AndroidR平台ToastPresenter引出BinderProxy泄漏

一、问题描述 针对SA8155车机系统Qnx+Android,自动化测试模拟发送CAN信号测试,压测报出多例BinderProxy leak引起system_server重启 问题1 [CRASH] system_server osVersion: V1.2.***,提交时间:2025-06-05 ***,复现了2次java.lang.AssertionError: Binder ProxyMap has…

windows11中切换到其他桌面再切回当前桌面,任务栏的WPS有好几个窗口

感谢知乎的网友 原文链接 新建一个后缀为 .reg 的文本文件&#xff0c;将以下内容粘贴进去&#xff0c;保存&#xff0c;然后双击运行 Windows Registry Editor Version 5.00[HKEY_CURRENT_USER\Software\kingsoft\Office\6.0\wpsoffice\Application Settings]"enable_m…

从汇编的角度揭开C++ this指针的神秘面纱(下)

<接上篇> 我们接着来看一段C的代码&#xff1a; class A { public:int func(int j){return j _i;} private:int _i; };int main() {A a;a.func(3);return 0; } 这里定义了一个类A&#xff0c;在main函数中定义了A类的对像a。同时用a调用了成员函数func。我们来看一…

Kafka代码模板

Kafka 服务器&#xff08;Broker&#xff09; 的配置 server.properties # broker.id: 每个 Kafka Broker 的唯一标识符。broker.id 必须在整个 Kafka 集群中唯一。 broker.id0# 配置 Kafka Broker 监听客户端请求的地址和端口。这个配置决定了 Kafka 服务将接受来自生产者、…

最大子数组和C++

给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组是数组中的一个连续部分。 示例 1&#xff1a; 输入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4] 输出&#xff1a;…

centos 7单机安装ceph并创建rbd块设备

1. 安装依赖包 新增阿里云源ceph下载地址 vim /etc/yum.repos.d/ceph.repo [ceph] nameceph baseurlhttp://mirrors.aliyun.com/ceph/rpm-jewel/el7/x86_64/ gpgcheck0 [ceph-noarch] namecephnoarch baseurlhttp://mirrors.aliyun.com/ceph/rpm-jewel/el7/noarch/ gpgcheck…

Jenkins搭建、权限管理、参数化、流水线等详细教程!

部署Jenkins 一、jenkins 安装 官网&#xff1a; https://jenkins.io yum 安装 jenkins *jenkins 依赖 java 环境 #注意2.346之后的版本不再支持jdk8 卸载旧jenkins #查询以前是否安装jenkins rpm -qa |grep jenkins #卸载 jenkins yum -y remove jenkins rpm -e jenkins…

百度飞桨(PaddlePaddle)案例分享:基于 PaddleOCR 的图像文字提取系统

一、案例背景 在实际教学、办公及政务系统中&#xff0c;纸质材料&#xff08;如手写作文、表格、试卷等&#xff09;仍广泛存在。为提升信息处理效率&#xff0c;采用 OCR&#xff08;Optical Character Recognition&#xff09;技术将图像中的文字提取为可编辑文本已成为刚需…

python操控鼠标

在已知屏幕坐标的情况下&#xff0c;可以通过 Python 的 pyautogui 或 pynput 等库实现网页上的鼠标点击操作。以下是具体步骤和代码示例&#xff1a; 1. 使用 PyAutoGUI&#xff08;推荐&#xff09; pyautogui 是一个简单易用的库&#xff0c;可以直接通过屏幕坐标控制鼠标点…