定义与概念

  • Future:表示一个异步操作的结果。它是只读的,意味着你只能查看操作是否完成、是否成功、获取结果或者异常等信息,但不能主动设置操作的结果。
  • Promise:是 Future 的可写扩展。它不仅可以像 Future 一样查看操作结果,还能主动设置操作的成功、失败或者取消状态,并且通知所有的监听器。

用法示例

Future 的用法

Future 通常用于获取异步操作的结果,并且可以添加监听器来处理操作完成后的逻辑。以下是一个简单的示例,展示了如何使用 Future 来处理 DNS 解析结果:

dnsNameResolver.resolve(host).addListener(new FutureListener<InetAddress>() {@Overridepublic void operationComplete(Future<InetAddress> future) throws Exception {if (future.isSuccess()) {InetAddress hostAddress = future.get();// 处理解析成功的结果} else {// 处理解析失败的情况}}
});

在这个示例中,dnsNameResolver.resolve(host) 方法返回一个 Future<InetAddress> 对象,我们通过添加 FutureListener 来监听解析操作的完成状态。当操作完成后,会调用 operationComplete 方法,我们可以在这个方法中处理解析结果。

Promise 的用法

Promise 主要用于主动设置异步操作的结果,并且可以通知所有的监听器。以下是一个示例,展示了如何使用 Promise 来处理 OCSP 查询结果:

final Promise<OCSPResp> responsePromise = eventLoop.newPromise();// 异步操作
dnsNameResolver.resolve(host).addListener(new FutureListener<InetAddress>() {@Overridepublic void operationComplete(Future<InetAddress> future) throws Exception {if (future.isSuccess()) {// 处理解析成功的结果InetAddress hostAddress = future.get();final ChannelFuture channelFuture = bootstrap.connect(hostAddress, port);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {if (future.isSuccess()) {// 处理连接成功的结果responsePromise.trySuccess(result); // 设置操作成功的结果} else {responsePromise.tryFailure(new IllegalStateException("Connection to OCSP Responder Failed", future.cause())); // 设置操作失败的结果}}});} else {responsePromise.tryFailure(future.cause()); // 设置操作失败的结果}}
});// 添加监听器来处理操作结果
responsePromise.addListener(new FutureListener<OCSPResp>() {@Overridepublic void operationComplete(Future<OCSPResp> future) throws Exception {if (future.isSuccess()) {OCSPResp resp = future.get();// 处理操作成功的结果} else {// 处理操作失败的情况}}
});

在这个示例中,我们首先创建了一个 Promise<OCSPResp> 对象 responsePromise,然后在异步操作完成后,根据操作结果调用 trySuccesstryFailure 方法来设置 Promise 的状态。最后,我们添加了一个 FutureListener 来监听 Promise 的完成状态,并处理操作结果。

区别总结

  • 可写性:
    • Future 是只读的,只能查看异步操作的结果,不能主动设置操作的状态。
    • Promise 是可写的,可以主动设置操作的成功、失败或者取消状态。
  • 用途:
    • Future 主要用于获取异步操作的结果,并且可以添加监听器来处理操作完成后的逻辑。
    • Promise 主要用于在异步操作完成后,主动设置操作的结果,并且通知所有的监听器。
  • 方法差异:
    • Future 提供了一些方法来查看操作的状态,如 isDone()isSuccess()cause() 等。
    • Promise 除了继承了 Future 的方法外,还提供了一些方法来设置操作的结果,如 setSuccess()trySuccess()setFailure()tryFailure() 等。

代码中的体现

在提供的代码片段中,InflightNameResolver 类的 resolve 方法使用了 Promise 来处理 DNS 解析结果:

private <U> Promise<U> resolve(final ConcurrentMap<String, Promise<U>> resolveMap,final String inetHost, final Promise<U> promise, boolean resolveAll) {// ...if (resolveAll) {@SuppressWarnings("unchecked")final Promise<List<T>> castPromise = (Promise<List<T>>) promise; // U is List<T>delegate.resolveAll(inetHost, castPromise);} else {@SuppressWarnings("unchecked")final Promise<T> castPromise = (Promise<T>) promise; // U is Tdelegate.resolve(inetHost, castPromise);}// ...return promise;
}

在这个方法中,我们可以看到 Promise 被用于传递异步操作的结果,并且可以在操作完成后主动设置操作的状态。

另外,PromiseNotifier 类展示了如何使用 Promise 来通知多个监听器:

public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {private final Promise<? super V>[] promises;public PromiseNotifier(Promise<? super V>... promises) {this.promises = promises;}@Overridepublic void operationComplete(F future) throws Exception {if (future.isSuccess()) {V result = future.get();for (Promise<? super V> p : promises) {PromiseNotificationUtil.trySuccess(p, result, null);}} else if (future.isCancelled()) {for (Promise<? super V> p : promises) {PromiseNotificationUtil.tryCancel(p, null);}} else {Throwable cause = future.cause();for (Promise<? super V> p : promises) {PromiseNotificationUtil.tryFailure(p, cause, null);}}}
}

在这个类中,我们可以看到 Promise 被用于通知多个监听器操作的结果,并且可以根据操作的状态调用不同的方法来设置 Promise 的状态。

综上所述,FuturePromise 在 Netty 中都是非常重要的组件,它们分别用于处理异步操作的不同方面。通过合理使用 FuturePromise,可以有效地处理异步操作的结果,提高代码的可读性和可维护性。

处理多个顺序依赖的异步操作

假设我们需要完成一个包含三个步骤的操作流程:

  1. 连接到服务器
  2. 发送认证请求并等待认证成功
  3. 发送业务数据并接收响应

这三个步骤必须按顺序执行,后一个步骤依赖于前一个步骤的成功完成。以下是实现这种依赖关系的代码示例:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class ChannelPromiseDependencyExample {private static final String SERVER_HOST = "localhost";private static final int SERVER_PORT = 8080;public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder(),new StringEncoder(),new ClientHandler());}});// 创建主 Promise,用于跟踪整个操作流程的完成状态ChannelPromise mainPromise = bootstrap.config().group().next().newPromise();// 开始执行依赖操作链connectAndProcess(bootstrap, mainPromise);// 等待整个操作流程完成mainPromise.await();if (mainPromise.isSuccess()) {System.out.println("所有操作成功完成");} else {System.out.println("操作失败: " + mainPromise.cause());}} finally {group.shutdownGracefully();}}private static void connectAndProcess(Bootstrap bootstrap, ChannelPromise mainPromise) {// 步骤 1: 连接到服务器ChannelFuture connectFuture = bootstrap.connect(SERVER_HOST, SERVER_PORT);// 为连接操作添加监听器connectFuture.addListener((ChannelFuture future) -> {if (future.isSuccess()) {Channel channel = future.channel();System.out.println("成功连接到服务器");// 步骤 2: 发送认证请求ChannelPromise authPromise = channel.newPromise();sendAuthRequest(channel, authPromise);// 为认证操作添加监听器authPromise.addListener((ChannelFuture authFuture) -> {if (authFuture.isSuccess()) {System.out.println("认证成功");// 步骤 3: 发送业务数据ChannelPromise businessPromise = channel.newPromise();sendBusinessData(channel, businessPromise);// 为业务操作添加监听器businessPromise.addListener((ChannelFuture businessFuture) -> {if (businessFuture.isSuccess()) {System.out.println("业务数据处理成功");mainPromise.setSuccess(); // 标记整个操作成功} else {mainPromise.setFailure(businessFuture.cause()); // 标记整个操作失败}channel.close(); // 关闭连接});} else {mainPromise.setFailure(authFuture.cause()); // 标记整个操作失败channel.close(); // 关闭连接}});} else {mainPromise.setFailure(future.cause()); // 标记整个操作失败}});}private static void sendAuthRequest(Channel channel, ChannelPromise authPromise) {// 发送认证请求channel.writeAndFlush("AUTH username password").addListener(future -> {if (future.isSuccess()) {System.out.println("认证请求已发送");// 认证结果将在 ChannelHandler 中处理} else {authPromise.setFailure(future.cause()); // 认证请求发送失败}});}private static void sendBusinessData(Channel channel, ChannelPromise businessPromise) {// 发送业务数据channel.writeAndFlush("DATA some_business_data").addListener(future -> {if (future.isSuccess()) {System.out.println("业务数据已发送");// 业务响应将在 ChannelHandler 中处理} else {businessPromise.setFailure(future.cause()); // 业务数据发送失败}});}static class ClientHandler extends SimpleChannelInboundHandler<String> {private ChannelPromise authPromise;private ChannelPromise businessPromise;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 通道激活时,可以获取外部的 Promise 实例// 实际应用中可能需要通过构造函数或其他方式传递}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("收到服务器响应: " + msg);// 根据响应内容判断操作结果if (msg.startsWith("AUTH_SUCCESS")) {if (authPromise != null) {authPromise.setSuccess(); // 认证成功}} else if (msg.startsWith("AUTH_FAILURE")) {if (authPromise != null) {authPromise.setFailure(new Exception("认证失败: " + msg)); // 认证失败}} else if (msg.startsWith("DATA_SUCCESS")) {if (businessPromise != null) {businessPromise.setSuccess(); // 业务数据处理成功}} else if (msg.startsWith("DATA_FAILURE")) {if (businessPromise != null) {businessPromise.setFailure(new Exception("业务数据处理失败: " + msg)); // 业务数据处理失败}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();// 设置所有未完成的 Promise 为失败状态if (authPromise != null && !authPromise.isDone()) {authPromise.setFailure(cause);}if (businessPromise != null && !businessPromise.isDone()) {businessPromise.setFailure(cause);}ctx.close();}}
}

关键点解析

  1. 创建和使用 ChannelPromise
    • 通过 EventLoop.newPromise()Channel.newPromise() 创建 ChannelPromise 实例。
    • mainPromise 用于跟踪整个操作流程的完成状态。
  2. 处理依赖关系
    • 使用 addListener() 方法为每个异步操作添加监听器。
    • 在前一个操作的监听器中检查操作结果,只有成功时才继续执行下一个操作。
    • 如果某个操作失败,立即设置主 Promise 为失败状态并终止后续操作。
  3. 在 ChannelHandler 中处理响应
    • ClientHandler 中接收服务器响应,并根据响应内容设置相应的 Promise 状态。
    • 这样可以将异步响应与对应的操作关联起来。
  4. 异常处理
    • exceptionCaught() 方法中捕获异常,并设置所有未完成的 Promise 为失败状态。

更复杂的依赖关系处理

对于更复杂的依赖关系,可以使用 PromiseCombiner 来组合多个 Promise,并在所有 Promise 都成功完成后执行后续操作。以下是一个使用 PromiseCombiner 的示例:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class PromiseCombinerExample {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder(),new StringEncoder(),new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("收到消息: " + msg);}});}});// 连接到多个服务器ChannelFuture future1 = bootstrap.connect("server1.example.com", 8080);ChannelFuture future2 = bootstrap.connect("server2.example.com", 8080);ChannelFuture future3 = bootstrap.connect("server3.example.com", 8080);// 创建 PromiseCombiner 来组合多个 FuturePromiseCombiner combiner = new PromiseCombiner(group.next());combiner.add(future1);combiner.add(future2);combiner.add(future3);// 创建一个 Promise 来接收组合结果ChannelPromise allConnectedPromise = group.next().newPromise();combiner.finish(allConnectedPromise);// 为组合结果添加监听器allConnectedPromise.addListener(future -> {if (future.isSuccess()) {System.out.println("所有连接都已成功建立");// 执行后续操作} else {System.out.println("至少有一个连接失败: " + future.cause());}});// 等待所有操作完成allConnectedPromise.await();} finally {group.shutdownGracefully();}}
}

