在阅读这篇文章前,推荐先阅读以下内容:

  1. [netty5: WebSocketFrame]-源码分析
  2. [netty5: WebSocketFrameEncoder & WebSocketFrameDecoder]-源码解析

WebSocketClientHandshakerFactory

WebSocketClientHandshakerFactory 是用于根据 URI 和协议版本创建对应 WebSocket 握手器(Handshaker)的工厂类,简化客户端握手流程。

public final class WebSocketClientHandshakerFactory {private WebSocketClientHandshakerFactory() {}// ...// new WebSocketClientProtocolHandler(config)public static WebSocketClientHandshaker newHandshaker(URI webSocketURL, WebSocketVersion version, String subprotocol,boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,boolean performMasking, boolean allowMaskMismatch, long forceCloseTimeoutMillis,boolean absoluteUpgradeUrl, boolean generateOriginHeader) {return new WebSocketClientHandshaker13(webSocketURL, subprotocol, allowExtensions, customHeaders,maxFramePayloadLength, performMasking, allowMaskMismatch, forceCloseTimeoutMillis,absoluteUpgradeUrl, generateOriginHeader);}
}

WebSocketClientHandshaker13

WebSocketClientHandshaker13 是实现 WebSocket 协议 RFC 6455(版本13)的客户端握手器,负责构造握手请求、验证响应并完成协议升级。

public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {private final boolean allowExtensions;private final boolean performMasking;private final boolean allowMaskMismatch;private volatile String sentNonce;// WebSocketClientHandshakerFactory.newHandshakerWebSocketClientHandshaker13(URI webSocketURL, String subprotocol,boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,boolean performMasking, boolean allowMaskMismatch,long forceCloseTimeoutMillis, boolean absoluteUpgradeUrl,boolean generateOriginHeader) {super(webSocketURL, WebSocketVersion.V13, subprotocol, customHeaders, maxFramePayloadLength,forceCloseTimeoutMillis, absoluteUpgradeUrl, generateOriginHeader);this.allowExtensions = allowExtensions;this.performMasking = performMasking;this.allowMaskMismatch = allowMaskMismatch;}/*** /*** <p>* Sends the opening request to the server:* </p>** <pre>* GET /chat HTTP/1.1* Host: server.example.com* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==* Sec-WebSocket-Protocol: chat, superchat* Sec-WebSocket-Version: 13* </pre>**/@Overrideprotected FullHttpRequest newHandshakeRequest(BufferAllocator allocator) {URI wsURL = uri();FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, upgradeUrl(wsURL),allocator.allocate(0));HttpHeaders headers = request.headers();if (customHeaders != null) {headers.add(customHeaders);if (!headers.contains(HttpHeaderNames.HOST)) {headers.set(HttpHeaderNames.HOST, websocketHostValue(wsURL));}} else {headers.set(HttpHeaderNames.HOST, websocketHostValue(wsURL));}String nonce = createNonce();headers.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE).set(HttpHeaderNames.SEC_WEBSOCKET_KEY, nonce);if (generateOriginHeader && !headers.contains(HttpHeaderNames.ORIGIN)) {headers.set(HttpHeaderNames.ORIGIN, websocketHostValue(wsURL));}sentNonce = nonce;String expectedSubprotocol = expectedSubprotocol();if (!StringUtil.isNullOrEmpty(expectedSubprotocol)) {headers.set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, expectedSubprotocol);}headers.set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, version().toAsciiString());return request;}/*** <p>* Process server response:* </p>** <pre>* HTTP/1.1 101 Switching Protocols* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=* Sec-WebSocket-Protocol: chat* </pre>** @param response*            HTTP response returned from the server for the request sent by beginOpeningHandshake00().* @throws WebSocketHandshakeException if handshake response is invalid.*/@Overrideprotected void verify(FullHttpResponse response) {HttpResponseStatus status = response.status();if (!HttpResponseStatus.SWITCHING_PROTOCOLS.equals(status)) {throw new WebSocketClientHandshakeException("Invalid handshake response status: " + status, response);}HttpHeaders headers = response.headers();CharSequence upgrade = headers.get(HttpHeaderNames.UPGRADE);if (!HttpHeaderValues.WEBSOCKET.contentEqualsIgnoreCase(upgrade)) {throw new WebSocketClientHandshakeException("Invalid handshake response upgrade: " + upgrade, response);}if (!headers.containsIgnoreCase(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)) {throw new WebSocketClientHandshakeException("Invalid handshake response connection: " + headers.get(HttpHeaderNames.CONNECTION), response);}CharSequence accept = headers.get(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT);if (accept == null) {throw new WebSocketClientHandshakeException("Invalid handshake response sec-websocket-accept: null", response);}String expectedAccept = WebSocketUtil.calculateV13Accept(sentNonce);if (!AsciiString.contentEquals(expectedAccept, AsciiString.trim(accept))) {throw new WebSocketClientHandshakeException("Invalid handshake response sec-websocket-accept: " + accept + ", expected: " + expectedAccept, response);}}@Overrideprotected WebSocketFrameDecoder newWebsocketDecoder() {return new WebSocket13FrameDecoder(false, allowExtensions, maxFramePayloadLength(), allowMaskMismatch);}@Overrideprotected WebSocketFrameEncoder newWebSocketEncoder() {return new WebSocket13FrameEncoder(performMasking);}@Overridepublic WebSocketClientHandshaker13 setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {super.setForceCloseTimeoutMillis(forceCloseTimeoutMillis);return this;}// 生成一个符合 WebSocket 协议要求的 16 字节 Base64 编码的随机值,用作 Sec-WebSocket-Keyprivate static String createNonce() {var nonce = WebSocketUtil.randomBytes(16);return WebSocketUtil.base64(nonce);}
}

