在阅读这篇文章前,推荐先阅读以下内容:

  1. [netty5: WebSocketFrame]-源码分析
  2. [netty5: WebSocketFrameEncoder & WebSocketFrameDecoder]-源码解析

WebSocketServerHandshakerFactory

WebSocketServerHandshakerFactory 用于根据客户端请求中的 WebSocket 版本构造对应的 WebSocketServerHandshaker 实例,完成握手协议版本的协商与支持判断。

public class WebSocketServerHandshakerFactory {private final String webSocketURL;private final String subprotocols;private final WebSocketDecoderConfig decoderConfig;// ...public WebSocketServerHandshakerFactory(String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {this.webSocketURL = webSocketURL;this.subprotocols = subprotocols;this.decoderConfig = Objects.requireNonNull(decoderConfig, "decoderConfig");}public WebSocketServerHandshaker newHandshaker(HttpRequest req) {return resolveHandshaker0(req, webSocketURL, subprotocols, decoderConfig);}public static WebSocketServerHandshaker resolveHandshaker(HttpRequest req, String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {Objects.requireNonNull(decoderConfig, "decoderConfig");return resolveHandshaker0(req, webSocketURL, subprotocols, decoderConfig);}private static WebSocketServerHandshaker resolveHandshaker0(HttpRequest req, String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);if (version != null && AsciiString.contentEqualsIgnoreCase(version, WebSocketVersion.V13.toAsciiString())) {// Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).return new WebSocketServerHandshaker13(webSocketURL, subprotocols, decoderConfig);}return null;}public static Future<Void> sendUnsupportedVersionResponse(Channel channel) {HttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.UPGRADE_REQUIRED, channel.bufferAllocator().allocate(0));res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, WebSocketVersion.V13.toHttpHeaderValue());HttpUtil.setContentLength(res, 0);return channel.writeAndFlush(res);}
}

WebSocketServerHandshaker13

WebSocketServerHandshaker13 负责基于 RFC 6455 实现 WebSocket 版本 13 的服务端握手处理流程,包括请求校验、响应生成、子协议协商和帧编解码器的安装。

public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {public WebSocketServerHandshaker13(String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {super(WebSocketVersion.V13, webSocketURL, subprotocols, decoderConfig);}/*** <p>* Handle the web socket handshake for the web socket specification <a href=* "https://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17">HyBi versions 13-17</a>. Versions 13-17* share the same wire protocol.* </p>** <p>* Browser request to the server:* </p>** <pre>* GET /chat HTTP/1.1* Host: server.example.com* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==* Origin: http://example.com* Sec-WebSocket-Protocol: chat, superchat* Sec-WebSocket-Version: 13* </pre>** <p>* Server response:* </p>** <pre>* HTTP/1.1 101 Switching Protocols* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=* Sec-WebSocket-Protocol: chat* </pre>*/@Overrideprotected FullHttpResponse newHandshakeResponse(BufferAllocator allocator, FullHttpRequest req, HttpHeaders headers) {HttpMethod method = req.method();if (!HttpMethod.GET.equals(method)) {throw new WebSocketServerHandshakeException("Invalid WebSocket handshake method: " + method, req);}HttpHeaders reqHeaders = req.headers();if (!reqHeaders.contains(HttpHeaderNames.CONNECTION) || !reqHeaders.containsIgnoreCase(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)) {throw new WebSocketServerHandshakeException("not a WebSocket request: a |Connection| header must includes a token 'Upgrade'", req);}if (!reqHeaders.containsIgnoreCase(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET)) {throw new WebSocketServerHandshakeException("not a WebSocket request: a |Upgrade| header must containing the value 'websocket'", req);}CharSequence key = reqHeaders.get(HttpHeaderNames.SEC_WEBSOCKET_KEY);if (key == null) {throw new WebSocketServerHandshakeException("not a WebSocket request: missing key", req);}FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS,allocator.allocate(0));if (headers != null) {res.headers().add(headers);}String accept = WebSocketUtil.calculateV13Accept(key.toString());res.headers().set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE).set(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT, accept);CharSequence subprotocols = reqHeaders.get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);if (subprotocols != null) {String selectedSubprotocol = selectSubprotocol(subprotocols.toString());if (selectedSubprotocol == null) {if (logger.isDebugEnabled()) {logger.debug("Requested subprotocol(s) not supported: {}", subprotocols);}} else {res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, selectedSubprotocol);}}return res;}@Overrideprotected WebSocketFrameDecoder newWebsocketDecoder() {return new WebSocket13FrameDecoder(decoderConfig());}@Overrideprotected WebSocketFrameEncoder newWebSocketEncoder() {return new WebSocket13FrameEncoder(false);}
}