通过 ChannelPromise 和相关工具,我们可以在 Netty 中灵活处理多个异步操作的依赖关系:

  1. 顺序依赖:通过在前一个操作的监听器中启动下一个操作,实现顺序执行。
  2. 并行依赖:使用 PromiseCombiner 等工具组合多个并行操作,等待所有操作完成后执行后续逻辑。
  3. 异常处理:在每个步骤中正确处理异常,并传播给主 Promise
  4. 状态管理:使用 Promise 跟踪每个操作的状态,确保操作按预期完成。

这种方式使得异步代码更加清晰和易于维护,避免了回调地狱,提高了代码的可读性和可维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/92822.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/92822.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/92822.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微算法科技(NASDAQ:MLGO)采用分布式哈希表优化区块链索引结构,提高区块链检索效率

随着区块链技术的快速发展&#xff0c;其在各个领域的应用越来越广泛。然而&#xff0c;区块链数据的存储和检索效率问题一直是制约其发展的瓶颈之一。为了解决这一问题&#xff0c;微算法科技(NASDAQ&#xff1a;MLGO)采用了分布式哈希表&#xff08;DHT&#xff09;技术来优化…

Jmeter的元件使用介绍:(三)配置元件详解01

Jmeter的配置元件有非常多&#xff0c;常用的有&#xff1a;信息头管理器、Cookie管理器、用户定义的变量、Http请求默认值、JDBC Connection Configuration、CSV 数据文件设置、计数器等&#xff0c;本文会对这些常用的配置元件一一介绍&#xff0c;还有其他很多配置元件&…