WebSocketClientHandshaker

public abstract class WebSocketClientHandshaker {protected static final int DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS = 10000;// 代表握手时的目标地址, 例如 ws://example.com/chatprivate final URI uri;// 控制握手请求和数据帧的格式, 比如 RFC 6455 标准版本private final WebSocketVersion version;// 标记握手是否完成,volatile 保证多线程访问时的可见性private volatile boolean handshakeComplete;// 握手完成后,如果关闭 WebSocket 连接时等待超时,会触发强制关闭。private volatile long forceCloseTimeoutMillis;// 用于标记强制关闭流程是否初始化, 通过 AtomicIntegerFieldUpdater 原子更新private volatile int forceCloseInit;private static final AtomicIntegerFieldUpdater<WebSocketClientHandshaker> FORCE_CLOSE_INIT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WebSocketClientHandshaker.class, "forceCloseInit");// 标记强制关闭流程是否完成。private volatile boolean forceCloseComplete;// 握手时客户端希望协商的子协议(Subprotocol), 例如视频、聊天子协议名称等private final String expectedSubprotocol;// 握手后服务器协商确认的子协议,握手成功后才有值。private volatile String actualSubprotocol;// 握手请求时使用,方便传递用户自定义信息。protected final HttpHeaders customHeaders;// 最大单个 WebSocket 帧负载长度限制, 防止收到超大数据导致内存溢出。private final int maxFramePayloadLength;// 是否在握手请求中使用绝对 URI 作为 Upgrade URL, 一般用于特殊代理或协议场景private final boolean absoluteUpgradeUrl;// 是否自动生成 Origin 请求头protected final boolean generateOriginHeader;protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,HttpHeaders customHeaders, int maxFramePayloadLength,long forceCloseTimeoutMillis, boolean absoluteUpgradeUrl, boolean generateOriginHeader) {this.uri = uri;this.version = version;expectedSubprotocol = subprotocol;this.customHeaders = customHeaders;this.maxFramePayloadLength = maxFramePayloadLength;this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;this.absoluteUpgradeUrl = absoluteUpgradeUrl;this.generateOriginHeader = generateOriginHeader;}// WebSocketClientProtocolHandshakeHandler.channelActivepublic Future<Void> handshake(Channel channel) {requireNonNull(channel, "channel");ChannelPipeline pipeline = channel.pipeline();// 检查管道中解码器HttpResponseDecoder decoder = pipeline.get(HttpResponseDecoder.class);if (decoder == null) {HttpClientCodec codec = pipeline.get(HttpClientCodec.class);if (codec == null) {return channel.newFailedFuture(new IllegalStateException("ChannelPipeline does not contain " + "an HttpResponseDecoder or HttpClientCodec"));}}// 检查 URI 和 Header 相关的 Host 与 Originif (uri.getHost() == null) {if (customHeaders == null || !customHeaders.contains(HttpHeaderNames.HOST)) {return channel.newFailedFuture(new IllegalArgumentException("Cannot generate the 'host' header value," + " webSocketURI should contain host or passed through customHeaders"));}if (generateOriginHeader && !customHeaders.contains(HttpHeaderNames.ORIGIN)) {final String originName = HttpHeaderNames.ORIGIN.toString();return channel.newFailedFuture(new IllegalArgumentException("Cannot generate the '" + originName + "' header" + " value, webSocketURI should contain host or disable generateOriginHeader or pass value" + " through customHeaders"));}}// 创建握手请求FullHttpRequest request = newHandshakeRequest(channel.bufferAllocator());// 创建 Promise,异步写出请求Promise<Void> promise = channel.newPromise();channel.writeAndFlush(request).addListener(channel, (ch, future) -> {// 如果写操作成功if (future.isSuccess()) {ChannelPipeline p = ch.pipeline();//找出管道中 HTTP 请求编码器 HttpRequestEncoder 或者 HttpClientCodec,ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);if (ctx == null) {ctx = p.context(HttpClientCodec.class);}if (ctx == null) {promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " + "an HttpRequestEncoder or HttpClientCodec"));return;}// 然后在其后面动态添加 WebSocket 专用的编码器 ws-encoder(由 newWebSocketEncoder() 创建)p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());promise.setSuccess(null);} else {promise.setFailure(future.cause());}});return promise.asFuture();}// WebSocketClientProtocolHandshakeHandler.channelReadpublic final void finishHandshake(Channel channel, FullHttpResponse response) {verify(response);// 服务器返回的子协议CharSequence receivedProtocol = response.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);receivedProtocol = receivedProtocol != null ? AsciiString.trim(receivedProtocol) : null;// 客户端期望的子协议String expectedProtocol = expectedSubprotocol != null ? expectedSubprotocol : "";boolean protocolValid = false;// 如果客户端没指定预期协议,且服务器也没返回协议,视为通过。if (expectedProtocol.isEmpty() && receivedProtocol == null) {protocolValid = true;setActualSubprotocol(expectedSubprotocol);} else if (!expectedProtocol.isEmpty() && receivedProtocol != null && receivedProtocol.length() > 0) {// 如果客户端有期望协议且服务器返回了协议,则判断服务器返回的协议是否在客户端允许的列表中for (String protocol : expectedProtocol.split(",")) {if (AsciiString.contentEquals(protocol.trim(), receivedProtocol)) {protocolValid = true;setActualSubprotocol(receivedProtocol.toString());break;}}}// 如果子协议校验失败,抛出握手异常。if (!protocolValid) {throw new WebSocketClientHandshakeException(String.format("Invalid subprotocol. Actual: %s. Expected one of: %s",receivedProtocol, expectedSubprotocol), response);}// 标记握手完成。setHandshakeComplete();final ChannelPipeline p = channel.pipeline();// 移除 HTTP 消息解压处理器(如 gzip 解压),以及 HTTP 聚合器,WebSocket 不需要这些HttpContentDecompressor decompressor = p.get(HttpContentDecompressor.class);if (decompressor != null) {p.remove(decompressor);}HttpObjectAggregator aggregator = p.get(HttpObjectAggregator.class);if (aggregator != null) {p.remove(aggregator);}// 查找 HTTP 解码器上下文:// 		1. 若是 HttpClientCodec,先调用 removeOutboundHandler(),然后添加 WebSocket 解码器,最后异步移除 HTTP Codec。// 		2. 若是单独的 HttpResponseDecoder,先移除对应的请求编码器,再添加 WebSocket 解码器,异步移除响应解码器。// 新加入的 ws-decoder 是 WebSocket 的解码器,处理 WebSocket 帧。ChannelHandlerContext ctx = p.context(HttpResponseDecoder.class);if (ctx == null) {ctx = p.context(HttpClientCodec.class);if (ctx == null) {throw new IllegalStateException("ChannelPipeline does not contain " +"an HttpRequestEncoder or HttpClientCodec");}final HttpClientCodec codec =  (HttpClientCodec) ctx.handler();codec.removeOutboundHandler();p.addAfter(ctx.name(), "ws-decoder", newWebsocketDecoder());channel.executor().execute(() -> p.remove(codec));} else {if (p.get(HttpRequestEncoder.class) != null) {p.remove(HttpRequestEncoder.class);}final ChannelHandlerContext context = ctx;p.addAfter(context.name(), "ws-decoder", newWebsocketDecoder());channel.executor().execute(() -> p.remove(context.handler()));}}// ...protected abstract FullHttpRequest newHandshakeRequest(BufferAllocator allocator);protected abstract void verify(FullHttpResponse response);protected abstract WebSocketFrameDecoder newWebsocketDecoder();protected abstract WebSocketFrameEncoder newWebSocketEncoder();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/87870.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/87870.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/87870.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

4.2 如何训练⼀个 LLM

⼀般⽽⾔&#xff0c;训练⼀个完整的 LLM 需要经过图1中的三个阶段——Pretrain、SFT 和 RLHF。 4.2.1 Pretrain 预训练任务与架构 任务类型&#xff1a;采用因果语言模型&#xff08;CLM&#xff09;&#xff0c;通过预测下一个 token 进行训练&#xff0c;与传统预训练模型…

Qt中的QObject::moveToThread方法详解

一、QObject::moveToThread方法QObject::moveToThread()是Qt框架中一个非常重要的功能&#xff0c;它允许改变QObject及其子对象的线程关联性。这个功能在多线程编程中特别有用&#xff0c;可以将耗时操作移到工作线程执行&#xff0c;避免阻塞主线程/GUI线程。基本用法void QO…

【9】用户接入与认证配置

本文旨在帮助网络管理员在 SD-WAN 环境中实现安全、稳定的用户接入与认证策略,涵盖本地/远程认证、权限管理、密码策略、SSH、会话控制等关键配置要素。 1.密码策略与账户安全 从 IOS XE SD-WAN 17.3.1 起,Cisco 引入密码强化功能,用于统一用户密码的复杂度与有效性要求。密…

第十六节:第三部分:多线程:线程安全问题、取钱问题的模拟

线程安全问题介绍&#xff1a;取钱的线程安全问题 取钱的线程安全问题 取钱案例需求分析 线程安全问题出现的原因 代码&#xff1a;模拟线程安全问题&#xff08;上述取钱案例&#xff09; Account类&#xff08;账户类&#xff09; package com.itheima.day3_thread_safe;pu…

APE:大语言模型具有人类水平的提示工程能力

摘要 通过以自然语言指令作为条件输入&#xff0c;大型语言模型&#xff08;LLMs&#xff09;展现出令人印象深刻的通用计算能力。然而&#xff0c;任务表现严重依赖于用于引导模型的提示&#xff08;prompt&#xff09;质量&#xff0c;而最有效的提示通常是由人类手工设计的…

X86 CPU 工作模式

1.概述 1.实模式 实模式又称实地址模式&#xff0c;实&#xff0c;即真实&#xff0c;这个真实分为两个方面&#xff0c;一个方面是运行真实的指令&#xff0c;对指令的动作不作区分&#xff0c;直接执行指令的真实功能&#xff0c;另一方面是发往内存的地址是真实的&#xff…

Java设计模式之行为型模式(策略模式)介绍与说明

一、策略模式简介 策略模式&#xff08;Strategy Pattern&#xff09;是一种行为型设计模式&#xff0c;它定义了一系列算法&#xff0c;并将每个算法封装起来&#xff0c;使它们可以相互替换&#xff0c;且算法的变化不会影响使用算法的客户。策略模式让算法独立于使用它的客…

【BIOS+MBR 微内核手写实现】

本文基于BIOS+MBR的架构,从四部分讲解微内核是如何实现的: 1)搭建微内核编译调试环境 2)梳理微内核的代码结构:伪指令讲解 3)手写实现微内核框架,输出简单的字符串 4)讲解微内核启动阶段的具体运行过程 先完成内核工程创建,如下图 我们这里使用nasm风格的汇编编写,…

