一、简介

我们之前其实已经完成了关于grpc的一些基础用法,实际上还有一些比较相对进阶的使用方式。比如:

  • 拦截器:包括客户端和服务端的拦截器,进而在每一端都可以划分为流式的拦截器和非流式的拦截器。和以前我们在spring web中的拦截器思路是一样的,都可以在请求来之前做一些统一的处理,进而减少代码量,做一些鉴权 ,数据校验 ,限流等等,和业务解耦。
    gRPC的拦截器
    1. 一元请求的 拦截器
      客户端 【请求 响应】
      服务端 【请求 响应】
    2. 流式请求的 拦截器 (Stream Tracer)
      客户端 【请求 响应】
      服务端 【请求 响应】
  • 客户端重试:grpc的客户端还可以发起重试请求,当我们有一些异常并非代码异常的时候,可以通过重试来避免问题。
  • NameResolver :当用于微服务的时候,需要注册中心对服务名的解析等等。
  • 负载均衡:包括(pick-first , 轮训)等轮训方式。
  • 可以在其他微服务框架中整合,比如dubbo中,spring cloud中,用protobuf来序列化数据,用grpc来发起rpc(比如可以替代open fegin)等场合。

下面我们就来从拦截器功能开始学习一下grpc。

二、项目构建

我们为进阶篇搭建一个新的工程。结构还是客户端,服务端,api模块。
其中api模块作为公共内容被其他模块引入做公共的声明使用。

api模块: rpc-grpc-adv-api
服务端模块:rpc-grpc-adv-server
客户端模块: rpc-grpc-adv-client

1、api模块

<dependencies><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.51.0</version><scope>runtime</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>1.51.0</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>1.51.0</version></dependency><dependency> <!-- necessary for Java 9+ --><groupId>org.apache.tomcat</groupId><artifactId>annotations-api</artifactId><version>6.0.53</version><scope>provided</scope></dependency>
</dependencies><build><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.7.1</version></extension></extensions><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact><outputDirectory>${basedir}/src/main/java</outputDirectory><clearOutputDirectory>false</clearOutputDirectory></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins>
</build>

在main目录下创建proto目录,下建立Hello.proto文件,声明内容为。

syntax = "proto3";package com.levi;option java_multiple_files = false;
option java_package = "com.levi";
option java_outer_classname = "HelloProto";message HelloRequest{string name = 1;
}message HelloRespnose{string result = 1;
}service HelloService{// 普通方法rpc hello(HelloRequest) returns (HelloRespnose);// 双端流方法rpc hello1(stream HelloRequest) returns (stream HelloRespnose);
}

然后通过编译器编译生成对应的message类HelloProto.java和service类HelloServiceGrpc.java。
然后该模块将会被其他模块引用,使用这些定义的类。

2、server模块

引入api模块。

<dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-adv-api</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>

服务端业务代码为:

@Slf4j
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloRespnose> responseObserver) {String name = request.getName();System.out.println("接收到客户端的参数name = " + name);responseObserver.onNext(HelloProto.HelloRespnose.newBuilder().setResult("this is server result").build());responseObserver.onCompleted();}
}

服务端启动代码为:

package com.levi;import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer {public static void main(String[] args) throws InterruptedException, IOException {ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);serverBuilder.addService(new HelloServiceImpl());Server server = serverBuilder.build();server.start();server.awaitTermination();}
}

3、client模块

引入api模块。

<dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-adv-api</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>

ok,至此就搭建完了项目结构。

三、拦截器

1、一元拦截器

其中一元拦截器就是在我们以前的一元通信模式使用的。也就是非流式的通信模式下。
而一元拦截器也分为两种:客户端拦截器和服务端拦截器
每一种下面又能分为两种
1.简单模式:只能拦截请求,不能拦截响应。
2.复杂模式:可以拦截请求和响应两种。
下面我们先来研究客户端拦截器。

1.1、客户端拦截器

我们说客户端拦截器又分为简单模式和复杂模式。

1.1.1、简单客户端拦截器

我们来开发客户端的代码,首先我们来编写一个简单拦截器。

package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;/**
* 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor
* 该拦截器在客户端发起请求时被调用,
* 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等
*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....");/** 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传* 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项* 其实就是拦截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接)* 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用*/return next.newCall(method, callOptions);
}