WebSocketServerHandshaker

WebSocketServerHandshaker 是 WebSocket 握手处理的抽象基类,定义了服务端握手响应、子协议选择和编解码器安装等通用逻辑,供具体版本(如 V13)实现。

public abstract class WebSocketServerHandshaker {protected static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandshaker.class);private final String uri;private final String[] subprotocols;private final WebSocketVersion version;private final WebSocketDecoderConfig decoderConfig;private String selectedSubprotocol;public static final String SUB_PROTOCOL_WILDCARD = "*";protected WebSocketServerHandshaker(WebSocketVersion version, String uri, String subprotocols, WebSocketDecoderConfig decoderConfig) {this.version = version;this.uri = uri;if (subprotocols != null) {String[] subprotocolArray = subprotocols.split(",");for (int i = 0; i < subprotocolArray.length; i++) {subprotocolArray[i] = subprotocolArray[i].trim();}this.subprotocols = subprotocolArray;} else {this.subprotocols = EmptyArrays.EMPTY_STRINGS;}this.decoderConfig = requireNonNull(decoderConfig, "decoderConfig");}// 将当前 Handshaker 支持的子协议数组转换为有序去重的 Set 返回,用于后续子协议协商。public Set<String> subprotocols() {Set<String> ret = new LinkedHashSet<>();Collections.addAll(ret, subprotocols);return ret;}// WebSocketServerProtocolHandshakeHandler.channelRead// 执行 WebSocket 握手响应、替换或插入编解码器并清理不兼容的 HTTP 处理器,最终完成协议切换。public Future<Void> handshake(Channel channel, FullHttpRequest req) {return handshake(channel, req, null);}public final Future<Void> handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders) {if (logger.isDebugEnabled()) {logger.debug("{} WebSocket version {} server handshake", channel, version());}//  WebSocketServerHandshaker13.newHandshakeResponseFullHttpResponse response = newHandshakeResponse(channel.bufferAllocator(), req, responseHeaders);// 移除 HttpObjectAggregator 和 HttpContentCompressorChannelPipeline p = channel.pipeline();if (p.get(HttpObjectAggregator.class) != null) {p.remove(HttpObjectAggregator.class);}if (p.get(HttpContentCompressor.class) != null) {p.remove(HttpContentCompressor.class);}ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);final String encoderName;if (ctx == null) {// this means the user use an HttpServerCodecctx = p.context(HttpServerCodec.class);if (ctx == null) {response.close();return channel.newFailedFuture(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));}p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());encoderName = ctx.name();} else {p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());encoderName = p.context(HttpResponseEncoder.class).name();p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());}return channel.writeAndFlush(response).addListener(channel, (ch, future) -> {if (future.isSuccess()) {ChannelPipeline p1 = ch.pipeline();p1.remove(encoderName);}});}// 处理非 FullHttpRequest 的 WebSocket 握手场景,通过临时注入 ChannelHandler 聚合请求数据并完成协议切换public Future<Void> handshake(Channel channel, HttpRequest req) {return handshake(channel, req, null);}// 在没有使用 HttpObjectAggregator 的情况下,// 动态地通过临时注入一个 ChannelHandler 来手动聚合 HTTP 请求的各个部分// 最终组装成一个 FullHttpRequest,完成 WebSocket 握手的流程public final Future<Void> handshake(final Channel channel, HttpRequest req, final HttpHeaders responseHeaders) {// 如果传进来的 req 已经是 FullHttpRequest,直接调用已有的 handshake(Channel, FullHttpRequest, HttpHeaders) 方法处理。// 否则,说明请求是分段的(HttpRequest + HttpContent),需要手动聚合。if (req instanceof FullHttpRequest) {return handshake(channel, (FullHttpRequest) req, responseHeaders);}ChannelPipeline pipeline = channel.pipeline();//  先在 ChannelPipeline 里找 HttpRequestDecoder 的 ChannelHandlerContext。// 如果没找到,再找 HttpServerCodec。// 如果都没找到,直接失败,返回异常。ChannelHandlerContext ctx = pipeline.context(HttpRequestDecoder.class);if (ctx == null) {// This means the user use a HttpServerCodecctx = pipeline.context(HttpServerCodec.class);if (ctx == null) {return channel.newFailedFuture(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));}}// 动态注入一个临时的 ChannelHandlerAdapter,名字叫 "handshaker"// 它的职责是监听接下来流入的 HttpObject 消息,把 HttpRequest、HttpContent、LastHttpContent 等部分组装成一个完整的 FullHttpRequest// 当完整请求组装完成后:// 	1. 立刻移除自己(ctx.pipeline().remove(this)),避免继续拦截后续消息。// 	2. 调用真正的 handshake(Channel, FullHttpRequest, HttpHeaders) 继续 WebSocket 握手。// 	3. 把握手的 Future 结果关联到当前的 promise 上。final Promise<Void> promise = channel.newPromise();pipeline.addAfter(ctx.name(), "handshaker", new ChannelHandlerAdapter() {private FullHttpRequest fullHttpRequest;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof HttpObject) {try {handleHandshakeRequest(ctx, (HttpObject) msg);} finally {Resource.dispose(msg);}} else {super.channelRead(ctx, msg);}}@Overridepublic void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.pipeline().remove(this);promise.tryFailure(cause);super.channelExceptionCaught(ctx, cause);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {try {// Fail promise if Channel was closedif (!promise.isDone()) {promise.tryFailure(new ClosedChannelException());}ctx.fireChannelInactive();} finally {releaseFullHttpRequest();}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {releaseFullHttpRequest();}private void handleHandshakeRequest(ChannelHandlerContext ctx, HttpObject httpObject) {if (httpObject instanceof FullHttpRequest) {ctx.pipeline().remove(this);handshake(channel, (FullHttpRequest) httpObject, responseHeaders).cascadeTo(promise);return;}if (httpObject instanceof LastHttpContent) {assert fullHttpRequest != null;try (FullHttpRequest handshakeRequest = fullHttpRequest) {fullHttpRequest = null;ctx.pipeline().remove(this);handshake(channel, handshakeRequest, responseHeaders).cascadeTo(promise);}return;}if (httpObject instanceof HttpRequest) {HttpRequest httpRequest = (HttpRequest) httpObject;fullHttpRequest = new DefaultFullHttpRequest(httpRequest.protocolVersion(), httpRequest.method(),httpRequest.uri(), ctx.bufferAllocator().allocate(0),httpRequest.headers(), HttpHeaders.emptyHeaders());if (httpRequest.decoderResult().isFailure()) {fullHttpRequest.setDecoderResult(httpRequest.decoderResult());}}}private void releaseFullHttpRequest() {if (fullHttpRequest != null) {fullHttpRequest.close();fullHttpRequest = null;}}});try {ctx.fireChannelRead(ReferenceCountUtil.retain(req));} catch (Throwable cause) {promise.setFailure(cause);}return promise.asFuture();}public Future<Void> close(Channel channel, CloseWebSocketFrame frame) {requireNonNull(channel, "channel");return close0(channel, frame);}public Future<Void> close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {requireNonNull(ctx, "ctx");return close0(ctx, frame);}private static Future<Void> close0(ChannelOutboundInvoker invoker, CloseWebSocketFrame frame) {return invoker.writeAndFlush(frame).addListener(invoker, ChannelFutureListeners.CLOSE);}// WebSocketServerHandshaker13.newHandshakeResponse// 服务端从客户端请求的子协议中选出一个自己支持的返回给客户端的过程protected String selectSubprotocol(String requestedSubprotocols) {if (requestedSubprotocols == null || subprotocols.length == 0) {return null;}String[] requestedSubprotocolArray = requestedSubprotocols.split(",");for (String p : requestedSubprotocolArray) {String requestedSubprotocol = p.trim();for (String supportedSubprotocol : subprotocols) {if (SUB_PROTOCOL_WILDCARD.equals(supportedSubprotocol) || requestedSubprotocol.equals(supportedSubprotocol)) {selectedSubprotocol = requestedSubprotocol;return requestedSubprotocol;}}}// No match foundreturn null;}protected abstract FullHttpResponse newHandshakeResponse(BufferAllocator allocator, FullHttpRequest req,HttpHeaders responseHeaders);protected abstract WebSocketFrameDecoder newWebsocketDecoder();protected abstract WebSocketFrameEncoder newWebSocketEncoder();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/89991.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/89991.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/89991.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据挖掘:深度解析与实战应用