从C/C++迁移到Go:内存管理思维转变

一、引言 在当今高速发展的软件开发世界中&#xff0c;语言迁移已成为技术进化的常态。作为一名曾经的C/C开发者&#xff0c;我经历了向Go语言转变的全过程&#xff0c;其中最大的认知挑战来自内存管理模式的根本性差异。 我记得第一次接触Go项目时的困惑&#xff1a;没有析构函…

正确设置 FreeRTOS 与 STM32 的中断优先级

在裸机开发&#xff08;非 RTOS&#xff09;时&#xff0c;大多数 STM32 外设的中断优先级通常不需要手动配置&#xff0c;原因如下&#xff1a; ✅ 裸机开发中默认中断优先级行为 特点说明默认中断优先级为 0如果你不设置&#xff0c;STM32 HAL 默认设置所有外设中断为 0&…

EasyExcel之SheetWriteHandler:解锁Excel写入的高阶玩法

引言在 EasyExcel 强大的功能体系中&#xff0c;SheetWriteHandler 接口是一个关键的组成部分。它允许开发者在写入 Excel 的 Sheet 时进行自定义处理&#xff0c;为实现各种复杂的业务需求提供了强大的支持。通过深入了解和运用 SheetWriteHandler 接口&#xff0c;我们能够更…

