在 Spring Boot 中使用 MQTT 可以通过集成 Eclipse Paho 或 HiveMQ 等客户端库实现。以下是完整的整合步骤,包括配置、发布和订阅消息的示例。

1. 添加 MQTT 依赖
在 pom.xml 中添加 Paho MQTT 客户端依赖:

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>


2. 配置 MQTT 连接
2.1 配置文件(application.yml)

mqtt:broker-url: tcp://127.0.0.1:8084client-id: service_001username: 用户名password: 密码default-topic: 话题


运行
2.2 MQTT 配置类
 

package io.renren.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true); // 是否持久化会话factory.setConnectionOptions(options);return factory;}
}

3. 发布消息(Publisher)
3.1 创建 MQTT 消息网关

package io.renren.config;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
}


3.2 配置出站通道
 

package io.renren.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;@Configuration
public class MqttOutboundConfig {@Value("${mqtt.default-topic}")private String defaultTopic;@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory factory) {MqttPahoMessageHandler handler = new MqttPahoMessageHandler("publisher-client", factory);handler.setAsync(true); // 异步发送handler.setDefaultTopic(defaultTopic); // 默认Topicreturn handler;}
}

3.3 使用示例(Controller 发布消息)
 

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MqttController {@Autowiredprivate MqttGateway mqttGateway;@PostMapping("/publish")public String publish(@RequestParam String message, @RequestParam(required = false) String topic) {mqttGateway.sendToMqtt(message, topic != null ? topic : "test/topic");return "Message published: " + message;}
}

4. 订阅消息(Subscriber)
4.1 配置入站通道
 

package io.renren.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;import io.renren.modules.app.service.BmsDeviceInfoService;@Configuration
public class MqttInboundConfig {@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.default-topic}")private String defaultTopic;@Autowiredprivate BmsDeviceInfoService bmsDeviceInfoService;@Beanpublic MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory factory) {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, factory, defaultTopic);adapter.setErrorChannel(errorChannel()); // 配置错误通道return adapter;}@Beanpublic MessageChannel errorChannel() {return new PublishSubscribeChannel();}@Beanpublic IntegrationFlow mqttInboundFlow(MqttPahoMessageDrivenChannelAdapter adapter) {return IntegrationFlows.from(adapter).handle(message -> {String payload = message.getPayload().toString();String topic = message.getHeaders().get("mqtt_receivedTopic").toString();System.out.println("Received: Topic=" + topic + ", Payload=" + payload);bmsDeviceInfoService.handleMqtt(payload);}).get();}
}

5. 测试 MQTT 功能
5.1 测试发布消息

curl -X POST "http://localhost:8080/publish?message=HelloMQTT&topic=test/topic"


5.2 查看订阅日志
控制台会输出接收到的消息:

Received: Topic=test/topic, Payload=HelloMQTT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87090.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87090.shtml
英文地址,请注明出处:http://en.pswp.cn/web/87090.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 编程之备忘录模式

前言 有时候&#xff0c;我们真希望人生能有“CtrlZ”。在日常生活中&#xff0c;我们经常使用“撤销”功能&#xff0c;例如在写 Word、画图、写代码时一不小心操作失误&#xff0c;就希望能回到之前的状态。这种**“状态快照 恢复”**机制&#xff0c;在设计模式中就叫做&a…

yolov13+bytetrack的目标跟踪实现

目录 1. 介绍 2. 相关工作 (Related Works) 3. 方法 (Method) 4. 统计和结果 5. 技术实现 ByteTrack: Multi-Object Tracking by Associating Every Detection Box 1. Motivation 2. BYTE 3. ByteTrack 具体代码 UI界面设计 历史记录 完整代码实现UI界面 1. 介绍 …

GO类型转换与断言面试题及参考答案

Go 中类型转换与类型断言的区别是什么? 在Go语言里,类型转换和类型断言是两个不同的概念,它们在应用场景、语法格式以及底层实现上都存在明显差异。 类型转换主要用于将一种数据类型转变为另一种数据类型,一般适用于基本数据类型之间的转换,像整数与浮点数、字符串与字节…

【力扣 中等 C】79. 单词搜索

目录 题目 解法一&#xff1a;回溯 题目 解法一&#xff1a;回溯 void swap(char* a, char* b) {char tmp *a;*a *b;*b tmp; }void reverse(char* str) {int start 0, end strlen(str) - 1;while (start < end) {swap(&str[start], &str[end--]);} }bool se…

【数据标注师】分类标注

目录 一、 **分类标注的认知底层逻辑**1. **三大核心挑战2. **四维评估标准** 二、 **五阶成长体系**▶ **阶段1&#xff1a;分类体系深度内化&#xff08;2-4周&#xff09;**▶ **阶段2&#xff1a;标注决策流程固化**▶ **阶段3&#xff1a;场景化标注策略**▶ **阶段4&…

大数据时代UI前端的智能化转型策略:以用户为中心的设计思维

hello宝子们...我们是艾斯视觉擅长ui设计、前端开发、数字孪生、大数据、三维建模、三维动画10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩! 一、引言&#xff1a;大数据驱动的 UI 前端变革浪潮 在数字化体验竞争白热化的今天&#xff…

【python实用小脚本-122】Detect Gender Webcam:基于Python和Keras的实时性别检测工具