在当今数字化时代&#xff0c;数据挖掘已经成为企业获取竞争优势的关键技术之一。通过从大量数据中提取有价值的信息&#xff0c;企业可以更好地理解客户需求、优化业务流程、提高运营效率。本文将深入探讨数据挖掘的核心技术、实际应用案例以及如何在企业中实施数据挖掘项目。…

LLM面试题14

算法岗面试题 介绍下Transformer模型。 Transformer本身是一个典型的encoder-decoder模型&#xff0c;Encoder端和Decoder端均有6个Block,Encoder端的Block包括两个模块&#xff0c;多头self-attention模块以及一个前馈神经网络模块&#xff1b;Decoder端的Block包括三个模块&…

Java金融场景中为什么金额字段禁止使用浮点类型(float/double)?

引言 Java金融场景中为什么金额字段禁止使用浮点类型&#xff1f;这是一篇你不能忽视的“爆雷”警告&#xff01; 在金融、电商、支付、清结算等业务系统中&#xff0c;浮点类型是绝对禁区&#xff01; &#x1f6a8;一、核心警告&#xff1a;浮点类型不是十进制数&#xff01;…

SVN下载与拉取

大家好我是苏麟&#xff0c;今天聊一聊SVN。 SVN官网&#xff1a;下载 TortoiseSVN - TortoiseSVN 软件 根据系统选择32位还是64位 打开文件 安装&#xff0c;下一步&#xff0c;下一步 安装成功后&#xff0c;右键找到SVNcheck 输入地址 输入用户名和密码就OK了 这期就到这里…