Python单例模式魔法方法or属性

1.单例模式概念定义:单例模式(Singleton Pattern)是一种创建型设计模式&#xff0c;它确保一个类只能有一个实例&#xff0c;并提供一个全局访问点来获取该实例。这种模式在需要控制资源访问、配置管理或协调系统操作时特别有用。核心特点:私有构造函数&#xff1a;防止外部通过…

【Kubernetes系列】Kubernetes 资源请求(Requests)

博客目录 引言一、资源请求的基本概念1.1 什么是资源请求1.2 请求与限制的区别 二、CPU 请求的深入解析2.1 CPU 请求的单位与含义2.2 CPU 请求的调度影响2.3 CPU 请求与限制的关系 三、内存请求的深入解析3.1 内存请求的单位与含义3.2 内存请求的调度影响3.3 内存请求的特殊性 …

大型语言模型中的自动化思维链提示

摘要 大型语言模型&#xff08;LLMs&#xff09;能够通过生成中间推理步骤来执行复杂的推理任务。为提示演示提供这些步骤的过程被称为思维链&#xff08;CoT&#xff09;提示。CoT提示有两种主要范式。一种使用简单的提示语&#xff0c;如“让我们一步一步思考”&#xff0c;…

Private Set Generation with Discriminative Information(2211.04446v1)