在计算机视觉和人工智能领域&#xff0c;实时性别检测是一个具有广泛应用前景的技术。从安防监控到智能广告&#xff0c;性别检测可以帮助系统更好地理解和响应用户需求。为了实现这一功能&#xff0c;我们开发了一个基于Python和Keras的实时性别检测工具——detect_gender_web…

Redis4

Redis除了缓存&#xff0c;还有哪些应用? Redis实现消息队列 **使用Pub/Sub模式&#xff1a;**Redis的Pub/Sub是一种基于发布/订阅的消息模式&#xff0c;任何客户端都可以订阅一个或多个频道&#xff0c;发布者可以向特定频道发送消息&#xff0c;所有订阅该频道的客户端都会…

LEFE-Net:一种轴承故障诊断的轻量化高效特征提取网络

一、研究背景与挑战 轴承作为旋转机械的核心部件&#xff0c;其健康状态直接影响设备运行的安全性和可靠性。传统的故障诊断方法&#xff08;如振动分析、油液检测&#xff09;依赖人工经验&#xff0c;效率低且易受主观因素影响。近年来&#xff0c;基于深度学习的数据驱动方…

springboot+Apache POI 写共导入导出

SpringBoot Apache POI 实现数据导入导出 功能特点&#xff1a; 智能列匹配&#xff1a; 支持精确列名匹配 支持忽略大小写的列名匹配 自动匹配字段名&#xff08;当未指定ExcelProperty时&#xff09; 强大的类型转换&#xff1a; 支持基本数据类型&#xff08;Integer/Lon…

Games101 Lecture3,Lecture4

旋转矩阵逻辑推导 齐次坐标&#xff0c;解决平移的特殊情况 引入一个维度&#xff08;无物理意义&#xff1f;&#xff09;&#xff0c;辅助表达平移&#xff0c;为零时&#xff0c;表示向量&#xff0c;不为零时&#xff0c;表示点&#xff08;/w&#xff09; 三维旋转矩阵 相…

折线图多数据处理

前言&#xff1a; skline1有年份和新申请单位数&#xff0c;skline2有年份和有效期内单位数&#xff0c;我想要把1和2的年份放在一起从小到大放&#xff0c;没有重复的&#xff0c;新申请单位数和有效期内单位数和年份的排列顺序一致 实现&#xff1a; // 获取原始数据 List…

documents4j导出pdf

一、前言 上一篇我们介绍了导出word&#xff0c;既然有了导出word&#xff0c;那么到处pdf也将会出现&#xff0c;导出word和pdf基本上是配套的需求&#xff0c;跑不了&#xff0c;那么本次我就简单介绍一下导出pdf。 二、代码实现 2.1、依赖引入 导出pdf是基于documents4j实现…

从零到一体验 Qwen-TTS:用四川话合成语音的全流程技术实录

今天很高兴看到Qwen-TTS开源。试一试四川方言&#xff08;大概是成都版&#xff09;效果如何。本人无法判断、有兴趣的伙伴可以帮忙听一听。 四川方言TTS "胖娃胖嘟嘟&#xff0c;骑马上成都&#xff0c;成都又好耍。胖娃骑白马&#xff0c;白马跳得高。胖娃耍关刀&…

php数据导出pdf文件

一.导出pdf文件&#xff0c;首先要安装相关的类库文件&#xff0c;我用的是dompdf类库。 1.安装类库文件&#xff1a; composer require dompdf/dompdf 2.引入类库文件到你的控制器中&#xff0c;创建方法&#xff1a; public function generatePdf(){//你需要打印的查询内容…

Beam2.61.0版本消费kafka重复问题排查

1.问题出现过程 在测试环境测试flink的job的任务消费kafka的情况&#xff0c;通过往job任务发送一条消息&#xff0c;然后flink web ui上消费出现了两条。然后通过重启JobManager和TaskManager后&#xff0c;任务从checkpoint恢复后就会出现重复消费。当任务不从checkpoint恢复…

关于 java:9. Java 网络编程

一、Socket 编程 Socket&#xff08;套接字&#xff09;是网络通信的端点&#xff0c;是对 TCP/IP 协议的编程抽象&#xff0c;用于实现两台主机间的数据交换。 通俗来说&#xff1a; 可以把 Socket 理解为“电话插口”&#xff0c;插上后客户端和服务端才能“通话”。 Sock…

主流零信任安全产品深度介绍

腾讯 iOA 零信任安全管理系统 功能&#xff1a;提供零信任接入、终端安全、数据防泄密等十余种功能模块。可实现基于身份的动态访问控制、终端安全一体化防护、数据防泄密体系等。核心优势&#xff1a;基于腾讯内部千万级终端实践打磨&#xff0c;沉淀丰富场景方案&#xff0c…

LabVIEW装配车体挠度无线测量

针对轨道交通车辆装配过程中车体挠度测量需求&#xff0c;基于LabVIEW开发无线快速测量系统&#xff0c;采用品牌硬件构建高精度数据采集与传输架构。系统通过 ZigBee 无线传输技术、高精度模数转换模块及激光位移传感器&#xff0c;实现装配车体挠度的实时、自动、非接触测量&…

java微服务-linux单机CPU接近100%优化

你这个场景&#xff1a; 4核16G 机器 同时运行了 8个 Spring Boot 微服务&#xff0c;每个 JAR 文件 100多 MB 导致 CPU 接近100% 确实是一个常见但资源紧绷的部署情境。下面是分层的优化建议&#xff0c;包括 JVM、系统、服务架构等多个方面&#xff0c;帮助你 降 CPU、稳…