git 连接GitHub仓库

一、安装 git 包在官网下载 git 包二、通过SSH密钥与GitHub远程仓库连接1. 检查本地 SSH 密钥是否存在ls -al ~/.ssh如果看到 id_rsa 和 id_rsa.pub&#xff0c;说明已有密钥。2.如果没有&#xff0c;生成新的 SSH 密钥&#xff1a;ssh-keygen -t ed25519 -C "your_email…

如何通过AI扫描代码中的问题

代码质量其实在需求高压&#xff0c;业务快速迭代的场景下往往容易被人忽视的问题&#xff0c;大家的编码习惯和规范也经常会各有喜好&#xff0c;短期之内获取看不出来什么问题&#xff0c;但长此以往就会发现&#xff0c;屎山逐步成型了&#xff0c;而线上代码跑着往往就不想…

Java 大视界 -- Java 大数据机器学习模型在金融衍生品市场波动特征挖掘与交易策略创新中的应用(363)

Java 大视界 -- Java 大数据机器学习模型在金融衍生品市场波动特征挖掘与交易策略创新中的应用&#xff08;363&#xff09;引言&#xff1a;正文&#xff1a;一、Java 构建的金融数据处理架构1.1 多源异构数据实时融合1.2 新闻舆情与市场冲击建模二、Java 驱动的波动特征挖掘与…