我们定义好拦截器之后就要整合在客户端调用的构建上。

package com.levi;import com.levi.interceptor.CustomClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.List;public class GrpcClient {public static void main(String[] args) {ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000)// .intercept(new CustomClientInterceptor())// 可以传递多个拦截器,按照传递顺序执行拦截器.intercept(List.of(new CustomClientInterceptor())).usePlaintext().build();try {HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel);HelloProto.HelloRequest helloRequest = HelloProto.HelloRequest.newBuilder().setName("levi").build();HelloProto.HelloRespnose helloRespnose = helloServiceBlockingStub.hello(helloRequest);System.out.println("接收到的服务端响应为: " + helloRespnose.getResult());} catch (Exception e) {e.printStackTrace();} finally {managedChannel.shutdown();}}
}

启动服务端,启动客户端,日志显示正常输出没毛病。
但是此时我们也看出来这种简单模式存在几个问题。

# 客户端简单拦截器的问题
1. 只能拦截请求,不能拦截响应。我们只能在请求的时候发起拦截,但是接收响应的时候无法拦截,也就是类似spring mvc的时候没有后置的拦截能力。
2. 即使拦截了请求操作,但是就这个请求拦截上,这个业务粒度也是过于宽泛,不精准。无法在请求的各个阶段发起拦截(1. 开始阶段 2. 设置消息数量 3.发送数据阶段 4.半连接阶段。),其实我们上面的代码可以看出来我们的拦截器是在往下传递ClientCall给grpc,也就是这个调用最后是ClientCall完成的。这里的各个阶段拦截其实就是在ClientCall的各个方法里面插入一些拦截操纵,其实就是在发起请求前,在ClientCall构建的各个阶段拦截一下(这个的底层应该是netty那些阶段性的事件感知实现的。)
装饰者模式增强了一下。

在这里插入图片描述

1.1.2、复杂客户端拦截器

我们前面操作的简单客户端请求拦截器粒度比较大,无法实现对请求过程的更加细力度的监听和管理。所以我们需要一个更加强大的拦截器。我们说白了就是对原来能正常请求中间加一些增强方法,其实就是装饰者模式,包装一下原始类型。在原始类型的基础上加了一堆方法分别在各个阶段生效,从而来增强原始能力。但是真正的rpc调用实现还是原始类型发起的。

请求拦截

于是我们来写一下代码。

/*这个类型增强原始类型 适用于控制 拦截 请求发送各个环节*/
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否则就发起请求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("发送请求数据之前的检查.....");//真正的去发起grpc的请求// 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到// delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式delegate().start(responseListener, headers);}
}

我们看到这就是一个增强的包装类,他是对原始的简单拦截器的那个ClientCall的包装。我们看到它在后续的动作之前增强了一个检查的实现。
然后你钥匙要继续就一定要delegate().start才会往下走。否则没启动ClientCall会报错。
ok,我们已经完成了增强ClientCall的开发,现在要把原来的拦截器方法里面的简单ClientCall替换为增强ClientCall。
我们来修改拦截器代码。

/*** 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor* 该拦截器在客户端发起请求时被调用,* 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....");/** 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传* 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项* 其实就是拦截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接)* 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用*/// return next.newCall(method, callOptions);/**  如果我们需要用复杂客户端拦截器 ,就需要对原始的ClientCall进行包装*  那么这个时候,就不能返回原始ClientCall对象,*  应该返回 包装的ClientCall ---> CustomForwardingClientClass*/return new CustomForwardingClientClass<>(next.newCall(method, callOptions));}
}

于是,启动服务端代码,客户端代码观察执行结果。
在这里插入图片描述
没有问题。
此外这个增强拦截还有更加细粒度的方法增强,我们来实现一下。