1. 遇到什么问题&#xff0c;解决了什么遇到的问题现有差分隐私生成模型受限于高维数据分布建模的复杂性&#xff0c;合成样本实用性不足。深度生成模型训练依赖大量数据&#xff0c;加入隐私约束后更难优化&#xff0c;且不保证下游任务&#xff08;如分类&#xff09;的最优解…

C++编程语言入门指南

一、C语言概述 C是由丹麦计算机科学家Bjarne Stroustrup于1979年在贝尔实验室开发的一种静态类型、编译式、通用型编程语言。最初被称为"C with Classes"(带类的C)&#xff0c;1983年更名为C。它既具有高级语言的抽象特性&#xff0c;又保留了底层硬件操作能力&…

ZED相机与Foxglove集成:加速机器人视觉调试效率的实用方案

随着机器人技术的发展&#xff0c;实时视觉数据流的高效传输和可视化成为提升系统性能的重要因素。通过ZED相机&#xff08;包括ZED 2i和ZED X&#xff09;与Foxglove Studio平台的结合&#xff0c;开发者能够轻松访问高质量的2D图像、深度图和点云数据&#xff0c;从而显著提高…

目标检测新纪元:DETR到Mamba实战解析

&#x1f680;【实战分享】目标检测的“后 DEⱯ”时代&#xff1a;DETR/DINO/RT-DETR及新型骨干网络探索&#xff08;含示例代码&#xff09; 目标检测从 YOLO、Faster R-CNN 到 Transformer 结构的 DETR&#xff0c;再到 DINO、RT-DETR&#xff0c;近两年出现了许多新趋势&am…

【IOS】XCode创建firstapp并运行(成为IOS开发者)

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍XCode创建firstapp并运行 学其所用&#xff0c;用其所学。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;下次更新不迷路…

class类和style内联样式的绑定 + 事件处理 + uniapp创建自定义页面模板

目录 一.class类的绑定 1.静态编写 2.动态编写 二.style内联样式的绑定 三.事件处理 1.案例1 2.案例2 四.uniapp创建自定义页面模板 1.为什么要这么做&#xff1f; 2.步骤 ①打开新建页面的界面 ②在弹出的目录下&#xff0c;新建模板文件 ③用HBuilderX打开该模板…