基于Dubbo 3.1,详细介绍了Dubbo Consumer接收服务调用响应
此前我们学习了Dubbo Provider处理服务调用请求的流程,现在我们来学习Dubbo Consumer接收服务调用响应流程。
实际上接收请求和接收响应同属于接收消息,它们的流程的很多步骤是一样的。下面我们仅分析关键步骤。
Dubbo 3.x服务调用源码:
- Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
- Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
- Dubbo 3.x源码(31)—Dubbo消息的编码解码
- Dubbo 3.x源码(32)—Dubbo Provider处理服务调用请求源码
- Dubbo 3.x源码(33)—Dubbo Consumer接收服务调用响应
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
- Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
- Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
- Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布
文章目录
- AllChannelHandler#received分发任务
- getPreferredExecutorService获取首选线程池
- ThreadlessExecutor#execute执行
- HeaderExchangeHandler#received处理消息
- HeaderExchangeHandler#handleResponse处理响应
- DefaultFuture#received处理响应
- DefaultFuture#doReceived处理响应
AllChannelHandler#received分发任务
将当前消息包装为一个ChannelEventRunnable分发给对应的线程池执行,这里的线程池就是dubbo业务线程池,到此IO线程的任务结束。
这种方式实现了线程资源的隔离,释放了IO线程,可以快速处理更多的IO操作,提升了系统吞吐量。
/*** AllChannelHandler的方法* <p>* 处理普通rpc请求请求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//获取对应的线程池,可能是ThreadlessExecutorExecutorService executor = getPreferredExecutorService(message);try {//创建一个线程任务ChannelEventRunnable,通过线程池执行//这里的handler是DecodeHandlerexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}
getPreferredExecutorService获取首选线程池
该方法获取处理业务的首选线程池,目前,这种方法主要是为了方便消费者端的线程模型而定制的。这是Dubo2.7.5之后的线程模型的优化:
- 对于响应消息,那么从DefaultFuture的静态字段缓存映射FUTURES中获取请求id对应的DefaultFuture。请求的id就是对应的响应的id。
- 然后获取执DefaultFuture的执行器,对于默认的同步请求,那自然是ThreadlessExecutor,这里面阻塞着发起同步调用的线程,将回调直接委派给发起调用的线程。对于异步请求,则获取异步请求的线程池。
- 因此后续的处理包括请求体的解码都是由发起调用的线程来执行,这样减轻了业务线程池的压力。后面我们将消费者接受响应的时候,会讲解对应的源码。
- 对于请求消息,则使用共享executor执行后续逻辑。对于共享线程池,默认为FixedThreadPool,固定200线程,阻塞队列长度为0,拒绝策略为打印异常日志并且抛出异常。
对于同步阻塞请求的响应,这是默认的请求方式,将会获取ThreadlessExecutor执行器,对于异步请求的响应,将会获取一个多线程的线程池。
/*** WrappedChannelHandler的方法* <p>* 目前,这种方法主要是为了方便消费者端的线程模型而定制的。* 1. 使用ThreadlessExecutor,又名,将回调直接委派给发起调用的线程。* 2. 使用共享executor执行回调。** @param msg 消息* @return 执行器*/
public ExecutorService getPreferredExecutorService(Object msg) {//如果是响应消息if (msg instanceof Response) {Response response = (Response) msg;//从DefaultFuture的静态字段缓存映射FUTURES中获取请求id对应的DefaultFutureDefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());// a typical scenario is the response returned after timeout, the timeout response may have completed the future//一个典型的场景是响应超时后返回,超时后的响应可能已经完成if (responseFuture == null) {//获取当前服务器或客户端的共享执行器return getSharedExecutorService();} else {//获取执future的执行器,对于默认的同步请i去,那自然是ThreadlessExecutor,这里面阻塞着发起同步调用的线程ExecutorService executor = responseFuture.getExecutor();if (executor == null || executor.isShutdown()) {//获取当前服务器或客户端的共享执行器executor = getSharedExecutorService();}return executor;}} else {//获取当前服务器或客户端的共享执行器return getSharedExecutorService();}
}/*** get the shared executor for current Server or Client** @return*/
public ExecutorService getSharedExecutorService() {// Application may be destroyed before channel disconnected, avoid create new application model// see https://github.com/apache/dubbo/issues/9127//在断开通道之前,应用程序可能被销毁,避免创建新的应用程序模型if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {return GlobalResourcesRepository.getGlobalExecutorService();}// note: url.getOrDefaultApplicationModel() may create new application modelApplicationModel applicationModel = url.getOrDefaultApplicationModel();ExecutorRepository executorRepository =applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();//从执行器仓库中根据url获取对应的执行器,默认INTERNAL_SERVICE_EXECUTOR对应的执行器ExecutorService executor = executorRepository.getExecutor(url);if (executor == null) {executor = executorRepository.createExecutorIfAbsent(url);}return executor;
}
ThreadlessExecutor#execute执行
还记得我们此前学习Dubbo Consumer发起请求的流程吗?对于同步请求,在发送请求之后,在AbstractInvoker#waitForResultIfSync方法中将会执行异步转同步等待,具体的等待方法就是ThreadlessExecutor#waitAndDrain方法。
waitAndDrain方法中,请求调用线程将会执行queue.take()方法尝试获取一个任务,如果没有任务,那么当前线程等待直到任务队列中有一个任务,那么获取并执行。实际上,这里等待的任务就是请求对应的响应结果。
ThreadlessExecutor#execute方法在响应返回之后会执行。首先将ChannelEventRunnable包装为RunnableWrapper,对于run方法添加了try-catch,不会抛出异常仅仅打印日志。然后判断如果同步调用线程没有处于等待状态,那么当前线程直接执行该线程任务。
否则将当前任务加入到阻塞队列,如果同步调用线程还在因为调用waitAndDrain方而处于等待状态,那么将会因为队列中添加了元素而被唤醒,进而执行该线程任务。
/*** 如果调用线程仍在等待回调任务,则将该任务添加到阻塞队列中以等待调度。否则,直接提交到共享回调执行器。** @param runnable 可执行的任务ChannelEventRunnable*/
@Override
public void execute(Runnable runnable) {//包装RunnableWrapper,对于run方法添加了try-catch,不会抛出异常仅仅打印日志runnable = new RunnableWrapper(runnable);synchronized (lock) {//如果同步调用线程没有处于等待状态if (!isWaiting()) {//那么当前线程直接执行该线程任务runnable.run();return;}/** 将当前任务加入到阻塞队列,如果同步调用线程还在因为调用waitAndDrain方而处于等待状态* 那么将会因为队列中添加了元素而被唤醒,进而执行该线程任务*/queue.add(runnable);}
}
HeaderExchangeHandler#received处理消息
HeaderExchangeHandler#received方法对于消息进行分类并调用不同的方法继续处理。
对于请求消息,如果是双向消息,那么调用handleRequest方法继续处理,将会创建Response对象,然后调用dubboProtocol.requestHandler完成请求处理获取结果,并将结果封装到Response中后返回给客户端。如果是单向消息则仅仅调用dubboProtocol.requestHandler完成请求处理即可。
对于响应消息,将会调用DefaultFuture#received方法处理,此时就会根据响应id获取对应的DefaultFuture,将响应结果设置进去。这里的源码我们后面讲consumer获取响应结果的时候再讲解。
/*** HeaderExchangeHandler的方法* <p>* 分类处理消息** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);//请求消息if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {//处理事件消息handlerEvent(channel, request);} else {if (request.isTwoWay()) {//额外处理双向消息handleRequest(exchangeChannel, request);} else {//处理单向消息,直接调用下层DubboProtocol.requestHandler#received方法handler.received(exchangeChannel, request.getData());}}}//响应消息else if (message instanceof Response) {handleResponse(channel, (Response) message);}//字符串else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (StringUtils.isNotEmpty(echo)) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}
}
HeaderExchangeHandler#handleResponse处理响应
该方法用于处理响应,将会判断如果不是心跳响应,那么继续通过DefaultFuture#received处理响应。
我们在此前学习Dubbo Consumer发起请求的流程的时候就讲过,超时检查任务TimeoutCheckTask#timeoutCheck方法中,如果请求超时,那么同样会调用DefaultFuture#received方法处理超时响应。
/*** HeaderExchangeHandler的方法* <p>* 处理响应消息** @param channel NettyChannel* @param response Response*/
static void handleResponse(Channel channel, Response response) throws RemotingException {//如果不是心跳响应,那么继续通过DefaultFuture#received处理响应if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}
}
DefaultFuture#received处理响应
当请求结果返回或者请求超时之后,将会通过DefaultFuture#received处理响应。
/*** DefaultFuture的方法* <p>* 处理响应结果** @param channel NettyChannel* @param response Response*/
public static void received(Channel channel, Response response) {//调用另一个received方法,timeout参数为falsereceived(channel, response, false);
}public static void received(Channel channel, Response response, boolean timeout) {try {//由于获得了响应,那么将该请求的缓存从FUTURES中移除DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {//该请求的超时检查任务Timeout t = future.timeoutCheckTask;//如果没有超时,那么if (!timeout) {// decrease Time//取消该任务t.cancel();}//通过future处理响应,设置结果future.doReceived(response);} else {logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response status is " + response.getStatus()+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");}} finally {//移除正在处理的channel缓存CHANNELS.remove(response.getId());}
}
DefaultFuture#doReceived处理响应
设置响应结果并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程。随后AsyncRpcResult#getAppResponse方法可以获取响应结果并返回。
/*** DefaultFuture的方法* <p>* 处理响应设置结果*/
private void doReceived(Response res) {if (res == null) {throw new IllegalStateException("response cannot be null");}//如果响应成功,则将设置响应结果并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程if (res.getStatus() == Response.OK) {this.complete(res.getResult());}//如果是客户端或者服务端超时,抛出超时异常并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));}//抛出远程调用异常并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程else {this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));}// the result is returning, but the caller thread may still wait// to avoid endless waiting for whatever reason, notify caller thread to return.//结果正在返回,但是调用者线程可能仍在等待,为了避免无休止的等待,通知调用线程返回。if (executor != null && executor instanceof ThreadlessExecutor) {ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;if (threadlessExecutor.isWaiting()) {threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +" which is not an expected state, interrupt the thread manually by returning an exception."));}}
}