package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;import javax.annotation.Nullable;/*** 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor* 该拦截器在客户端发起请求时被调用,* 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....");/** 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传* 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项* 其实就是拦截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接)* 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用*/// return next.newCall(method, callOptions);/**  如果我们需要用复杂客户端拦截器 ,就需要对原始的ClientCall进行包装*  那么这个时候,就不能返回原始ClientCall对象,*  应该返回 包装的ClientCall ---> CustomForwardingClientClass*/return new CustomForwardingClientClass<>(next.newCall(method, callOptions));}
}/*这个类型增强原始类型 适用于控制 拦截 请求发送各个环节*/
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否则就发起请求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("发送请求数据之前的检查.....");//真正的去发起grpc的请求// 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到// delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式delegate().start(responseListener, headers);}// 真正开始发送消息,netty的发送消息的方法,outBoundBuffer@Overridepublic void sendMessage(ReqT message) {log.info("发送请求数据: {}", message);super.sendMessage(message);}// 指定发送消息的数量,类似批量发送@Overridepublic void request(int numMessages) {log.info("指定发送消息的数量: {}", numMessages);super.request(numMessages);}// 取消请求的时候回调触发@Overridepublic void cancel(@Nullable String message, @Nullable Throwable cause) {log.info("取消请求: {}", message);super.cancel(message, cause);}// 链接半关闭的时候回调触发,请求消息无法发送,但是可以接受响应的消息@Overridepublic void halfClose() {log.info("链接半关闭");super.halfClose();}// 消息发送是否启用压缩@Overridepublic void setMessageCompression(boolean enabled) {log.info("消息发送是否启用压缩: {}", enabled);super.setMessageCompression(enabled);}// 是否可以发送消息,这个在流式里面会调用,一元的不会@Overridepublic boolean isReady() {log.info("是否可以发送消息: {}", super.isReady());return super.isReady();}
}

运行程序结果为:
在这里插入图片描述
至此我们看到我们在客户端请求的各个阶段都进行了监听回调。这就是客户端的请求增强了。

响应拦截

前面我们完成的是对于请求的拦截,其实我们可以在客户端这里对服务端响应的拦截。我们可以拦截响应数据,这个能力可以让我们在不同的客户端定制自己的拦截需求。服务端不管你的需求,都返回,你不同的客户端可能有不同的要求,自己去做拦截定制。
我们先来看一下我们之前的那个请求增强。

@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("发送请求数据之前的检查.....");delegate().start(responseListener, headers);}
}

你有请求拦截才有响应拦截,而且响应拦截我们一般都是通过监听器来实现的,因为客户端你也不知道你啥时候响应,所以就需要监听回调的形式来监听。我们看到在checkedStart这个方法这里。他的参数列表里面有一个responseListener,响应监听器。其实就是这个东西,我们需要重新实现他,然后传进去,他就会在checkedStart调用的时候传递给grpc,grpc就根据你的实现来拦截了。现在他是一个responseListener,啥也没有,你要想拦截功能还是要增强包装。所以我们来实现一下。

/*用于监听响应,并对响应进行拦截,其中响应头回来onHeaders被调用是服务端的responseObserver.onNext这个调用触发的。而服务端调用responseObserver.onCompleted()才会回调onMessage这个。对应的其实就是netty的write和flush,responseObserver.onCompleted()才会真的flush,把数据写回来。可以在服务端做修改测试一下。*/
@Slf4j
class CustomCallListener<RespT> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {// 构造器包装原始的Listener,下面的回调实现包装增强。protected CustomCallListener(ClientCall.Listener<RespT> delegate) {super(delegate);}@Overridepublic void onHeaders(Metadata headers) {log.info("响应头信息 回来了......");super.onHeaders(headers);}@Overridepublic void onMessage(RespT message) {log.info("响应的数据 回来了.....{} ", message);super.onMessage(message);}// 这个在流式里面会调用,一元的不会,可以不实现@Overridepublic void onReady() {super.onReady();}//这个在流式里面会调用,一元的不会,可以不实现@Overridepublic void onClose(Status status, Metadata trailers) {super.onClose(status, trailers);}
}

通过构造函数执行包装,然后再包装里面增强。此时我们只需要替代delegate().start(responseListener, headers);中的参数responseListener为我们自己定义的就好了。

@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否则就发起请求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("发送请求数据之前的检查.....");//真正的去发起grpc的请求// 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到// delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式// delegate().start(responseListener, headers);delegate().start(new CustomCallListener<>(responseListener), headers);// 传入增强响应拦截}...... 省略其余代码
}

执行没有问题。
在这里插入图片描述

1.2、服务端拦截器

1.2.1、服务端简单拦截器

对应客户端那边的拦截器,服务端这里其实是对应的,该有的都有。我们来看下服务端的简单拦截器。

