通用请求体类
@Data
@ApiModel("websocket请求消息")
public class WebSocketRequest<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 参考:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum;*/private @NotNull(message = "业务操作类型不能为空") Integer aiBroadcastEventEnum;private T data;public T getRealData(Class<T> clazz) {if (this.data == null) {return null;} else {String jsonStr = JsonUtil.toJsonStr(this.data);return (T) JsonUtil.parseObject(jsonStr, clazz);}}}
通用响应类
@ApiModel("websocket响应消息")
@Data
public class WebSocketResponse<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 参考枚举:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer aiBroadcastEventEnum;private String code;private String msg;private T data;public static <T> Mono<WebSocketResponse<T>> ok(Integer eventEnum, T data) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());response.setData(data);return Mono.just(response);}public static Mono<WebSocketResponse<Void>> ok(Integer eventEnum) {WebSocketResponse<Void> response = new WebSocketResponse<Void>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());return Mono.just(response);}public static <T> Mono<WebSocketResponse<T>> fail(Integer eventEnum, RespStatusEnum status, String err) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(status.getCode());response.setMsg(err);return Mono.just(response);}}
连接类型类
@Data
@Accessors(chain = true)
public class ConnectDTO {/*** 连接类型 参考枚举:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer type;}
- 配置类
@Configuration
public class WebFluxWebSocketConfig {/** 让 Spring 注入已经带依赖的 Handler */@Beanpublic HandlerMapping webSocketMapping(WebSocketReceivedHandler handler) {return new SimpleUrlHandlerMapping(Map.of("/api/xxx/ws", handler), // 用注入的 handler-1);}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
- 实现类
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketReceivedHandler implements WebSocketHandler {@Autowiredprivate AiBroadcastEventHandlerDispatcher<?, ?> dispatcher;@Autowiredprivate WsSessionPool wsSessionPool;@Overridepublic @NotNull Mono<Void> handle(@NotNull WebSocketSession session) {wsSessionPool.add(session);String sid = session.getId();// 处理客户端请求消息,生成响应消息流Flux<WebSocketMessage> inputFlux = session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(payload -> dispatcher.doDispatch(session, payload).map(session::textMessage));// 服务端广播消息流Flux<WebSocketMessage> broadcastFlux = wsSessionPool.getPersonalFlux(sid).map(session::textMessage);// 合并两个流,确保 session.send 只调用一次Flux<WebSocketMessage> mergedFlux = Flux.merge(inputFlux, broadcastFlux);return session.send(mergedFlux).doFinally(sig -> {wsSessionPool.remove(session);log.info("websocket 关闭,sessionId:{},signal:{}", session.getId(), sig);});}
}
3.处理类 aiBroadcastEventEnum 是枚举类型,根据不同的枚举类型进入不同的处理类进行处理不同的消息返回
@Component
@Slf4j
public class AiBroadcastEventHandlerDispatcher<T, R> {private final Map<Integer, AiBroadcastEventHandler<T, R>> eventMap = new HashMap<>();/** 由 Spring 注入所有事件处理器 */public AiBroadcastEventHandlerDispatcher(List<AiBroadcastEventHandler<T, R>> handlers) {handlers.forEach(h -> eventMap.put(h.aiBroadcastEvent(), h));}/*** 处理前端发来的 Payload 并把响应写回当前 session** @param session 当前 WebFlux Session* @param payload 文本 JSON* @return Mono<Void> 供调用方链式订阅*/public Mono<String> doDispatch(WebSocketSession session, String payload) {WebSocketRequest<R> webSocketRequest = JsonUtil.parseObject(payload, new TypeReference<WebSocketRequest<R>>() {});log.info("webSocketRequest:{}, sessionID:{}", webSocketRequest, session.getId());// 发送响应并记录日志return handlerRequest(webSocketRequest, session).map(JsonUtil::toJson).doOnNext(json -> log.info("准备发送: {}", json)).onErrorResume(e -> {log.error("发送异常", e);return Mono.just(JsonUtil.toJson(WebSocketResponse.fail(webSocketRequest != null ? webSocketRequest.getAiBroadcastEventEnum() : null,RespStatusEnum.INTERNAL_SERVICE_ERROR,"系统异常:" + e.getMessage())));});}/** 实际路由到具体 Handler */private Mono<WebSocketResponse<T>> handlerRequest(WebSocketRequest<R> req, WebSocketSession session) {if (ObjectUtil.isNull(req) || !eventMap.containsKey(req.getAiBroadcastEventEnum())) {return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,"aiBroadcastEventEnum not find");}try {return eventMap.get(req.getAiBroadcastEventEnum()).handler(req, session);} catch (Exception e) {log.error("websocket 处理消息异常,webSocketRequest:{}, sessionID:{}",req, session.getId(), e);return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR, e.getMessage());}}}
- 接口
public interface AiBroadcastEventHandler<T, R> {/*** 对应事件枚举值(例如 MAP_ALARM_CHARGING 的 code)*/Integer aiBroadcastEvent();/*** 执行处理逻辑,并返回响应,最终由 dispatcher 推送给前端** @param webSocketRequest 请求体* @param session 当前连接* @return Mono<WebSocketResponse<T>> 最终会转成 JSON 发给前端*/Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);/*** 校验参数*/void validateParam(WebSocketRequest<R> webSocketRequest) throws ParameterException;
- 通用处理逻辑
public abstract class AbstractAiBroadcastEventHandler<T, R>implements AiBroadcastEventHandler<T, R> {/*** websocket 请求事件处理统一流程:参数校验 → 业务处理*/@Overridepublic Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest,WebSocketSession session) {try {validateParam(webSocketRequest);return doHandler(webSocketRequest, session);} catch (ParameterException e) {return WebSocketResponse.fail(webSocketRequest.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,e.getMessage());}}/*** 子类实现真正的业务逻辑:* 1. 更新本地 sessionId / Redis 映射* 2. 查询并返回最新业务数据*/public abstract Mono<WebSocketResponse<T>> doHandler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);
- 实现类
@Component
@Slf4j
public class SubscribeFireContentHandler extends AbstractAiBroadcastEventHandler<VO, ConnectDTO> implements AiBroadcastEventHandler<VO, ConnectDTO>{@Autowiredprivate BizServiceSafeScreenClient bizServiceSafeScreenClient;@Overridepublic Integer aiBroadcastEvent() {return AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode();}@Overridepublic Mono<WebSocketResponse<VO>> doHandler(WebSocketRequest<ConnectDTO> webSocketRequest, WebSocketSession session) {log.info("SubscribeFireContentHandler doHandler start");//session订阅该订单数据ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (!AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode().equals(dto.getType())) {return Mono.error(new RespException("参数异常", RespStatusEnum.CLIENT_ERROR));}// 查询火灾站 前端拼接内容FinalResultVO<VO> xxx = 调用接口获取数据;if (xxx != null) {Mono<WebSocketResponse<FireStationVO>> ok = WebSocketResponse.ok(AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode(), xxx);return ok;}return Mono.empty();}@Overridepublic void validateParam(WebSocketRequest<ConnectDTO> webSocketRequest) {ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (ObjUtil.isNull(dto.getType())) {throw new RespException("参数异常", RespStatusEnum.CLIENT_ERROR);}}
7.心跳
@Component
@Log4j2
public class WsHeartbeatTask {private final WsSessionPool wsSessionPool;public WsHeartbeatTask(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@PostConstructpublic void init() {log.info("WebSocket心跳任务已启动");}// 每30秒广播一个心跳消息@Scheduled(fixedRate = 30_000)public void sendHeartbeat() {String timeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));String json = String.format("{\"type\":\"ping\",\"timestamp\":\"%s\"}", timeStr);wsSessionPool.broadcast(json);}
}