一、认证架构设计
1.1 WebSocket安全认证流程
1.2 安全组件矩阵
组件 | 职责 | 关键技术点 |
---|---|---|
HandshakeInterceptor | 握手前认证 | Token解析/校验 |
ChannelInterceptor | 消息级鉴权 | @PreAuthorize |
SimpUserRegistry | 用户会话管理 | Principal关联 |
SecurityContext | 安全上下文 | 线程局部变量 |
二、生产级认证实现
2.1 JWT认证拦截器
public class JwtHandshakeInterceptor implements HandshakeInterceptor {private final JwtDecoder jwtDecoder;@Overridepublic boolean beforeHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Map<String, Object> attributes) throws Exception {String token = extractToken(request);if (token == null) {response.setStatusCode(HttpStatus.UNAUTHORIZED);return false;}try {Jwt jwt = jwtDecoder.decode(token);attributes.put("jwt", jwt);return true;} catch (JwtException e) {response.setStatusCode(HttpStatus.FORBIDDEN);return false;}}private String extractToken(ServerHttpRequest request) {// 从Header/QueryParam/Cookie中提取TokenList<String> headers = request.getHeaders().get("Authorization");if (headers != null && !headers.isEmpty()) {String header = headers.get(0);if (header.startsWith("Bearer ")) {return header.substring(7);}}return null;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Exception exception) {// 握手后处理逻辑}
}
2.2 消息级权限控制
@Configuration
@EnableWebSocketSecurity
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {@Overrideprotected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {messages.simpDestMatchers("/app/device/**").hasRole("DEVICE").simpSubscribeDestMatchers("/topic/admin/**").hasRole("ADMIN").simpSubscribeDestMatchers("/user/queue/**").authenticated().anyMessage().denyAll();}@Overrideprotected boolean sameOriginDisabled() {return true; // 禁用CSRF保护(根据需求配置)}@Beanpublic ChannelInterceptor authorizationChannelInterceptor() {return new ChannelInterceptor() {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);if (StompCommand.CONNECT.equals(accessor.getCommand())) {Authentication auth = (Authentication) accessor.getHeader("simpUser");if (auth == null || !auth.isAuthenticated()) {throw new AuthenticationCredentialsNotFoundException("未认证");}}return message;}};}
}
三、高级安全特性
3.1 双因素认证集成
public class TwoFactorAuthInterceptor implements ChannelInterceptor {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {Principal principal = accessor.getUser();String sessionId = accessor.getSessionId();if (isSensitiveDestination(accessor.getDestination()) &&!twoFactorService.isVerified(principal.getName(), sessionId)) {throw new TwoFactorRequiredException("需要二次认证");}}return message;}private boolean isSensitiveDestination(String destination) {return destination != null &&(destination.startsWith("/topic/finance") ||destination.startsWith("/app/admin"));}
}
3.2 速率限制控制
@Bean
public ChannelInterceptor rateLimitingInterceptor() {return new ChannelInterceptor() {private final RateLimiter limiter = RateLimiter.create(100); // 100条/秒@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {if (!limiter.tryAcquire()) {throw new RateLimitExceededException("消息发送频率过高");}return message;}};
}
四、集群环境方案
4.1 分布式会话管理
@Configuration
@EnableRedisHttpSession
public class SessionConfig {@Beanpublic RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory,SessionExpiredListener listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);container.addMessageListener(listener,new PatternTopic("__keyevent@*__:expired"));return container;}@Componentpublic static class SessionExpiredListener implements MessageListener {@Autowiredprivate SimpUserRegistry userRegistry;@Overridepublic void onMessage(Message message, byte[] pattern) {String key = new String(message.getBody());if (key.startsWith("spring:session:sessions:")) {String sessionId = key.substring(25);userRegistry.users().stream().filter(user -> user.getSession(sessionId) != null).findFirst().ifPresent(user -> {SimpUser simpUser = userRegistry.getUser(user.getName());simpUser.getSession(sessionId).close(CloseStatus.SESSION_NOT_RELIABLE);});}}}
}
4.2 跨节点消息审计
/**
 * Asynchronous audit pipeline: messages put on {@code auditChannel} are handled on a
 * dedicated thread pool and written to the audit service.
 *
 * <p>NOTE(review): {@code auditService} is referenced but not declared in this snippet;
 * it has to be injected for the class to compile.
 */
@Configuration
public class AuditConfig {

    /** Channel whose subscribers are invoked on {@link #taskExecutor()}. */
    @Bean
    public MessageChannel auditChannel() {
        return new ExecutorSubscribableChannel(taskExecutor());
    }

    /** Thread pool for audit handling: 5 core threads, 10 max, queue capacity 1000. */
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(1000);
        return executor;
    }

    /**
     * Handler that records user, session id, command, destination and the current
     * instant for every message on {@code auditChannel}.
     */
    @Bean
    @ServiceActivator(inputChannel = "auditChannel")
    public MessageHandler auditHandler() {
        return message -> {
            StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
            String user = Optional.ofNullable(accessor.getUser())
                    .map(Principal::getName)
                    .orElse("anonymous");
            // Heartbeats and non-STOMP messages carry no command; calling name() on the
            // null result would abort the audit with a NullPointerException.
            String command = Optional.ofNullable(accessor.getCommand())
                    .map(Enum::name)
                    .orElse("UNKNOWN");
            auditService.log(
                    user,
                    accessor.getSessionId(),
                    command,
                    accessor.getDestination(),
                    Instant.now());
        };
    }
}
五、监控与运维
5.1 健康检查端点
/**
 * REST endpoints exposing figures from the {@code SimpUserRegistry}.
 */