/*** 自定义服务端拦截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服务器端 拦截请求操作的功能 写在这个方法中log.debug("服务器端拦截器生效.....");// 返回req请求监听器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);return reqTListener;}
}

这里的概念我们都可以在客户端那里找到对应的。我们看到这个方法返回了一个ServerCall.Listener 类型的,而且泛型是req,可见是对于请求的监听器。作为服务端,他是被动连接的,所以她的拦截方式就是监听,什么时候来我什么时候拦截,他不知道你啥时候来,就只能监听着。而next.startCall(call, headers);返回的就是一个具有原始能力的拦截器,没有封装增强的。我们把这个拦截器整合到服务端发布代码中使其生效。

package com.levi;import com.levi.interceptor.CustomServerInterceptor;
import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer {public static void main(String[] args) throws InterruptedException, IOException {ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);serverBuilder.addService(new HelloServiceImpl());// 注册自定义拦截器serverBuilder.intercept(new CustomServerInterceptor());Server server = serverBuilder.build();server.start();server.awaitTermination();}
}

我们把自定义的拦截器注册进去之后启动服务端和客户端看一下。没有问题。
在这里插入图片描述
同样的服务端的简单拦截器也存在像客户端那边的问题,
拦截请求发送过来的数据,无法处理响应的数据。
拦截力度过于宽泛
所以我么需要复杂拦截器,增强原始的拦截器,达到更加细力度的控制拦截。一切都和当初我们在客户端做的一样,重新定义一个增强的。

1.2.2、服务端复杂拦截器

拦截请求,拦截的是客户端过来的消息

/*** 自定义服务端拦截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服务器端 拦截请求操作的功能 写在这个方法中log.debug("服务器端拦截器生效.....");// 返回req请求监听器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器// ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);// 包装器设计模式,封装原始的监听器,增强原始监听器的功能,实际的核心调用还是原始的在做// 只是加了一些额外的增强的方法return new CustomServerCallListener<>(next.startCall(call, headers));}
}/*** 复杂服务端拦截器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器* 对于reqTListener的事件,我们可以在事件触发时,做一些自定义的操作,* 本质是对于原始监听器的一个包装增强,包装器模式*/
@Slf4j
class CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) {super(delegate);}@Override//准备接受请求数据public void onReady() {log.debug("onRead Method Invoke,准备好接收客户端数据....");super.onReady();}@Overridepublic void onMessage(ReqT message) {log.debug("接受到了客户端请求提交的数据,客户端的请求数据是:  {} ", message);super.onMessage(message);}@Overridepublic void onHalfClose() {log.debug("监听到了 半连接触发这个操作...");super.onHalfClose();}@Overridepublic void onComplete() {log.debug("服务端 调用onCompleted()触发...");super.onComplete();}@Overridepublic void onCancel() {log.debug("出现异常后 会调用这个方法... 可以在这里做一些关闭资源的操作");super.onCancel();}
}

在经历了客户端的开发之后,我们这里其实就很好理解了。调用没有问题。
在这里插入图片描述

拦截响应,拦截的是服务端发给客户端的响应

我们能拦截请求,自然也就能拦截响应。我们先来看一下什么是服务端的响应,其实就是服务端回写给客户端的操作。也就是服务端调用客户端的操作,我们上面拦截请求其实是客户端发给服务端,也即是服务端的监听ServerCall.Listener,服务端的监听器做包装增强。
现在你要增强服务端对客户端的调用其实就是ServerCall(这里对应我们客户端那里的ClientCall)。所以我们要对ServerCall做包装。就是你谁干啥就增强啥就实现啥就行。

/*** 目的:通过自定义的ServerCall 包装原始的ServerCall 增加对于响应拦截的功能*/
@Slf4j
class CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {// 这里包装的是ServerCallprotected CustomServerCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Override//指定发送消息的数量 【响应消息】public void request(int numMessages) {log.debug("response 指定消息的数量 【request】");super.request(numMessages);}@Override//设置响应头public void sendHeaders(Metadata headers) {log.debug("response 设置响应头 【sendHeaders】");super.sendHeaders(headers);}@Override//响应数据public void sendMessage(RespT message) {log.debug("response 响应数据  【send Message 】 {} ", message);super.sendMessage(message);}@Override//关闭连接public void close(Status status, Metadata trailers) {log.debug("response 关闭连接 【close】");super.close(status, trailers);}
}