数据结构笔记8:堆

目录 满二叉树&#xff1a; 完全二叉树&#xff1a; 堆是一种特殊的完全二叉树&#xff1a; 我们可以以数组的方式存储堆。 父节点和子节点下标关系的推导&#xff1a; 1.使用数学归纳法证明n2 1 n0&#xff1a; 2.使用边和节点的关系证明n2 1 n0&#xff1a; 我们…

3. lvgl 9.3 vscode 模拟环境搭建 lv_port_pc_vscode-release-v9.3

文章目录1. 资源下载1. 1 lv_port_pc_vscode1.2 cmake 和 mingw 环境搭建1.3 sdl 下载1.4 下载lvgl_v9.32. 环境搭建2.1 拷贝lvgl 源码到工程2.2 添加SDL2 依赖2.3 执行工程3. 运行示例1. 资源下载 1. 1 lv_port_pc_vscode 那么多模拟器&#xff0c;为什么选择这个&#xff1…

【牛客刷题】小红的爆炸串(二)

一、题目介绍 本题链接为:小红的爆炸串(二) 小红定义一个字符串会爆炸,当且仅当至少有k对相邻的字母不同。 例如,当 k k k=2时,"arc"会爆炸,而"aabb"则不会爆炸。 小红拿到了一个长度为

【实战】如何训练一个客服语音对话场景VAD模型

1. 引言:客服场景下的VAD模型 在客服中心,每天都会产生海量的通话录音。对这些录音进行有效分析,可以用于服务质量监控、客户意图洞察、流程优化等。VAD在其中扮演着“预处理器”和“过滤器”的关键角色: 提升ASR效率与准确性:只将检测到的语音片段送入ASR引擎,可以避免…

在 Dokploy 中为 PostgreSQL 搭建 PgBouncer 数据库连接池(图文)

前言&#xff1a;为什么你需要一个连接池&#xff1f; 如果你正在使用 Node.js (尤其是像 Next.js 这样的框架) 配合 Prisma 操作 PostgreSQL 数据库&#xff0c;你很可能在某个阶段会遇到那个令人头疼的错误&#xff1a;“Error: Too many clients already”。这通常发生在应…

Mac获取终端历史

在 macOS 中&#xff0c;历史记录文件的位置取决于你使用的 shell。以下是针对不同 shell 的历史记录文件的默认位置&#xff1a;对于 Bash 用户&#xff1a; 历史记录文件通常位于 ~/.bash_history。对于 Zsh 用户&#xff08;macOS Catalina及以后版本默认使用的shell&#x…

