基于Dubbo 3.1,详细介绍了Dubbo Consumer接收服务调用响应

此前我们学习了Dubbo Provider处理服务调用请求的流程,现在我们来学习Dubbo Consumer接收服务调用响应流程。

实际上接收请求和接收响应同属于接收消息,它们的流程的很多步骤是一样的。下面我们仅分析关键步骤。

Dubbo 3.x服务调用源码:

  1. Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
  2. Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
  3. Dubbo 3.x源码(31)—Dubbo消息的编码解码
  4. Dubbo 3.x源码(32)—Dubbo Provider处理服务调用请求源码
  5. Dubbo 3.x源码(33)—Dubbo Consumer接收服务调用响应

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
  8. Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布

文章目录

  • AllChannelHandler#received分发任务
    • getPreferredExecutorService获取首选线程池
  • ThreadlessExecutor#execute执行
  • HeaderExchangeHandler#received处理消息
  • HeaderExchangeHandler#handleResponse处理响应
  • DefaultFuture#received处理响应
  • DefaultFuture#doReceived处理响应

AllChannelHandler#received分发任务

将当前消息包装为一个ChannelEventRunnable分发给对应的线程池执行,这里的线程池就是dubbo业务线程池,到此IO线程的任务结束。

这种方式实现了线程资源的隔离,释放了IO线程,可以快速处理更多的IO操作,提升了系统吞吐量。

/*** AllChannelHandler的方法* <p>* 处理普通rpc请求请求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//获取对应的线程池,可能是ThreadlessExecutorExecutorService executor = getPreferredExecutorService(message);try {//创建一个线程任务ChannelEventRunnable,通过线程池执行//这里的handler是DecodeHandlerexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

getPreferredExecutorService获取首选线程池

该方法获取处理业务的首选线程池,目前,这种方法主要是为了方便消费者端的线程模型而定制的。这是Dubo2.7.5之后的线程模型的优化:

  1. 对于响应消息,那么从DefaultFuture的静态字段缓存映射FUTURES中获取请求id对应的DefaultFuture。请求的id就是对应的响应的id。
    1. 然后获取执DefaultFuture的执行器,对于默认的同步请求,那自然是ThreadlessExecutor,这里面阻塞着发起同步调用的线程,将回调直接委派给发起调用的线程。对于异步请求,则获取异步请求的线程池。
    2. 因此后续的处理包括请求体的解码都是由发起调用的线程来执行,这样减轻了业务线程池的压力。后面我们将消费者接受响应的时候,会讲解对应的源码。
  2. 对于请求消息,则使用共享executor执行后续逻辑。对于共享线程池,默认为FixedThreadPool,固定200线程,阻塞队列长度为0,拒绝策略为打印异常日志并且抛出异常。

对于同步阻塞请求的响应,这是默认的请求方式,将会获取ThreadlessExecutor执行器,对于异步请求的响应,将会获取一个多线程的线程池。

/*** WrappedChannelHandler的方法* <p>* 目前,这种方法主要是为了方便消费者端的线程模型而定制的。* 1. 使用ThreadlessExecutor,又名,将回调直接委派给发起调用的线程。* 2. 使用共享executor执行回调。** @param msg 消息* @return 执行器*/
public ExecutorService getPreferredExecutorService(Object msg) {//如果是响应消息if (msg instanceof Response) {Response response = (Response) msg;//从DefaultFuture的静态字段缓存映射FUTURES中获取请求id对应的DefaultFutureDefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());// a typical scenario is the response returned after timeout, the timeout response may have completed the future//一个典型的场景是响应超时后返回,超时后的响应可能已经完成if (responseFuture == null) {//获取当前服务器或客户端的共享执行器return getSharedExecutorService();} else {//获取执future的执行器,对于默认的同步请i去,那自然是ThreadlessExecutor,这里面阻塞着发起同步调用的线程ExecutorService executor = responseFuture.getExecutor();if (executor == null || executor.isShutdown()) {//获取当前服务器或客户端的共享执行器executor = getSharedExecutorService();}return executor;}} else {//获取当前服务器或客户端的共享执行器return getSharedExecutorService();}
}/*** get the shared executor for current Server or Client** @return*/
public ExecutorService getSharedExecutorService() {// Application may be destroyed before channel disconnected, avoid create new application model// see https://github.com/apache/dubbo/issues/9127//在断开通道之前,应用程序可能被销毁,避免创建新的应用程序模型if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {return GlobalResourcesRepository.getGlobalExecutorService();}// note: url.getOrDefaultApplicationModel() may create new application modelApplicationModel applicationModel = url.getOrDefaultApplicationModel();ExecutorRepository executorRepository =applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();//从执行器仓库中根据url获取对应的执行器,默认INTERNAL_SERVICE_EXECUTOR对应的执行器ExecutorService executor = executorRepository.getExecutor(url);if (executor == null) {executor = executorRepository.createExecutorIfAbsent(url);}return executor;
}

