一:前提

        安装了Emqx开源版、MQTTX客户端

二:订阅发布实现步骤

1.引入依赖 

<!--MQTT客户端-->
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>

2.编辑配置文件

mqtt:broker:uri: tcp://127.0.0.1:31883client:id: mqtt-am-client-${random.uuid}# 订阅主题配置(支持多个)inTopics:- topic: test/topic1qos: 0- topic: test/topic2qos: 1- topic: test/topic3qos: 2# 发布主题配置(支持多个)outTopics:- topic: out/topic1qos: 0username: ampassword: LGyPtuAB4th5pkeepAliveInterval: 60

3.读取配置文件

package com.wtzn.web.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.List;@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {private Broker broker;private Client client;private List<TopicConfig> inTopics;private List<TopicConfig> outTopics;private String userName;private String password;private int KeepAliveInterval;@Datapublic static class Broker {private String uri;}@Datapublic static class Client {private String id;}@Datapublic static class TopicConfig {private String topic;private int qos;}}

4.创建Mqtt客户端

package com.wtzn.web.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Autowiredprivate MqttProperties mqttProperties;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();// 此客户端的用户名和密码options.setUserName(mqttProperties.getUserName());options.setPassword(mqttProperties.getPassword().toCharArray());options.setCleanSession(true);// 设置遗嘱消息//  options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getBytes(), 2, true);// 连接超时重试options.setConnectionTimeout(5000); //毫秒options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());options.setAutomaticReconnect(true);//网络中断重连client.connect(options);return client;}
}

5.controller层