高频交易服务器篇

在 Binance 进行高频交易&#xff08;HFT&#xff09;时&#xff0c;服务器的低延迟、高稳定性和快速网络是关键。亚马逊云&#xff08;AWS&#xff09; 提供了多种适合高频交易的方案&#xff0c;以下是推荐的配置和优化策略&#xff1a;1. 选择 AWS 区域&#xff08;Region&a…

MVC与MVVM架构模式详解:原理、区别与JavaScript实现

Hi&#xff0c;我是布兰妮甜 &#xff01;在当今复杂的前端开发领域&#xff0c;如何组织代码结构一直是开发者面临的核心挑战。MVC和MVVM作为两种经典的架构模式&#xff0c;为前端应用提供了清晰的责任划分和可维护的代码组织方案。本文将深入探讨这两种模式的原理、实现差异…

从小白到进阶:解锁linux与c语言高级编程知识点嵌入式开发的任督二脉(2)

【硬核揭秘】Linux与C高级编程&#xff1a;从入门到精通&#xff0c;你的全栈之路&#xff01; 第三部分&#xff1a;Shell脚本编程——自动化你的Linux世界&#xff0c;让效率飞起来&#xff01; 嘿&#xff0c;各位C语言的“卷王”们&#xff01; 在Linux的世界里&#xf…

锁和事务的关系

事务的4大特性(ACID) 原子性&#xff08;Atomicity&#xff09;&#xff1a;事务被视为一个单一的、不可分割的工作单元一致性&#xff08;Consistency&#xff09;&#xff1a;事务执行前后&#xff0c;数据库从一个一致状态转变为另一个一致状态&#xff0c;并且强制执行所有…

电动车信用免押小程序免押租赁小程序php方案

电动车信用免押租赁小程序&#xff0c;免押租小程序&#xff0c;信用免押接口申请、对接开发&#xff0c;可源码搭建&#xff0c;可二开或定制。开发语言后端php&#xff0c;前端uniapp。可二开定制 在线选择门店&#xff0c;选择车辆类型&#xff0c;选择租赁方式&#xff08…

机器学习在智能安防中的应用:视频监控与异常行为检测

随着人工智能技术的飞速发展&#xff0c;智能安防领域正经历着一场深刻的变革。智能安防通过整合先进的信息技术&#xff0c;如物联网&#xff08;IoT&#xff09;、大数据和机器学习&#xff0c;能够实现从传统的被动防御到主动预防的转变。机器学习技术在智能安防中的应用尤为…

MySQL中DROP、DELETE与TRUNCATE的深度解析

在MySQL数据库操作中&#xff0c;DROP、DELETE和TRUNCATE是三个常用的数据操作命令&#xff0c;它们都可以用于删除数据&#xff0c;但在功能、执行效率、事务处理以及对表结构的影响等方面存在显著差异。本文将从多个维度对这三个命令进行详细对比和解析&#xff0c;帮助读者更…

一条 SQL 语句的内部执行流程详解(MySQL为例)

当执行如下 SQL&#xff1a; SELECT * FROM users WHERE id 1;在数据库内部&#xff0c;其实会经历多个复杂且有序的阶段。以下是 MySQL&#xff08;InnoDB 引擎&#xff09;中 SQL 查询语句从发送到结果返回的完整执行流程。 客户端连接阶段 客户端&#xff08;如 JDBC、My…

超详细yolo8/11-detect目标检测全流程概述:配置环境、数据标注、训练、验证/预测、onnx部署(c++/python)详解

文章目录 一、配置环境二、数据标注三、模型训练四、验证预测五、onnx部署c 版python版本 一、配置环境 我的都是在Linux系统下&#xff0c;训练部署的&#xff1b;模型训练之前&#xff0c;需要配置好环境&#xff0c;Anaconda、显卡驱动、cuda、cudnn、pytorch等&#xff1b…

阿里云Flink:开启大数据实时处理新时代

走进阿里云 Flink 在大数据处理的广袤领域中&#xff0c;阿里云 Flink 犹如一颗璀璨的明星&#xff0c;占据着举足轻重的地位。随着数据量呈指数级增长&#xff0c;企业对数据处理的实时性、高效性和准确性提出了前所未有的挑战 。传统的数据处理方式逐渐难以满足这些严苛的需…