然后我们再把这个包装增强整合到拦截器里面,交给grpc的体系中才能生效,在interceptCall中进行整合,我们不需要改动服务端发布那里的代码,那里可以直接通过CustomServerInterceptor来处理我们这里整合到的两个拦截器,以下为完整代码。

package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;/*** 自定义服务端拦截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服务器端 拦截请求操作的功能 写在这个方法中log.debug("服务器端拦截器生效.....");//包装ServerCall 处理服务端响应拦截CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call);// 包装Listener   处理服务端请求拦截CustomServerCallListener<ReqT> reqTCustomServerCallListener = new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers));return reqTCustomServerCallListener;}
}/*** 复杂服务端拦截器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器* 对于reqTListener的事件,我们可以在事件触发时,做一些自定义的操作,* 本质是对于原始监听器的一个包装增强,包装器模式*/
@Slf4j
class CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) {super(delegate);}@Override//准备接受请求数据public void onReady() {log.debug("onRead Method Invoke,准备好接收客户端数据....");super.onReady();}@Overridepublic void onMessage(ReqT message) {log.debug("接受到了客户端请求提交的数据,客户端的请求数据是:  {} ", message);super.onMessage(message);}@Overridepublic void onHalfClose() {log.debug("监听到了 半连接触发这个操作...");super.onHalfClose();}@Overridepublic void onComplete() {log.debug("服务端 调用onCompleted()触发...");super.onComplete();}@Overridepublic void onCancel() {log.debug("出现异常后 会调用这个方法... 可以在这里做一些关闭资源的操作");super.onCancel();}
}/*** 通过自定义的ServerCall 包装原始的ServerCall 增加对于响应拦截的功能*/
@Slf4j
class CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {protected CustomServerCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Override//指定发送消息的数量 【响应消息】public void request(int numMessages) {log.debug("response 指定消息的数量 【request】");super.request(numMessages);}@Override//设置响应头public void sendHeaders(Metadata headers) {log.debug("response 设置响应头 【sendHeaders】");super.sendHeaders(headers);}@Override//响应数据public void sendMessage(RespT message) {log.debug("response 响应数据  【send Message 】 {} ", message);super.sendMessage(message);}@Override//关闭连接public void close(Status status, Metadata trailers) {log.debug("response 关闭连接 【close】");super.close(status, trailers);}
}

而且我们看到增强类都是要实现构造的,因为要传进去原始类,进行封装,调用核心方法还是走super,走那个原始的操作。你的增强的操作可以加在这些新的方法里面。这些增强方法,你可以酌情看你要的业务方法,需要的就实现,不需要就可以不覆盖实现。其实他就是服务端的响应的各个阶段不同的触发。
我们运行代码没有问题,各个阶段都被触发了。
在这里插入图片描述
对于你要是想只拦截响应,不拦截请求可以这么做。

public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服务器端 拦截请求操作的功能 写在这个方法中log.debug("服务器端拦截器生效.....");//包装ServerCall 处理服务端响应拦截CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call);// 包装Listener   处理服务端请求拦截CustomServerCallListener<ReqT> reqTCustomServerCallListener =new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers));// return reqTCustomServerCallListener;/*** 只拦截响应,我们就不需要包装Listener,也就是返回原始的Listener即可。原始的Listener我们是通过* next.startCall(reqTRespTCustomServerCall, headers)获取到的。所以继续用next.startCall不操作包装的* Listener即可,但是我们要包装响应也就是serverCall,所以返回reqTRespTCustomServerCall。包在原始Listener中* 你要是包装请求,那就是需要包装的Listener,不需要就直接next.startCall返回startCall即可。*/return next.startCall(reqTRespTCustomServerCall, headers);}

明白请求是包装的listener,响应是servercall,需要哪个就加强哪个就行,不需要增强拦截就用原始的就行。

四、总结