ThreadlessExecutor#execute执行

还记得我们此前学习Dubbo Consumer发起请求的流程吗?对于同步请求,在发送请求之后,在AbstractInvoker#waitForResultIfSync方法中将会执行异步转同步等待,具体的等待方法就是ThreadlessExecutor#waitAndDrain方法。

waitAndDrain方法中,请求调用线程将会执行queue.take()方法尝试获取一个任务,如果没有任务,那么当前线程等待直到任务队列中有一个任务,那么获取并执行。实际上,这里等待的任务就是请求对应的响应结果。

ThreadlessExecutor#execute方法在响应返回之后会执行。首先将ChannelEventRunnable包装为RunnableWrapper,对于run方法添加了try-catch,不会抛出异常仅仅打印日志。然后判断如果同步调用线程没有处于等待状态,那么当前线程直接执行该线程任务。

否则将当前任务加入到阻塞队列,如果同步调用线程还在因为调用waitAndDrain方而处于等待状态,那么将会因为队列中添加了元素而被唤醒,进而执行该线程任务。

/*** 如果调用线程仍在等待回调任务,则将该任务添加到阻塞队列中以等待调度。否则,直接提交到共享回调执行器。** @param runnable 可执行的任务ChannelEventRunnable*/
@Override
public void execute(Runnable runnable) {//包装RunnableWrapper,对于run方法添加了try-catch,不会抛出异常仅仅打印日志runnable = new RunnableWrapper(runnable);synchronized (lock) {//如果同步调用线程没有处于等待状态if (!isWaiting()) {//那么当前线程直接执行该线程任务runnable.run();return;}/** 将当前任务加入到阻塞队列,如果同步调用线程还在因为调用waitAndDrain方而处于等待状态* 那么将会因为队列中添加了元素而被唤醒,进而执行该线程任务*/queue.add(runnable);}
}

HeaderExchangeHandler#received处理消息

HeaderExchangeHandler#received方法对于消息进行分类并调用不同的方法继续处理。

对于请求消息,如果是双向消息,那么调用handleRequest方法继续处理,将会创建Response对象,然后调用dubboProtocol.requestHandler完成请求处理获取结果,并将结果封装到Response中后返回给客户端。如果是单向消息则仅仅调用dubboProtocol.requestHandler完成请求处理即可。

对于响应消息,将会调用DefaultFuture#received方法处理,此时就会根据响应id获取对应的DefaultFuture,将响应结果设置进去。这里的源码我们后面讲consumer获取响应结果的时候再讲解。

/*** HeaderExchangeHandler的方法* <p>* 分类处理消息** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);//请求消息if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {//处理事件消息handlerEvent(channel, request);} else {if (request.isTwoWay()) {//额外处理双向消息handleRequest(exchangeChannel, request);} else {//处理单向消息,直接调用下层DubboProtocol.requestHandler#received方法handler.received(exchangeChannel, request.getData());}}}//响应消息else if (message instanceof Response) {handleResponse(channel, (Response) message);}//字符串else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (StringUtils.isNotEmpty(echo)) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}
}

HeaderExchangeHandler#handleResponse处理响应

该方法用于处理响应,将会判断如果不是心跳响应,那么继续通过DefaultFuture#received处理响应。

我们在此前学习Dubbo Consumer发起请求的流程的时候就讲过,超时检查任务TimeoutCheckTask#timeoutCheck方法中,如果请求超时,那么同样会调用DefaultFuture#received方法处理超时响应。

/*** HeaderExchangeHandler的方法* <p>* 处理响应消息** @param channel  NettyChannel* @param response Response*/
static void handleResponse(Channel channel, Response response) throws RemotingException {//如果不是心跳响应,那么继续通过DefaultFuture#received处理响应if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}
}

DefaultFuture#received处理响应

当请求结果返回或者请求超时之后,将会通过DefaultFuture#received处理响应。