package com.wtzn.web.controller;import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.LinkedList;@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@SaIgnore@PostMapping("/mqtt")public void publish() {try {//  LinkedList<Payload> payloadLinkedList=new LinkedList<>();for(int i=1; i<=10000; i++){Payload payload=new Payload();payload.setTemperature(i);//  payloadLinkedList.add(payload);mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));}} catch (MqttException e) {log.error("发布消息失败{}", e.getMessage());}log.info("发布消息成功");}}

6.service层

package com.wtzn.web.service;import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Arrays;@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {@Autowiredprivate MqttClient mqttClient;@Autowiredprivate MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {mqttClient.setCallback(this);/*       mqttClient.subscribe(mqttProperties.getInTopic());log.info("订阅主题{}", mqttProperties.getInTopic());
*/mqttProperties.getInTopics().forEach(x -> {try {mqttClient.subscribe(x.getTopic(), x.getQos());log.info("订阅主题{}", x.getTopic());} catch (MqttException e) {throw new RuntimeException(e);}});}@PreDestroypublic void destroy() throws MqttException {mqttClient.disconnect();log.info("与服务器断开连接");}/*** @description: 发送消息* @param: [message]* @return: void**/public void publish(String topic,int qos,String message) throws MqttException {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(qos);mqttClient.publish(topic, mqttMessage);log.info("向主题【{}】发布消息:【{}】", topic, message);}/*** @description: 接收消息* @param: [topic, message]* @return: void**/@Overridepublic void messageArrived(String topic, MqttMessage message) throws MqttException {Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());/*  if (payload.getTemperature() > 37) {publish("发烧");}*/}@Overridepublic void connectionLost(Throwable cause) {log.error("连接丢失:{}", cause.getMessage());}@SneakyThrows@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {if( token!=null ){MqttMessage message = null;try {message = token.getMessage();} catch (MqttException e) {throw new RuntimeException(e);}String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();String str = message==null ? null : new String(message.getPayload());log.info("deliveryComplete: topic={}, message={}", topic, str);} else {log.info("deliveryComplete: null");}log.info("消息已送达");}@Overridepublic void connectComplete(boolean b, String s) {mqttProperties.getInTopics().forEach(x -> {try {mqttClient.subscribe(x.getTopic(), x.getQos());log.info("订阅主题{}", x.getTopic());} catch (MqttException e) {throw new RuntimeException(e);}});}
}

7.dao层

package com.wtzn.web.domain.bo;import lombok.Data;@Data
public class Payload {private Integer temperature;
}

三:测试

1.PostMan直接调用测试

2、下载MQTTX客户端进行测试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913815.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913815.shtml
英文地址,请注明出处:http://en.pswp.cn/news/913815.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ReactNative【实战系列教程】我的小红书 7 -- 消息(含弹窗菜单,右上角角标,空白页等)

最终效果弹窗菜单 点击右上角群聊按钮后&#xff0c;弹窗菜单无消息代码实现app/(tabs)/message.tsx import icon_no_collection from "/assets/icons/icon_no_collection.webp"; import FloatMenu, {FloatMenuRef, } from "/modules/message/components/FloatM…

Jenkins详细教程 - 从入门到精通

目录 1. 什么是Jenkins 1.1 简单理解 1.2 技术定义 1.3 核心特点 2. 为什么需要Jenkins 2.1 传统开发的痛点 手工发布的问题 真实场景举例 2.2 Jenkins的解决方案 自动化CI/CD流程 3. 核心概念解析 3.1 Job(任务) Job示例 3.2 Build(构建) 3.3 Pipeline(流水…

bash 判断 /opt/wslibs-cuda11.8 是否为软连接, 如果是,获取连接目的目录并自动创建

以下是实现该功能的 Bash 脚本&#xff1a; bash #!/bin/bash LINK_PATH“/opt/wslibs-cuda11.8” 检查是否为软链接 if [ -L "KaTeX parse error: Expected EOF, got # at position 24: …H" ]; then#̲ 获取软链接的绝对目标路径…(readlink -f “$LINK_PATH”) # …

【性能测试】jmeter+Linux环境部署和分布式压测,一篇打通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、linux获取动态…

Java 17 新特性笔记

Java 17 是一个 长期支持版本&#xff08;LTS&#xff09;&#xff0c;于 2021 年 9 月发布&#xff0c;是继 Java 11 之后的重要里程碑。它整合了 Java 12~16 的众多特性&#xff0c;并引入新的语言增强、JDK API 改进、性能优化和安全增强。 Java 17 版本信息 发布时间&…

WWDC 25 风云再起:SwiftUI 7 Charts 心法从 2D 到 3D 的华丽蜕变

概述 在 iOS 开发这个波谲云诡的江湖中&#xff0c;SwiftUI 可谓是一位后起之秀&#xff0c;以其简洁明快的招式迅速在 UI 框架领域中崭露头角。 而其中的 Charts 框架&#xff0c;更是如同江湖中的 “数据可视化宝典”那样&#xff0c;让各位秃头少侠们能够轻松将复杂的数据转…

Vue+Element Plus 中按回车刷新页面问题排查与解决

VueElement Plus 中按回车刷新页面问题排查与解决原因分析解决方案方法一&#xff1a;阻止默认行为 submit.prevent方法二&#xff1a;只监听回车并触发搜索最终推荐写法如下&#xff1a;在使用 Vue 3 Element Plus 开发后台系统时&#xff0c;我们常常会通过 搭配 实现搜索功…

x86汇编语言入门基础(三)汇编指令篇3 位移运算

位移运算指令&#xff1a;SHL逻辑移位&#xff0c;SAR算术移位&#xff0c; ROR循环右移 1. SHL 逻辑移位 Shift Left, SHL代表向左移位&#xff0c;SHR代表向右移位 指令格式&#xff1a;shl op1, op2 目的操作数 op1&#xff1a;寄存器/内存地址源操作数 op2&#xff1a;寄…

Java-69 深入浅出 RPC 单体架构 垂直架构 分布式架构 微服务架构

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; AI炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有…

Android 如何阻止应用自升级

问题背景 1.打开PlayStore,然后登陆账户 2.退出应用过几分钟后,应用会自动更新到新版本 3.再次打开应用,问题即可复现 一联网进入playStore应用并且登录谷歌账号,退出几分钟,在进入,发现应用版本号更新了,应用进行了自我升级,关键是升级之后谷歌商店就用不了了,就…

Docker-构建镜像并实现LNMP架构

一、搭建LNMP基础配置1、制作Nginx镜像制作dockerfilevim dockerfileFROM centos:7 RUN rm -rf /etc/yum.repos.d/* RUN curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo RUN yum clean all RUN yum makecache RUN yum -y install z…

Python之--基本知识

基本输出语法结构: print(输出内容)print()函数完整的语法格出: print (value,...,sep,end\n,fileNone)只有字符串可以用连接基本输入语法结构: xinput(提示文字’)注意事项: 无论输入的数据是什么 x 的数据类型都是字符串类型示例&#xff1a;name input("Enter your na…

VS CodeC51 单片机开发环境搭建

文章目录前言1.安装插件2.创建EIDE项目&#xff08;51单片机&#xff09;3.配置工具链&#xff08;第一次使用需要配置&#xff09;4.编译与下载5.项目文件简介与串口调试工具6.推荐插件7.打包模板与导出模板8.51单片机串口无法识别问题前言 需要安装keil c51版本需要配置好C/…

国密算法(SM2/SM3/SM4)

文章目录国密算法&#xff08;SM2/SM3/SM4&#xff09;详解&#xff1a;从性能对比到Java手机号安全处理实战一、 国密核心算法简介二、 性能深度对比三、 Java实战&#xff1a;手机号的安全处理方案一&#xff1a;使用SM3哈希存储&#xff08;推荐用于验证场景&#xff09;方案…

从前端转go开发的学习路线

从前端开发转向 Go&#xff08;Golang&#xff09;后端开发&#xff0c;是一个非常可行也很实用的方向&#xff0c;特别是在做 高性能微服务、分布式系统、云原生&#xff08;如Kubernetes&#xff09; 等方面。以下是一份适合你&#xff08;有多年开发经验的前端开发者&#x…

node或浏览器上传文件到阿里云OSS

阿里云配置 进入阿里云OSS Bucket 列表的某个 Bucket 仓库下&#xff0c;点击访问控制 RAM 创建用户 勾上 创建 AccessKey ID 和 AccessKey Secret 复制 AccessKey 信息 用文档保存 创建角色 选择云账号 复制 ARN 用文档保存&#xff0c;然后 新增权限 搜索 oss 选择 AliyunOSS…

26考研物理复试面试常见问答问题汇总,物理专业保研推免夏令营面试问题汇总,物理本科知识专业面试最全攻略!

还在为物理考研复试面试发愁&#xff1f;还在为物理招聘的专业面试抓狂&#xff1f;还在为即将到来的物理夏令营面试不知从何下手、翻遍了厚厚的教材却抓不住重点&#xff1f;别慌&#xff0c;接下来我会从「考研的物理复试经历」「物理面试攻略」「物理面试基础问答题汇总很全…

(5)机器学习小白入门 YOLOv:数据需求与图像不足应对策略

(1)机器学习小白入门YOLOv &#xff1a;从概念到实践 (2)机器学习小白入门 YOLOv&#xff1a;从模块优化到工程部署 (3)机器学习小白入门 YOLOv&#xff1a; 解锁图片分类新技能 (4)机器学习小白入门YOLOv &#xff1a;图片标注实操手册 (5)机器学习小白入门 YOLOv&#xff1a;…

百年制造名企,三菱重工引领“智”造新范式

日前&#xff0c;由深圳软件协会指导、法大大和信息侠联合出品的《制造行业合同数智化升级白皮书》&#xff08;以下简称“白皮书”&#xff09;正式发布&#xff0c;并首次提出 “电子签法律AI” 双轮驱动模型。在制造行业面临供应链协同、合规风控及全球化出海等多重挑战的当…

【学习笔记】计算机操作系统(七)—— 文件管理

第七章 文件管理 文章目录第七章 文件管理7.1 文件和文件系统7.1.1 数据项、记录和文件7.1.2 文件名和类型7.1.3 文件系统的层次结构7.1.4 文件操作7.2 文件的逻辑结构7.2.1 文件逻辑结构的类型7.2.2 顺序文件(Sequential File)7.2.3 记录寻址7.2.4 索引文件(Index File)7.2.5 …