Websocket客户端

pom依赖

		<dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.4.0</version></dependency>

客户端代码片段


@Component
@Slf4j
public class PositionAlarmListener {@Autowiredprivate BigScreenService bigScreenService;@Autowiredprivate ConfigurationSystemService configurationSystemService;@Beanpublic WebSocketClient webSocketClient() {WebSocketClient wsc = null;ThirdPartConfDto thirdPartConfDto = configurationSystemService.getConfig();Map<String, String> httpHeaders = new HashMap<>();try {
//            String reqUrl = String.format("ws://%s%s?apikey=%s", servicePort, SOCKET_URL, apikey);String reqUrl = thirdPartConfDto.getAlarmWebsocketUrl();log.info("websocketclient.position.reqUrl:{}",reqUrl);wsc = new WebSocketClient(new URI(reqUrl), httpHeaders) {@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("UnmannedPlane==connect==build");}@Overridepublic void onMessage(String message) {log.info("websocketclient.position.receive.message:{}", message);CompletableFuture.runAsync(() -> {try {if (StringUtils.isEmpty(message)) {return;}
//                            JSONObject parse = JSONObject.parseObject(message);ThirdPositionAlarmDto thirdPositionAlarmDto = JSONObject.parseObject(message,ThirdPositionAlarmDto.class);String type = thirdPositionAlarmDto.getType();log.info("websocketclient.position.receive.message-type:{}", type);if (StringUtils.isEmpty(type)) {log.error("websocket.type.is null");return;}if(!type.equals(ThirdPositionAlarmEnum.TYPE_TAG.getCode())){log.error("websocket.type.is not tag");return;}boolean bigScreenPush = bigScreenService.pusdata(thirdPositionAlarmDto,thirdPartConfDto);} catch (Exception e) {log.error("websocketclient.position.error:", e);}});}@Overridepublic void onClose(int i, String s, boolean b) {log.info("websocketclient.position.close code:{} reason:{} {}", i, s, b);}@Overridepublic void onError(Exception e) {log.info("websocketclient.position.connect==error:", e);}};wsc.connect();return wsc;} catch (Exception e) {log.error("websocketclient.position==webSocketClient:", e);}return wsc;}}

客户端代码片段(新增心跳检测、重连)

客户端代码片段–配置新增

# websocketclient config
#websocket.client.config.wsUrl=ws://10.212.188.45:8880/position
websocket.client.config.wsUrl=ws://10.81.12.100:8090/websocket
websocket.client.config.enableHeartbeat=true
websocket.client.config.heartbeatInterval=20000
websocket.client.config.enableReconnection=true
@Configuration
@ConfigurationProperties(prefix="websocket.client.config")
public class WebsocketClientConfiguration {/*** websocket server ws://ip:port*/private String wsUrl;/*** 是否启用心跳监测 默认开启*/private Boolean enableHeartbeat;/*** 心跳监测间隔 默认20000毫秒*/private Integer heartbeatInterval;/*** 是否启用重连接 默认启用*/private Boolean enableReconnection;public String getWsUrl() {return wsUrl;}public void setWsUrl(String wsUrl) {this.wsUrl = wsUrl;}public Boolean getEnableHeartbeat() {return enableHeartbeat;}public void setEnableHeartbeat(Boolean enableHeartbeat) {this.enableHeartbeat = enableHeartbeat;}public Integer getHeartbeatInterval() {return heartbeatInterval;}public void setHeartbeatInterval(Integer heartbeatInterval) {this.heartbeatInterval = heartbeatInterval;}public Boolean getEnableReconnection() {return enableReconnection;}public void setEnableReconnection(Boolean enableReconnection) {this.enableReconnection = enableReconnection;}
}

客户端代码片段–客户端创建

@Slf4j
@Configuration
public class WebsocketClientBeanConfig {/*** 系统配置实现类*/@Autowiredprivate ConfigurationSystemService configurationSystemService;@Beanpublic WebsocketRunClient websocketRunClient(WebsocketClientConfiguration websocketClientConfiguration){String wsUrl = websocketClientConfiguration.getWsUrl();Boolean enableHeartbeat = websocketClientConfiguration.getEnableHeartbeat();Integer heartbeatInterval = websocketClientConfiguration.getHeartbeatInterval();Boolean enableReconnection = websocketClientConfiguration.getEnableReconnection();try {WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl));websocketRunClient.connect();websocketRunClient.setConnectionLostTimeout(0);new Thread(()->{while (true){try {Thread.sleep(heartbeatInterval);if(enableHeartbeat){websocketRunClient.send("[websocketclient] 心跳检测");log.info("[websocketclient] 心跳检测");}} catch (Exception e) {log.error("[websocketclient] 发生异常{}",e.getMessage());try {if(enableReconnection){log.info("[websocketclient] 重新连接");websocketRunClient.reconnect();websocketRunClient.setConnectionLostTimeout(0);}}catch (Exception ex){log.error("[websocketclient] 重连异常,{}",ex.getMessage());}}}}).start();return websocketRunClient;} catch (URISyntaxException ex) {log.error("[websocketclient] 连接异常,{}",ex.getMessage());}return null;}}

客户端代码片段–客户端心跳检测

@Slf4j
@Component
public class WebsocketRunClient extends WebSocketClient {/*** 大屏推送地址*/@Value("${thirdpart.bigscreen.positionhttpurl}")private String httpUrl;/*** 位置检测距离*/@Value("${thirdpart.positionrange:100}")private Double positionrange;/*** 大屏接口推送实现*/@Autowiredprivate BigScreenService bigScreenService;public WebsocketRunClient(URI serverUri) {super(serverUri);}@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("[websocketclient] Websocket客户端连接成功");}@Overridepublic void onMessage(String message) {log.info("[websocketclient.receive] 收到消息:{}",message);
//        ThirdPartConfDto thirdPartConfDto = configurationSystemService.getConfig();ThirdPartConfDto thirdPartConfDto = ThirdPartConfDto.builder().bigScreenHttpUrl(httpUrl).positionRange(positionrange).build();CompletableFuture.runAsync(() -> {try {if (StringUtils.isEmpty(message.trim())) {return;}if(message.contains("心跳检测")){return;}List<ThirdPositionAlarmDto> thirdPositionAlarmDtoList = JSONObject.parseArray(message,ThirdPositionAlarmDto.class);for(ThirdPositionAlarmDto thirdPositionAlarmDto: thirdPositionAlarmDtoList){String type = thirdPositionAlarmDto.getType();log.info("websocketclient.position.receive.message-type:{}", type);if (StringUtils.isEmpty(type)) {log.error("websocket.type.is null");return;}if(!type.equals(ThirdPositionAlarmEnum.TYPE_TAG.getCode())){log.error("websocket.type.is not tag");return;}boolean bigScreenPush = bigScreenService.pusdata(thirdPositionAlarmDto,thirdPartConfDto);}} catch (Exception e) {log.error("websocketclient.position.error:", e);}});}@Overridepublic void onClose(int code, String reason, boolean remote) {log.info("[websocketclient] Websocket客户端关闭");System.out.println("Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: " + reason);}@Overridepublic void onError(Exception e) {log.info("[websocketclient] Websocket客户端出现异常, 异常原因为:{}",e.getMessage());}

Websocket 服务端

服务端pom依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

服务端代码片段,websocket-配置

websocket-服务配置
@Configuration
public class WebSocketConfig {/*** ServerEndpointExporter注入* 该Bean会自动注册使用@ServerEndpoint注解申明的WebSocket endpoint** @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

服务端代码片段,websocket-服务端广播消息

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;/*** @author 谪居马道* @describe:websocket,消息广播* @date 2025/5/21*/
@Component
@ServerEndpoint("/websocket")
public class WebSocket {private final Logger log = LoggerFactory.getLogger(WebSocket.class);//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();/**         * 连接建立成功调用的方法         */@OnOpenpublic void onOpen(Session session){this.session=session;webSocketSet.add(this);//加入set中log.info("【WebSocket消息】有新的连接,总数:{}",webSocketSet.size());}/**         * 连接关闭调用的方法         */@OnClosepublic void onClose(){webSocketSet.remove(this);//从set中删除log.info("【WebSocket消息】连接断开,总数:{}",webSocketSet.size());        }/**         * 收到客户端消息后调用的方法         * @param message 客户端发送过来的消息         */@OnMessagepublic void onMessage(String message ){log.info("【WebSocket消息】收到客户端发来的消息:{}",message);sendMessage(message);}public void sendMessage(String message){for (WebSocket webSocket:webSocketSet) {log.info("【webSocket消息】广播消息,message={}",message);try {webSocket.session.getBasicRemote ().sendText(message);} catch (Exception e) {e.printStackTrace ();}            }}}

服务端代码片段,websocket-服务端一对一消息

@Component
@ServerEndpoint("/websocket/{terminalId}")
public class WebSocketService {private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);/*** 保存连接信息*/private static final Map<String, Session> CLIENTS = new ConcurrentHashMap<>();private static final Map<String, AtomicInteger> TERMINAL_IDS = new HashMap<>();/*** 需要注入的Service声明为静态,让其属于类*/private static TerminalService terminalService;/*** 注入的时候,给类的Service注入*/@Autowiredpublic void setMchDeviceInfoService(TerminalService terminalService) {WebSocketService.terminalService = terminalService;}@OnOpenpublic void onOpen(@PathParam("terminalId") String terminalId, Session session) throws Exception {logger.info(session.getRequestURI().getPath() + ",打开连接开始:" + session.getId());//当前连接已存在,关闭if (CLIENTS.containsKey(terminalId)) {onClose(CLIENTS.get(terminalId));}TERMINAL_IDS.put(terminalId, new AtomicInteger(0));CLIENTS.put(terminalId, session);logger.info(session.getRequestURI().getPath() + ",打开连接完成:" + session.getId());terminalService.terminal();}@OnClosepublic void onClose(@PathParam("terminalId") String terminalId, Session session) throws Exception {logger.info(session.getRequestURI().getPath() + ",关闭连接开始:" + session.getId());CLIENTS.remove(terminalId);TERMINAL_IDS.remove(terminalId);logger.info(session.getRequestURI().getPath() + ",关闭连接完成:" + session.getId());}@OnMessagepublic void onMessage(String message, Session session) {logger.info("前台发送消息:" + message);if ("心跳".equals(message)) {//重置当前终端心跳次数TERMINAL_IDS.get(message).set(0);return;}sendMessage("收到消息:" + message, session);}@OnErrorpublic void onError(Session session, Throwable error) {logger.error(error.toString());}public void onClose(Session session) {//判断当前连接是否在线
//        if (!session.isOpen()) {
//            return;
//        }try {session.close();} catch (IOException e) {logger.error("金斗云关闭连接异常:" + e);}}public void heartbeat() {//检查所有终端心跳次数for (String key : TERMINAL_IDS.keySet()) {//心跳3次及以上的主动断开if ((TERMINAL_IDS.get(key).intValue() >= 3)) {logger.info("心跳超时,关闭连接:" + key);onClose(CLIENTS.get(key));}}for (String key : CLIENTS.keySet()) {//记录当前终端心跳次数TERMINAL_IDS.get(key).incrementAndGet();sendMessage("心跳", CLIENTS.get(key));}}public void sendMessage(String message, Session session) {try {session.getAsyncRemote().sendText(message);logger.info("推送成功:" + message);} catch (Exception e) {logger.error("推送异常:" + e);}}public boolean sendMessage(String terminalId, String message) {try {Session session = CLIENTS.get(terminalId);session.getAsyncRemote().sendText(message);logger.info("推送成功:" + message);return true;} catch (Exception e) {logger.error("推送异常:" + e);return false;}}
}

Websocket测试工具

postman-测试

参考:
site1: https://maimai.cn/article/detail?fid=1747304025&efid=p7JdUMG2Gi0PrMX7xSXpXw
site2: https://blog.csdn.net/weixin_46768610/article/details/128711019

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/82444.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/82444.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/82444.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java Collection(集合) 接口

Date: 2025-05-21 20:21:32 author: lijianzhan Java 集合框架提供了一组接口和类&#xff0c;以实现各种数据结构和算法。 以下是关于 Java 集合的核心内容说明&#xff1a; /*** Java Collection Framework 说明&#xff1a;** 在 Java 中&#xff0c;集合&#xff08;Collec…

让MySQL更快:EXPLAIN语句详尽解析

前言 在数据库性能调优中&#xff0c;SQL 查询的执行效率是影响系统整体性能的关键因素之一。MySQL 提供了强大的工具——EXPLAIN 语句&#xff0c;帮助开发者和数据库管理员深入分析查询的执行计划&#xff0c;从而发现潜在的性能瓶颈并进行针对性优化。 EXPLAIN 语句能够模…

Java基础 Day20

一、HashSet 集合类 1、简介 HashSet 集合底层采取哈希表存储数据 底层是HashMap 不能使存取有序 JDK8之前的哈希表是数组和链表&#xff0c;头插法 JDK8之后的哈希表是数组、链表和红黑树&#xff0c;尾插法 2、存储元素 &#xff08;1&#xff09;如果要保证元素的唯…

2505C++,32位转64位

原文 假设有个想要将一个32位值传递给一个带64位值的函数的函数.你不关心高32位的内容,因为该值是传递给回调函数的直通值,回调函数会把它截断为32位值. 因此,你都担心编译器一般生成的将32位值扩展到64位值的那条指令的性能影响. 我怀疑这条指令不是程序中的性能瓶颈. 我想出…

光伏电站及时巡检:守护清洁能源的“生命线”

在“双碳”目标驱动下&#xff0c;光伏电站作为清洁能源的主力军&#xff0c;正以年均20%以上的装机增速重塑全球能源格局。然而&#xff0c;这些遍布荒漠、屋顶的“光伏矩阵”并非一劳永逸的能源提款机&#xff0c;其稳定运行高度依赖精细化的巡检维护。山东枣庄触电事故、衢州…

C++初阶-list的使用2

目录 1.std::list::splice的使用 2.std::list::remove和std::list::remove_if的使用 2.1remove_if函数的简单介绍 基本用法 函数原型 使用函数对象作为谓词 使用普通函数作为谓词 注意事项 复杂对象示例 2.2remove与remove_if的简单使用 3.std::list::unique的使用 …

OpenHarmony平台驱动使用(一),ADC

OpenHarmony平台驱动使用&#xff08;一&#xff09; ADC 概述 功能简介 ADC&#xff08;Analog to Digital Converter&#xff09;&#xff0c;即模拟-数字转换器&#xff0c;可将模拟信号转换成对应的数字信号&#xff0c;便于存储与计算等操作。除电源线和地线之外&#…

CSS【详解】弹性布局 flex

适用场景 一维&#xff08;行或列&#xff09;布局 基本概念 包裹所有被布局元素的父元素为容器 所有被布局的元素为项目 项目的排列方向&#xff08;垂直/水平&#xff09;为主轴 与主轴垂直的方向交交叉轴 容器上启用 flex 布局 将容器的 display 样式设置为 flex 或 i…

基于MATLAB实现传统谱减法以及两种改进的谱减法(增益函数谱减法、多带谱减法)的语音增强

基于MATLAB实现传统谱减法以及两种改进的谱减法&#xff08;增益函数谱减法、多带谱减法&#xff09;的语音增强代码示例&#xff1a; 传统谱减法 function enhanced traditional_spectral_subtraction(noisy, fs, wlen, inc, NIS, a, b)% 参数说明&#xff1a;% noisy - 带…

symbol【ES6】

你一闭眼世界就黑了&#xff0c;你不是主角是什么&#xff1f; 目录 什么是Symbol&#xff1f;‌Symbol特点‌&#xff1a;创建方法&#xff1a;注意点&#xff1a;不能进行运算&#xff1a;显示调用toString() --没有意义隐式转换boolean 如果属性名冲突了怎么办&#xff1f;o…

LeetCode 649. Dota2 参议院 java题解

https://leetcode.cn/problems/dota2-senate/description/ 贪心。不会写。 class Solution {public String predictPartyVictory(String senate) {boolean rtrue,dtrue;int flag0;//flag>0,d前面有r;flag<0,r前面有dchar[] senatessenate.toCharArray();//每一轮while(r…

机器学习第二十二讲:感知机 → 模仿大脑神经元的开关系统

机器学习第二十二讲&#xff1a;感知机 → 模仿大脑神经元的开关系统 资料取自《零基础学机器学习》。 查看总目录&#xff1a;学习大纲 关于DeepSeek本地部署指南可以看下我之前写的文章&#xff1a;DeepSeek R1本地与线上满血版部署&#xff1a;超详细手把手指南 感知机详解…

maven快速上手

之前我们项目如果要用到其他额外的jar包&#xff0c;需要自己去官网下载并且导入。但是有maven后&#xff0c;直接在maven的pom.xml文件里用代码配置即可&#xff0c;配置好后maven会自动帮我们联网下载并且会自动导入该jar包 在右边的maven中&#xff0c;我们可以看到下载安装…

科学养生指南:解锁健康生活密码

健康是人生最宝贵的财富&#xff0c;在快节奏的现代生活中&#xff0c;科学养生成为保持良好状态的关键。遵循现代医学与营养学的研究成果&#xff0c;无需依赖传统中医理论&#xff0c;我们也能找到适合自己的养生之道。​ 均衡饮食是健康的基石。现代营养学强调 “食物多样&…

Qt状态机QStateMachine

QStateMachine QState 提供了一种强大且灵活的方式来表示状态机中的状态&#xff0c;通过与状态机类(QStateMachine)和转换类(QSignalTransition&#xff0c; QEventTransition)结合&#xff0c;可以实现复杂的状态逻辑和用户交互。合理使用嵌套状态机、信号转换、动作与动画、…

C++八股 —— 原子操作

文章目录 1. 什么是原子操作2. 原子操作的特点3. 原子操作的底层原理4. 内存序内存屏障 5. 原子操作和互斥锁的对比6. 常用的原子操作7. 相关问题讨论 参考&#xff1a; C atomic 原子操作_c 原子操作-CSDN博客DeepSeek 1. 什么是原子操作 原子操作&#xff08;Atomic Opera…

双紫擒龙紫紫红指标源码学习,2025升级版紫紫红指标公式-重点技术

VAR1:MA((LOWHIGHCLOSE)/3,5); VAR2:CLOSEHHV(C,4) AND REF(C,1)LLV(C,4); 双紫擒龙:REF(C,1)LLV(C,4) AND C>REF(C,2) OR REF(C,2)LLV(C,4) AND REF(C,1)<REF(C,3) AND REF(C,2)<REF(C,4) AND C>REF(C,1); VAR4:VAR1>REF(VAR1,1) AND REF(VAR1,1)<REF(VAR1,…

NeuralRecon技术详解:从单目视频中实现三维重建

引言 三维重建是计算机视觉领域中的一项关键技术&#xff0c;它能够从二维图像中恢复出三维形状和结构。随着深度学习的发展&#xff0c;基于学习的方法已经成为三维重建的主流。NeuralRecon是一种先进的三维重建方法&#xff0c;它能够从单目视频中实时生成高质量的三维模型。…

Ubuntu 上开启 SSH 服务、禁用密码登录并仅允许密钥认证

1. 安装 OpenSSH 服务 如果尚未安装 SSH 服务&#xff0c;运行以下命令&#xff1a; sudo apt update sudo apt install openssh-server2. 启动 SSH 服务并设置开机自启 sudo systemctl start ssh sudo systemctl enable ssh3. 生成 SSH 密钥对&#xff08;本地机器&#xf…

MySQL 索引的增删改查

MySQL 索引的增删改查 1 建表时创建索引 [UNIQUE|FULLTEXT|SPATIAL] INDEX|KEY [别名] (字段名 [(长度)] [ASC|DESC] )主键直接写&#xff1a; PRIMARY KEY (Id)例如&#xff1a; CREATE TABLE people (id int NOT NULL PRIMARY KEY AUTO_INCREMENT,last_name varchar(10)…