@RestController
@RequestMapping("/actuator/websocket")
public class WebSocketMetricsEndpoint {

    @Autowired
    private SimpUserRegistry userRegistry;

    /**
     * Reports the registry's user count as {@code activeConnections} and the total
     * number of sessions across all users as {@code activeSessions}.
     *
     * @return the two metrics, in insertion order
     */
    @GetMapping("/connections")
    public Map<String, Object> connectionMetrics() {
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("activeConnections", userRegistry.getUserCount());
        // SimpUserRegistry exposes its users through getUsers(); there is no users() accessor.
        metrics.put("activeSessions", userRegistry.getUsers().stream()
                .mapToInt(u -> u.getSessions().size())
                .sum());
        return metrics;
    }

    /**
     * Returns the users currently held by the registry.
     *
     * <p>NOTE(review): the raw registry objects are serialized as the response body —
     * confirm they serialize cleanly and expose nothing sensitive.
     */
    @GetMapping("/users")
    public Collection<SimpUser> activeUsers() {
        return userRegistry.getUsers();
    }
}
5.2 异常处理策略
/**
 * Maps exceptions raised during message handling to an {@link ErrorResponse} that is
 * sent to the user's {@code /queue/errors} destination.
 */
@ControllerAdvice
public class WebSocketExceptionHandler {

    /**
     * Builds the error payload for an exception.
     *
     * @param ex the exception to translate
     * @return a response stamped with the current instant and carrying
     *         {@code AUTH_ERROR}, {@code ACCESS_DENIED} or {@code INTERNAL_ERROR}
     */
    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    public ErrorResponse handleException(Exception ex) {
        ErrorResponse payload = new ErrorResponse();
        payload.setTimestamp(Instant.now());

        final String code;
        final String text;
        if (ex instanceof AuthenticationException) {
            code = "AUTH_ERROR";
            text = "认证失败";
        } else if (ex instanceof AccessDeniedException) {
            code = "ACCESS_DENIED";
            text = "权限不足";
        } else {
            // Anything else is reported generically.
            code = "INTERNAL_ERROR";
            text = "服务器内部错误";
        }

        payload.setCode(code);
        payload.setMessage(text);
        return payload;
    }

    /** Error payload delivered to the client. */
    @Data
    public static class ErrorResponse {
        private String code;
        private String message;
        private Instant timestamp;
    }
}
六、前端安全集成
6.1 Token刷新机制
/**
 * Opens a STOMP-over-SockJS connection authenticated with a bearer token and, when the
 * server reports TOKEN_EXPIRED, refreshes the token and reconnects.
 *
 * @param {string} [token] access token to send; defaults to getAccessToken()
 * @param {number} [refreshAttemptsLeft=3] how many refresh-and-reconnect rounds may still
 *        be tried, so a server that keeps answering TOKEN_EXPIRED cannot cause an endless loop
 * @returns {object} the STOMP client created for this attempt
 */
function connectWebSocket(token = getAccessToken(), refreshAttemptsLeft = 3) {
  const socket = new SockJS('/ws');
  const stompClient = Stomp.over(socket);
  const headers = {
    'Authorization': `Bearer ${token}`
  };

  stompClient.connect(headers, () => {
    // Connected; nothing further to do here.
  }, (error) => {
    if (error?.headers?.message !== 'TOKEN_EXPIRED') {
      return;
    }
    if (refreshAttemptsLeft <= 0) {
      console.error('WebSocket token refresh attempts exhausted');
      return;
    }
    refreshToken()
      // Pass the fresh token on explicitly; a reconnect builds its own headers, so
      // mutating this attempt's headers object would have no effect.
      .then(newToken => connectWebSocket(newToken, refreshAttemptsLeft - 1))
      .catch(refreshError => {
        console.error('WebSocket token refresh failed', refreshError);
      });
  });

  return stompClient;
}
6.2 安全断开处理
/**
 * Connects the shared STOMP client, keeps it alive with a 30-second heartbeat,
 * reconnects with exponential backoff (at most 3 attempts) after a close, and
 * disconnects cleanly when the page unloads.
 *
 * Relies on `stompClient` and `headers` from the enclosing scope.
 */
function setupWebSocket() {
  let reconnectAttempts = 0;
  const maxReconnectAttempts = 3;

  function connect() {
    stompClient.connect(headers, frame => {
      reconnectAttempts = 0;

      // Heartbeat: stop the timer once the client is no longer connected.
      const heartbeatInterval = setInterval(() => {
        if (!stompClient.connected) {
          clearInterval(heartbeatInterval);
          return;
        }
        stompClient.send("/app/heartbeat");
      }, 30000);

      // NOTE(review): whether the client invokes an `onclose` property depends on the
      // STOMP library version — confirm the hook name against the one in use.
      stompClient.onclose = () => {
        clearInterval(heartbeatInterval);
        if (reconnectAttempts < maxReconnectAttempts) {
          // Backoff doubles per attempt: 1s, 2s, 4s.
          setTimeout(connect, 1000 * Math.pow(2, reconnectAttempts));
          reconnectAttempts++;
        }
      };
    });
  }

  window.addEventListener('beforeunload', () => {
    if (stompClient.connected) {
      stompClient.disconnect(() => {}, {});
    }
  });

  // Start the first connection attempt; without this call connect() is only defined
  // and setup would do nothing beyond registering the unload handler.
  connect();
}
通过以上方案,可以构建出具备企业级安全性的WebSocket通信系统。建议在实际部署时:
- 根据业务需求调整认证粒度
- 实施完善的监控告警机制
- 定期进行安全审计
- 建立完整的证书管理体系
- 制定详细的应急响应预案