Cartographer安装测试与模块开发(三)--Cartographer在Gazebo仿真环境下的建图以及建图与定位阶段问题(实车也可参考)

参数介绍之所以要首先介绍参数而不是实操&#xff0c;是因为大部分建图失败、漂移基本上都是参数设置错误引起的&#xff0c;或者说大部分都是TF存在问题&#xff0c;主要是坐标系Frame之间有冲突或者对不上等原因导致的&#xff0c;因此把参数放在前面介绍&#xff0c;了解了参…

uniapp nvue开发App 横竖屏切换丢失上下文导致 setTimeout和clearTimeout报错

报错内容如下 [JS Framework] Failed to find taskCenter (35). [JS Framework] Failed to execute the callback function:TypeError: c.clearTimeout is not a function reportJSException >>>> exception function:__WEEX_CALL_JAVASCRIPT__, exception:JavaSc…

Mirauge3D 赋能:全自动建模,让城市规划与建筑设计拥有高分辨率实景三维模型

在数字化浪潮席卷各行各业的当下&#xff0c;高精度、多元化的空间数据已成为基础测绘、智慧城市建设、自然资源管理等领域高质量发展的核心支撑。从城市交通网络的智能规划到国土空间的优化配置&#xff0c;从灾害监测的精准预警到生态环境保护的科学决策&#xff0c;空间数据…

Javaweb————学习javaweb的预备知识

❤️❤️❤️一.javase,javaweb,javaee的区别和联系 &#x1f499;&#x1f499;&#x1f499;javase: 通俗的来讲就是java技术栈&#xff0c;做java相关开发的基础&#xff0c;比如javaweb&#xff0c;javaee开发都是必备javase的基础的&#xff0c;包括java语言基础&#xff…

zabbix服务自动发现、自动注册及配置钉钉告警(小白的“升级打怪”成长之路)

目录 一、自动发现及自动注册 1、自动发现 2、自动注册规则 二、监控告警并发送电子邮件 1、设定发邮件的地址 2、设定发邮件的用户 3、设定监控及触发的条件 4、开始告警并设置触发发邮件 三、钉钉告警 1、配置zabbix-server 2、配置监控及触发 3、web页面操作 4、…