/*** DefaultFuture的方法* <p>* 处理响应结果** @param channel  NettyChannel* @param response Response*/
public static void received(Channel channel, Response response) {//调用另一个received方法,timeout参数为falsereceived(channel, response, false);
}public static void received(Channel channel, Response response, boolean timeout) {try {//由于获得了响应,那么将该请求的缓存从FUTURES中移除DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {//该请求的超时检查任务Timeout t = future.timeoutCheckTask;//如果没有超时,那么if (!timeout) {// decrease Time//取消该任务t.cancel();}//通过future处理响应,设置结果future.doReceived(response);} else {logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response status is " + response.getStatus()+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");}} finally {//移除正在处理的channel缓存CHANNELS.remove(response.getId());}
}

DefaultFuture#doReceived处理响应

设置响应结果并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程。随后AsyncRpcResult#getAppResponse方法可以获取响应结果并返回。

/*** DefaultFuture的方法* <p>* 处理响应设置结果*/
private void doReceived(Response res) {if (res == null) {throw new IllegalStateException("response cannot be null");}//如果响应成功,则将设置响应结果并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程if (res.getStatus() == Response.OK) {this.complete(res.getResult());}//如果是客户端或者服务端超时,抛出超时异常并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));}//抛出远程调用异常并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程else {this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));}// the result is returning, but the caller thread may still wait// to avoid endless waiting for whatever reason, notify caller thread to return.//结果正在返回,但是调用者线程可能仍在等待,为了避免无休止的等待,通知调用线程返回。if (executor != null && executor instanceof ThreadlessExecutor) {ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;if (threadlessExecutor.isWaiting()) {threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +" which is not an expected state, interrupt the thread manually by returning an exception."));}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/92873.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/92873.shtml
英文地址,请注明出处:http://en.pswp.cn/web/92873.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

栈和队列:数据结构中的基础与应用​

栈和队列&#xff1a;数据结构中的基础与应用在计算机科学的领域中&#xff0c;数据结构犹如大厦的基石&#xff0c;支撑着各类复杂软件系统的构建。而栈和队列作为两种基础且重要的数据结构&#xff0c;以其独特的特性和广泛的应用&#xff0c;在程序设计的舞台上扮演着不可或…

服务端配置 CORS解决跨域问题的原理

服务端配置 CORS&#xff08;跨域资源共享&#xff09;的原理本质是 浏览器与服务器之间的安全协商机制。其核心在于服务器通过特定的 HTTP 响应头声明允许哪些外部源&#xff08;Origin&#xff09;访问资源&#xff0c;浏览器根据这些响应头决定是否放行跨域请求。以下是详细…

Unity笔记(五)知识补充——场景切换、退出游戏、鼠标隐藏锁定、随机数、委托

写在前面&#xff1a;写本系列(自用)的目的是回顾已经学过的知识、记录新学习的知识或是记录心得理解&#xff0c;方便自己以后快速复习&#xff0c;减少遗忘。主要是C#代码部分。十七、场景切换和退出游戏1、场景切换场景切换使用方法&#xff1a; SceneManager.LoadScene()&a…

用 Spring 思维快速上手 DDD——以 Kratos 为例的分层解读

用 Spring 思维理解 DDD —— 以 Kratos 为参照 ​ 在此前的学习工作中&#xff0c;使用的开发框架一直都是 SpringBoot&#xff0c;对 MVC 架构几乎是肌肉记忆&#xff1a;Controller 接请求&#xff0c;Service 写业务逻辑&#xff0c;Mapper 操作数据库&#xff0c;这套套路…

docspace|Linux|使用docker完全离线化部署onlyoffice之docspace文档协作系统(全网首发)

一、 前言 书接上回&#xff0c;Linux|实用工具|onlyoffice workspace使用docker快速部署&#xff08;离线和定制化部署&#xff09;-CSDN博客&#xff0c;如果是小公司或者比如某个项目组内部使用&#xff0c;那么&#xff0c;使用docspace这个文档协同系统是非常合适的&…

【教程】如何高效提取胡萝卜块根形态和颜色特征?

胡萝卜是全球不可或缺的健康食材和重要的经济作物&#xff0c; 从田间到餐桌&#xff0c;从鲜食到深加工&#xff0c;胡萝卜在现代人的饮食和健康中扮演着极其重要的角色&#xff0c;通过量化块根形态和色泽均匀性&#xff0c;可实现对高产优质胡萝卜品种的快速筛选。工具/材料…

Python初学者笔记第二十四期 -- (面向对象编程)

第33节课 面向对象编程 1. 面向对象编程基础 1.1 什么是面向对象编程面向过程&#xff1a;执行者 耗时 费力 结果也不一定完美 面向对象&#xff1a;指挥者 省时 省力 结果比较完美面向对象编程(Object-Oriented Programming, OOP)是一种编程范式&#xff0c;它使用"对象&…

Go 语言 里 `var`、`make`、`new`、`:=` 的区别

把 Go 语言 里 var、make、new、: 的区别彻底梳理一下。1️⃣ var 作用&#xff1a;声明变量&#xff08;可以带初始值&#xff0c;也可以不带&#xff09;。语法&#xff1a; var a int // 声明整型变量&#xff0c;默认值为 0 var b string // 默认值 ""…

计算机网络---IP(互联网协议)

一、IP协议概述 互联网协议&#xff08;Internet Protocol&#xff0c;IP&#xff09;是TCP/IP协议族的核心成员&#xff0c;位于OSI模型的网络层&#xff08;第三层&#xff09;&#xff0c;负责将数据包从源主机传输到目标主机。它是一种无连接、不可靠的协议&#xff0c;提供…

DataFun联合开源AllData社区和开源Gravitino社区将在8月9日相聚数据治理峰会论坛

&#x1f525;&#x1f525; AllData大数据产品是可定义数据中台&#xff0c;以数据平台为底座&#xff0c;以数据中台为桥梁&#xff0c;以机器学习平台为中层框架&#xff0c;以大模型应用为上游产品&#xff0c;提供全链路数字化解决方案。 ✨杭州奥零数据科技官网&#xff…

【工具】通用文档转换器 推荐 Markdown 转为 Word 或者 Pdf格式 可以批量或者通过代码调用

【工具】通用文档转换器 推荐 可以批量或者通过代码调用 通用文档转换器 https://github.com/jgm/pandoc/ Pandoc - index 下载地址 https://github.com/jgm/pandoc/releases 使用方法: 比如 Markdown 转为 Word 或者 Pdf格式 pandoc -s MANUAL.txt -o example29.docx …

【UEFI系列】Super IO

文章目录一、什么是Super IO二、Super IO的作用常见厂商三、逻辑设备控制如何访问SIO逻辑设备的配置寄存器具体配置数值四、硬件监控&#xff08;hardware monitor&#xff09;一、什么是Super IO Super Input/Output超级输入输出控制器。 通过LPC&#xff08;low pin count&a…

飞算 JavaAI 2.0.0 测评:自然语言编程如何颠覆传统开发?

一、前言 在AI技术高速发展的今天&#xff0c;编程方式正在经历一场革命。传统的“手写代码”模式逐渐被AI辅助开发取代&#xff0c;而飞算JavaAI 2.0.0的推出&#xff0c;更是让自然语言编程成为现实。 作为一名长期使用Java开发的程序员&#xff0c;我决定深度体验飞算Java…

Dubbo + zk 微服务

一、安装zk注册中心 win版本&#xff1a;windows环境下安装zookeeper教程详解&#xff08;单机版&#xff09;-CSDN博客 linux版本&#xff1a; 二、服务提供方搭建 引入dubbo和zk依赖 提供接口 使用注解方式实现接口级注册到zk&#xff0c;而springcloud是将服务注册到注册…

聆思duomotai_ap sdk适配dooiRobot

一、说明 1、duomotai_ap介绍 duomotai_ap是一个针对多模态开发板&#xff08;如 CSK6-MIX 开发板&#xff09;的大模型 AI 开发套件 SDK&#xff0c;主要用于开发语音、视觉等多模态 AI 应用。 2、dooiRobot介绍 基于Doly 机器人的经典外观设计&#xff0c;采用聆思CSK6011A…

Photoshop软件打开WebP文件格的操作教程

Photoshop软件打开WebP文件格的操作教程&#xff0c;好吧&#xff0c;这是英文原版&#xff1a; Photoshop 23.2 原生支持 WebP 格式&#xff0c;无需插件即可打开、编辑和保存 WebP 文件。用户可通过“文件 > 另存为副本”选择 WebP 格式&#xff0c;调整无损/有损压缩及质…

【数据结构】——顺序表链表(超详细解析!!!)

目录一. 前言二. 顺序表1. 顺序表的特点2. 代码实现三. 链表1. 单向链表代码实现2.双向链表代码实现四. 顺序表与链表的区别总结一. 前言 顺序表和链表是最基础的两种线性表实现方式。它们各有特点&#xff0c;适用于不同的应用场景。本文将详细介绍这两种数据结构的实现原理、…

GitHub的简单使用方法----(4)

在安装完git之后&#xff0c;桌面右键会出现两个git的选项第一个gui打开是这样的用户界面分别是新建仓库&#xff0c;克隆仓库&#xff0c;打开已经存在的仓库。tips:Git Gui 默认只能操作本地仓库——它本质上是一个图形化的“本地 Git 客户端”。 它本身不内置“下载远程仓库…

蓝桥杯----大模板

在写大模板之前&#xff0c;先讲一个函System_Init()&#xff0c;用于系统初始化关闭所有LED与外设&#xff0c;关闭所有LED就是传入0xff数据打开锁存器&#xff0c;关闭外设就是传入0x00打开锁存器。现在所有底层已经提供给大家了&#xff0c;先提供最简单版本的大模板&#x…

科技写作改革我见:取消参考文献,以点读率取代引证率!

科技写作改革我见&#xff1a;综述应取消参考文献&#xff0c;学术成就评估以点读下载率取代参考文献引证率&#xff01;李升伟 张君飞 韩若兰引言在当今信息爆炸的时代&#xff0c;科技写作作为知识传播的核心载体&#xff0c;其形式与评价体系正面临前所未有的挑战。传统…