这就是grpc中比较常见的一元拦截器的使用,他是对于一元rpc的拦截。在各个拦截方法中我们可以定义一些自己的业务方法。进而灵活使用拦截器。而且你要是在某个点拦截之后不想继续往下走,那你就不要调用每个拦截方法的super,不要做后续的调用,直接断开链路即可。而且至于拦截请求还是响应就看你包装啥就完了,他不是耦合在一起的。
后面我们再来分析监听流也就是流式和双向调用的拦截器。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/90595.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/90595.shtml
英文地址,请注明出处:http://en.pswp.cn/web/90595.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入探索嵌入式仿真教学:以酒精测试仪实验为例的高效学习实践

引言&#xff1a;嵌入式技术普及下的教学革新 嵌入式系统作为现代科技的核心驱动力&#xff0c;其教学重要性日益凸显。然而&#xff0c;传统硬件实验面临设备成本高、维护难、时空受限等挑战。如何突破这些瓶颈&#xff0c;实现高效、灵活、专业的嵌入式教学&#xff1f;本文将…

三种深度学习模型(GRU、CNN-GRU、贝叶斯优化的CNN-GRU/BO-CNN-GRU)对北半球光伏数据进行时间序列预测

代码功能 该代码实现了一个光伏发电量预测系统&#xff0c;采用三种深度学习模型&#xff08;GRU、CNN-GRU、贝叶斯优化的CNN-GRU/BO-CNN-GRU&#xff09;对北半球光伏数据进行时间序列预测对北半球光伏数据进行时间序列预测&#xff0c;并通过多维度评估指标和可视化对比模型性…

PostgreSQL对象权限管理

本文记述在postgreSQL中对用户/角色操作库、模式、表、序列、函数、存储过程的权限管理针对数据库的授权 授权&#xff1a;grant 权限 on database 数据库 to 用户/角色; 撤权&#xff1a;revoke 权限 on database 数据库 from 用户/角色; 针对模式的授权 授权&#xff1a;gran…

Wordpress主题配置

一、下载主题 主题下载地址&#xff1a;https://www.iztwp.com/tag/blog-theme 二、主题安装 三、上传主题安装即可 四、安装完成启动主题

lock 和 synchronized 区别

1. 引言 在多线程编程中&#xff0c;我们经常需要确保某些代码在同一时刻只由一个线程执行。这种机制通常叫做“互斥锁”或“同步”。Java 提供了两种主要的同步机制&#xff1a;synchronized 关键字和 Lock 接口。尽管它们的作用相似&#xff0c;都用于实现线程的同步&#xf…

Tkinter - Python图形界面开发指南

作者&#xff1a;唐叔在学习 专栏&#xff1a;唐叔学python 标签&#xff1a;Python GUI编程 Tkinter教程 图形界面开发 Python实战 界面设计 事件监听 Python入门 唐叔Python 编程学习 软件开发 文章目录一、Tkinter是什么&#xff1f;为什么选择它&#xff1f;二、Tkinter基础…

Java基础day15

目录 一、Java集合简介 1.什么是集合&#xff1f; 2.集合接口 3.小结 二、List集合 1.List集合简介 三、ArrayList容器类 1.初始化 1.1无参初始化 1.2有参初始化 2.数据结构 3.常用方法 3.1增加元素 3.2查找元素 3.3 修改元素 3.4 删除元素 3.5 其他方法 4.扩…

React Three Fiber 实现昼夜循环:从光照过渡到日月联动的技术拆解

在 3D 场景中用 React Three Fiber 实现自然的昼夜循环&#xff0c;核心难点在于光照的平滑过渡、日月运动的联动逻辑、昼夜状态下的光影差异处理&#xff0c;以及性能与视觉效果的平衡。本文以一个 ReactThree.js 的实现为例&#xff0c;详细解析如何通过三角函数计算日月位置…

进阶向:基于Python的简易屏幕画笔工具

用Python打造你的专属屏幕画笔工具&#xff1a;零基础也能轻松实现你是否曾在观看网课或参加远程会议时&#xff0c;想要直接在屏幕上标注重点&#xff1f;或者作为设计师&#xff0c;需要快速绘制创意草图&#xff1f;现在&#xff0c;只需几行Python代码&#xff0c;你就能轻…

Elasticsearch-ik分析器

CLI 安装步骤 1、停止 Elasticsearch&#xff08;如果正在运行&#xff09;&#xff1a; 在安装插件之前&#xff0c;确保 Elasticsearch 没有在运行。 命令&#xff1a; systemctl stop elasticsearch2、安装插件&#xff1a; 使用 elasticsearch-plugin 命令安装 IK 插件。进…