OSPF多区域

OSPF多区域划分的必要性 OSPF单区域存在的问题 LSDB 庞大&#xff0c;占用内存大&#xff0c;SPF计算开销大。 LSA洪泛范围大&#xff0c;拓扑变化影响范围大。 路由不能被汇总&#xff0c;路由表庞大&#xff0c;查找路由开销大 解决办法 划分区域可以解决上述问题 每个区域独…

质数、因数、最大公约数经典问题整理

1、计数质数 MX 5000000 is_prime [1] * MX is_prime[0] is_prime[1] 0 for i in range(2, MX):if is_prime[i]:for j in range(i * i, MX, i):is_prime[j] 0class Solution:def countPrimes(self, n: int) -> int:return sum(is_prime[:n]) 2、序列中不同最大公约数的…

Java NIO FileChannel在大文件传输中的性能优化实践指南

Java NIO FileChannel在大文件传输中的性能优化实践指南 在现代分布式系统中&#xff0c;海量数据的存储与传输成为常见需求。Java NIO引入的FileChannel提供了高效的文件读写能力&#xff0c;尤其适合大文件传输场景。本文从原理深度解析出发&#xff0c;结合生产环境实战经验…

SQLite Insert 语句详解

SQLite Insert 语句详解 SQLite 是一种轻量级的数据库管理系统,它以其简洁的设计、强大的功能和易于使用而闻名。在 SQLite 中,INSERT 语句用于向数据库表中添加新数据。本文将详细介绍 SQLite 的 INSERT 语句,包括其基本语法、使用方法以及一些高级特性。 基本语法 SQLi…

git更新内核补丁完整指南

Git操作完整指南 📋 目录 项目概述 Git基础配置 日常操作流程 补丁更新操作 分支管理 冲突解决 常见问题 最佳实践 命令速查表 🎯 项目概述 </

关于回归决策树CART生成算法中的最优化算法详解

首先&#xff0c;一共比如有M个特征&#xff0c;N个样本&#xff0c;对于每一个特征j&#xff0c;遍历其中的N个样本&#xff0c;得到N个值中&#xff0c;最小的值&#xff0c;作为这个特征的最优切分点&#xff0c;而其中的c1&#xff0c;c2是可以直接得到的。然后&#xff0c…

Ubuntu 环境下创建并启动一个 MediaMTX 的 systemd 服务

文章目录一、简介二、安装及使用三、创建系统服务小结一、简介 MediaMTX 是一个现代、高性能、跨平台的 流媒体服务器&#xff0c;主要用于接收、转发、转码和分发 音视频流&#xff0c;支持多种协议。它的前身是 rtsp-simple-server&#xff0c;后来重命名为 MediaMTX&#x…

在React中,函数式组件和类组件各有优缺点

函数式组件&#xff1a;无this&#xff0c;无生命周期&#xff0c;配合使用useEffect&#xff0c; 可使用Hooks。 类组件&#xff1a;有生命周期&#xff0c;状态管理&#xff0c;无Hooks&#xff0c;适用于需要明确生命周期方法和实例方法的场景。 函数式组件 优点&#xff1a…

【SketchUp插件推荐】Profile Builder 4.0 中文版下载安装使用教程(含语言设置图解)

一、插件简介 Profile Builder 4.0 是一款适用于 SketchUp 2017-2024 的高效参数化建模插件&#xff0c;中文名称为「参数化造型建模工具」。该插件基于参数化设计原理&#xff0c;允许用户通过简单的路径定义和参数设定&#xff0c;快速生成智能模型&#xff0c;从而大幅提高…

【小沐学GIS】基于Unity3d绘制三维数字地球Earth(Unity3d、OpenGL、GIS)

&#x1f37a;三维数字地球GIS系列相关文章如下&#x1f37a;&#xff1a;1【小沐学GIS】基于C绘制三维数字地球Earth&#xff08;OpenGL、glfw、glut&#xff09;第一期2【小沐学GIS】基于C绘制三维数字地球Earth&#xff08;OpenGL、glfw、glut&#xff09;第二期3【小沐学GI…