MySQL八股篇

查询关键字执行先后顺序FROM&#xff08;及 JOIN)WHEREGROUP BYHAVINGSELECTDISTINCTORDER BYLIMIT / OFFSETCHAR 和 VARCHAR 的区别&#xff1f;使用场景&#xff1f;特性CHARVARCHAR​存储方式​​定长&#xff0c;存储时填充空格至定义长度变长&#xff0c;存储实际数据 长…

QT RCC 文件

RCC (Qt Resource Compiler) 是 Qt 框架中的一个工具&#xff0c;用于将资源文件&#xff08;如图像、音频、翻译文件等&#xff09;编译成二进制格式&#xff0c;并嵌入到应用程序可执行文件中。RCC 文件基本概念作用&#xff1a;将应用程序所需的资源文件编译成 C 代码&#…

数据湖典型架构解析:2025 年湖仓一体化解决方案

数据湖架构概述&#xff1a;从传统模型到 2025 年新范式数据湖作为存储海量异构数据的中央仓库&#xff0c;其架构设计直接影响企业数据价值的释放效率。传统数据湖架构主要关注数据的存储和管理&#xff0c;而 2025 年的数据湖架构已经演变为更加智能化、自动化的综合性数据平…

绘图库 Matplotlib Search

关于Pathon的绘图库的认识和基本操作的学习 这里学习了两款常用便捷的绘图库去学习使用Matplotlib介绍是最受欢迎的一种数据可视化包 是常用的2D绘图库 一般常于Numpy和Pandas使用 是数据分析中非常重要的工具可以自定义XY轴 绘制线形图 柱状图 直方图 密度图 散点图 更清晰的展…

Docker详解及实战

&#x1f389; Docker 简介和安装 - Docker 快速入门 Docker 简介 Docker是一个开源的平台&#xff0c;用于开发、交付和运行应用程序。它能够在Windows&#xff0c;macOS&#xff0c;Linux计算机上运行&#xff0c;并将某一应用程序及其依赖项打包至一个容器中&#xff0c;这…

嵌入式学习的第三十三天-进程间通信-UDP

一、网络1.定义不同主机间进程通信主机间在硬件层面互联互通主机在软件层面互联互通2.国际网络体系结构OSI模型&#xff08;7层&#xff09;: open system interconnect -------理论模型------定义了网络通信中不同层的协议1977 国际标准化组织各种不同体系结构的计算机能在世…

4、Spring AI_DeepSeek模型_结构化输出

一、前言 Spring AI 提供跨 AI 供应商&#xff08;如 OpenAI、Hugging Face 等&#xff09;的一致性 API, 通过分装的ChatModel或ChatClient即可轻松调动LLM进行流式或非流式对话。 本专栏主要围绕着通过OpenAI兼容接口调用各种大语言模型展开学习&#xff08;因为大部分模型…

Spring Data Redis 从入门到精通:原理与实战指南

一、Redis 基础概念 Redis&#xff08;Remote Dictionary Server&#xff09;是开源的内存键值对数据库&#xff0c;以高性能著称。它支持多种数据结构&#xff08;String、Hash、List、Set、ZSet&#xff09;&#xff0c;并提供持久化机制&#xff08;RDB、AOF&#xff09;。 …

免费版酒店押金原路退回系统——仙盟创梦IDE

项目介绍​东方仙盟开源酒店押金管理系统是一款面向中小型酒店、民宿、客栈的轻量级前台管理工具&#xff0c;专注于简化房态管理、订单处理和押金跟踪流程。作为完全开源的解决方案&#xff0c;它无需依赖任何第三方服务&#xff0c;所有数据存储在本地浏览器中&#xff0c;确…

10. isaacsim4.2教程-RTX Lidar 传感器

1. 前言RTX Lidar 传感器Isaac Sim的RTX或光线追踪Lidar支持通过JSON配置文件设置固态和旋转Lidar配置。每个RTX传感器必须附加到自己的视口或渲染产品&#xff0c;以确保正确模拟。重要提示&#xff1a; 在运行RTX Lidar仿真时&#xff0c;如果你在Isaac Sim UI中停靠窗口&…