netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1 功能逻辑

在客户端启动的时候要为添加了BonnieRemoteReference注解的属性生成一个代理类;代理类的主要功能:在spring容器加载完BeanDefinition之后,在Bean初始化之前,触发生成代理类。
逻辑:

  1. 获取到所有的BeanDefinition
  2. 拿到BeanDefinition对应的class
  3. 遍历class下的所有被BonnieRemoteReference修饰的属性(成员变量)
  4. 为被BonnieRemoteReference修饰的属性,使用BeanDefinitionBuilder构建BeanDefinition,设置interfaceClass、serviceAddress、servicePort属性,并放入到spring容器中,对象的类型为SpringRpcReferenceBean;
  5. SpringRpcReferenceBean实现FactoryBean接口,然后在getObject中返回代理对象。
  6. 编写NettyClient代码

补充:

Spring 的 FactoryBean 是一个工厂 bean 接口,用于自定义 bean 的创建逻辑。它的核心作用是:

  • 当容器获取该 bean 时(如 getBean("xxx")),实际返回的是 getObject() 方法创建的对象,而非 SpringRpcReferenceBean 自身实例。
  • 常用于创建复杂对象(如远程服务代理、数据库连接池等)

2 重点代码介绍

2.1 触发生成代理类入口代码

在spring容器加载BeanDefinition之后,在Bean初始化之前执行,实现接口BeanFactoryPostProcessor接口中postProcessBeanFactory方法即可
 

获取所有的beanDefinitionNames
String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();

获取beanClassName对应的类信息
Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);

获取clazz上的所有属性(成员变量)
ReflectionUtils.doWithFields(clazz, this::parseRpcReference);

当前这个field是否被BonnieRemoteReference注解修饰
BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);

生成SpringRpcReferenceBean的BeanDefinition
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class)
放入属性,远程调用中需要的内容,比如是那个类,以及地址端口信息
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);

放入到spring容器中
registry.registerBeanDefinition(entry.getKey(), entry.getValue());

package com.bonnie.protocol.spring.reference;import com.bonnie.protocol.annotation.BonnieRemoteReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {private ApplicationContext applicationContext;private ClassLoader classLoader;//保存发布的引用bean的信息private final Map<String, BeanDefinition> rpcRefBeanDefinitionMap = new ConcurrentHashMap<>();private RpcClientProperties rpcClientProperties;public SpringRpcReferencePostProcessor(RpcClientProperties rpcClientProperties) {this.rpcClientProperties = rpcClientProperties;}/*** 实现postProcessBeanFactory方法,spring容器加载了bean的定义文件之后, 在bean实例化之前执行* 1、将类型的存在的BonnieRemoteReference注解的属性,构造BeanDefinition放在容器中,beanName是类的全限定名, BeanDefinition(类的全限定名,客户端IP,客户端端口号)* @param beanFactory* @throws BeansException*/@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {// 获取到所有的beanDefinitionString[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();// 遍历for (String beanDefinitionName : beanDefinitionNames) {BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);String beanClassName = beanDefinition.getBeanClassName();if (Objects.nonNull(beanClassName)) {// 获取到这个类的所有fieldClass<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);// 该方法遍历class对象中的所有的field属性,并且作为参数传入到parseRpcReference方法中ReflectionUtils.doWithFields(clazz, this::parseRpcReference);}}// 将生成的BeanDefinition放入到容器中BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;Set<Map.Entry<String, BeanDefinition>> entries = this.rpcRefBeanDefinitionMap.entrySet();for (Map.Entry<String, BeanDefinition> entry : entries) {if (applicationContext.containsBean(entry.getKey())) {log.warn("SpringContext already register bean {}", entry.getKey());} else {registry.registerBeanDefinition(entry.getKey(), entry.getValue());log.info("registered RpcReferenceBean {} success", entry.getKey());}}}private void parseRpcReference(Field field) {// 当前这个field是否被BonnieRemoteReference注解修饰BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);// BonnieRemoteReference注解修饰if (Objects.nonNull(remoteReference)) {BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);builder.addPropertyValue("interfaceClass", field.getType());builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());BeanDefinition beanDefinition = builder.getBeanDefinition();rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);}}@Overridepublic void setBeanClassLoader(ClassLoader classLoader) {this.classLoader = classLoader;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}

2.2 生成代理类代码

上面会被BonnieRemoteReference修饰的属性(Field)为生成SpringRpcReferenceBean对象,并添加相关的属性。

实现FactoryBean接口,当spring获取SpringRpcReferenceBean对象的时候,调用的就是里面的getObject对象,在getObject里面生成一个代理类,即可代理被BonnieRemoteReference修饰的类。

package com.bonnie.protocol.spring.reference;import lombok.Setter;
import org.springframework.beans.factory.FactoryBean;import java.lang.reflect.Proxy;/*** 创建SpringRpcReferenceBean的代理对象*/
@Setter
public class SpringRpcReferenceBean implements FactoryBean<Object> {private String serviceAddress;private Integer servicePort;private Class<?> interfaceClass;/*** 返回由工厂创建的目标Bean实例* @return* @throws Exception*/@Overridepublic Object getObject() throws Exception {System.out.println("代理类 serviceAddress "+serviceAddress);System.out.println("代理类 servicePort "+servicePort);System.out.println("代理类 interfaceClass "+interfaceClass);// 为BonnieRemoteReference生成一个代理类return Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},new RpcInvokerProxy(serviceAddress, servicePort));}/*** 返回目标Bean的类型* @return*/@Overridepublic Class<?> getObjectType() {return this.interfaceClass;}}

2.3 代理类handler

这块主要是在发生rpc调用的时候,组装请求信息,并通过nettyClient向服务端发起连接并且发送请求。

package com.bonnie.protocol.spring.reference;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.enums.SerialTypeEnum;
import com.bonnie.protocol.netty.NettyClient;
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;public class RpcInvokerProxy implements InvocationHandler {private String host;private Integer port;public RpcInvokerProxy(String host, Integer port) {this.host = host;this.port = port;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {/*** 构建发送的请求报文,首先去创建RequestHold类,在这个类定义一个原子自增的RequestId,* 在一个就是每次请求都会有结果,那么请求id和结果的关系要有一个映射关系*/RpcProtocol<RpcRequest> reqProtocol = new RpcProtocol<>();long requestId = RequestHolder.REQUEST_ID.incrementAndGet();System.out.println("生成的requestId:" + requestId);Header header = new Header();header.setMagic(RpcConstant.MAGIC);header.setSerialType(SerialTypeEnum.JAVA_SERIAL.getCode());header.setReqType(ReqTypeEnum.REQUEST.getCode());header.setRequestId(requestId);header.setLength(0);RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());rpcRequest.setParams(args);rpcRequest.setParameterTypes(method.getParameterTypes());reqProtocol.setHeader(header);reqProtocol.setContent(rpcRequest);// 发起远程调用NettyClient nettyClient = new NettyClient(host, port);System.out.println("代理发送到服务端请求内容:" + JSONObject.toJSONString(reqProtocol));// new DefaultEventLoop(),是用来去执行监听器的RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<RpcResponse>(new DefaultEventLoop()));// 在发起请求之前,添加映射关系到map中RequestHolder.REQUEST_MAP.put(header.getRequestId(), future);// 客户端发送数据nettyClient.sendRequest(reqProtocol);// 通过promise,异步等待服务端发送数据来,不然就会一直在此等待// get方法得到的是RpcResponse类,然后调用getData方法获取到数据return future.getPromise().get().getData();}
}

2.4 netty客户端代码

这块主要包含创建客户端、向服务端发起连接、发送请求,也会设置前文中自定义编解码、序列化的操作

package com.bonnie.protocol.netty;import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClient {private final Bootstrap bootstrap;private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();private String serviceAddress;private Integer servicePort;public NettyClient(String serviceAddress, Integer servicePort) {log.info("开始初始化NettyClient======");bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.info("开始初始化RpcClientInitializer======");ch.pipeline().addLast(new LoggingHandler()).addLast(new BonnieEncoder()).addLast(new BonnieDecoder()).addLast(new RpcClientHandler());}});this.serviceAddress = serviceAddress;this.servicePort = servicePort;}/*** 发送数据* @param protocol* @throws Exception*/public void sendRequest(RpcProtocol<RpcRequest> protocol) {try {System.out.println(this.serviceAddress+ "===="+this.servicePort);final ChannelFuture channelFuture  = bootstrap.connect(this.serviceAddress, this.servicePort).sync();// 注册一个监听器,如果出问题就关闭groupchannelFuture.addListener(listener -> {if (channelFuture.isSuccess()) {log.info("connect rpc server {} success.",this.serviceAddress);} else {log.error("connect rpc server {} failed. ",this.servicePort);channelFuture.cause().printStackTrace();eventLoopGroup.shutdownGracefully();}});log.info("begin transfer data");// 向服务端发送数据channelFuture.channel().writeAndFlush(protocol);} catch (InterruptedException e) {e.printStackTrace();}}}

2.5 netty客户端接收服务端响应数据

package com.bonnie.protocol.netty;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.spring.reference.RequestHolder;
import com.bonnie.protocol.spring.reference.RpcFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {/*** 接收服务端响应数据* @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {long requestId = msg.getHeader().getRequestId();log.info("接收服务端响应的结果====== requestId {} {}", requestId, JSONObject.toJSONString(msg));// 删除映射关系RpcFuture<RpcResponse> future = RequestHolder.REQUEST_MAP.remove(requestId);// 我们之前说异步等待服务端发送数据过来,那么只要服务端发送数据过来,就会调用管道RpcClentHandler的read方法// 那么当初future.getPromise().get()如果不再阻塞获取数据呢?就是通过给Promise中的Success设置值,同时会唤醒阻塞的线程// 一当唤醒线程, future.getPromise().get()就会不再阻塞,就获取到服务端返回的数据future.getPromise().setSuccess(msg.getContent());}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/918425.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/918425.shtml
英文地址,请注明出处:http://en.pswp.cn/news/918425.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ThreadLocal有哪些内存泄露问题,如何避免?

每个Thread都有一个ThreadLocal.ThreadLocalMap的map&#xff0c;该map的key为ThreadLocal实例&#xff0c;它为一个弱引 用&#xff0c;我们知道弱引用有利于GC回收。当ThreadLocal的key null时&#xff0c;GC就会回收这部分空间&#xff0c;但是value却不一 定能够被回收&am…

从0到1学LangChain之Agent代理:解锁大模型应用新姿势

从0到1学LangChain之Agent代理&#xff1a;解锁大模型应用新姿势 本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型开发 学习视频/籽料/面试题 都在这>>Github<< 什么是 LangChain Agent 代理 如果把大模型比作一个超级大脑&#xff0c;那么…

Spring Boot 2.6.0+ 循环依赖问题及解决方案

Spring Boot 2.6.0 循环依赖问题及解决方案 目录 背景解决方案 1. 配置文件开启循环依赖&#xff08;侵入性最低&#xff0c;临时方案&#xff09;2. Lazy 延迟注入&#xff08;侵入性低&#xff0c;推荐优先尝试&#xff09;3. 手动从容器获取&#xff08;ApplicationContex…

本地代码上传Github步骤

1.注册Github账号 2.下载git客户端 下载、安装步骤可以参考网站&#xff1a;(6 封私信 / 10 条消息) 手把手教你用git上传项目到GitHub&#xff08;图文并茂&#xff0c;这一篇就够了&#xff09;&#xff0c;相信你一定能成功&#xff01;&#xff01; - 知乎 3.在Github上…

5G NR 非地面网络 (NTN) 5G、太空和统一网络

非地面网络 5G 和太空&#xff1a;对 NTN 测试与测量的影响NTN 基站测试与测量NTN 用户设备的测试设备R&SSMW200A 矢量信号发生器R&SSMBV100B 矢量信号发生器总结5G 和太空&#xff1a;对 NTN 测试与测量的影响 5G 非地面网络 (NTN) 是无线通信向全球性星基和机载通信…

少儿编程比赛(如蓝桥杯、创意编程大赛等)的题目类型、知识点及难度总结

以下是针对主流少儿编程比赛&#xff08;如蓝桥杯、创意编程大赛等&#xff09;的题目类型、知识点及难度总结&#xff0c;结合了Scratch和C等语言的真题分析&#xff0c;帮助备赛或教学参考&#xff1a; 一、基础操作与交互题&#xff08;适合6~10岁&#xff09; 考察图形化编…

SIFThinker: Spatially-Aware Image Focus for Visual Reasoning

SIFThinker: Spatially-Aware Image Focus for Visual Reasoning Authors: Zhangquan Chen, Ruihui Zhao, Chuwei Luo, Mingze Sun, Xinlei Yu, Yangyang Kang, Ruqi Huang 相关工作总结 视觉思维链推理 最近的研究表明&#xff0c;通过上下文学习逐步推理可以显著提升大型…

学习嵌入式第二十五天

IO 1.概念 IO指input/outputLinux中一切皆文件IO的操作对象是文件 2.文件一段数据的集合文件通常存放在外存中&#xff0c;掉电后数据不丢失分类b(block&#xff0c;块设备文件) 按块扫描信息的文件。通常存储类型的设备为块设备文件。文件IOc(character&#xff0c;字符设备文…

本地部署接入 whisper + ollama qwen3:14b 总结字幕

1. 实现功能 M4-1 接入 whisper ollama qwen3:14b 总结字幕 自动下载视频元数据如果有字幕&#xff0c;只下载字幕使用 ollama 的 qwen3:14b 对字幕内容进行总结 2.运行效果 &#x1f50d; 正在提取视频元数据… &#x1f4dd; 正在下载所有可用字幕… [youtube] Extracting U…

【13-向量化-高效计算】

研究者能够扩展神经网络并构建非常大型网络的原因之一&#xff0c;就是神经网络可以被向量化&#xff0c;vectorized&#xff1b;可以非常高效地用矩阵地乘法实现。 事实上&#xff0c;并行计算硬件&#xff0c;例如GPU&#xff0c;一些CPU的功能&#xff0c;非常擅长进行非常大…

论文中PDF的公式如何提取-公式提取

Mathcheap - An AI-powered, free alternative to Mathpix Snip. 从PDF中截图公式&#xff0c;之后 ctrl V 转换成功 &#xff0c;提取成功 复制到word中&#xff0c;是这样的 这显然不是我们需要的。 可以使用Axmath 复制进去Axmath 就能正常显示公式。 之后再插入word…

用 Flink SQL 和 Paimon 打造实时数仓:深度解析与实践指南

1. 实时数仓的魅力&#xff1a;从离线到分钟级的飞跃实时数仓&#xff0c;听起来是不是有点高大上&#xff1f;其实它没那么神秘&#xff0c;但确实能让你的数据处理能力像坐上火箭一样飙升&#xff01;传统的离线数仓&#xff0c;像 Hadoop 生态的 Hive&#xff0c;动辄小时级…

【已解决】报错:WARNING: pip is configured with locations that require TLS/SSL

一、问题背景二、问题分析1. SSL模块缺失的本质2. Anaconda环境特点三、问题表现四、解决方案详解1. 完整配置环境变量2. 添加环境变量的步骤3. 测试验证五、实战示例六、附加建议七、总结八、参考链接一、问题背景 在Windows 10系统中使用Python的包管理工具pip时&#xff0c…

Java项目基本流程(三)

一、页面初始化阶段&#xff08;加载即执行&#xff09;加载栏目列表&#xff08;同步请求&#xff09;发送同步 AJAX 请求到SearchChannel接口&#xff0c;获取所有栏目数据。清空下拉框&#xff08;.channelid&#xff09;后&#xff0c;先添加 “全部” 选项&#xff0c;再循…

鹧鸪云光伏仿真:项目前期决策的“数据明灯”

曾有一处光伏项目&#xff0c;在精心筹备数月后终于建成&#xff0c;却在运行初期即因未充分评估山体遮挡影响&#xff0c;导致实际发电量较预期大幅降低近一成。前期决策中的微小疏漏&#xff0c;往往成为项目经济性与可行性的致命伤。而鹧鸪云光伏仿真软件正是一盏照亮前路的…

开发指南129-基础类-BaseController

所有接口都需要继承BaseControllerBaseController里有很多有用的方法&#xff0c;现举例最重要的几个&#xff1a;1、getURI返回接口地址&#xff0c;就是PostMapping或GetMapping中定义的接口地址。常用于返回值中&#xff0c;例如接口的异常处理&#xff1a;try {// 处理逻辑…

C++高频知识点(十八)

文章目录86. C多线程中&#xff0c;锁的实现方式有哪些&#xff1f;1. 互斥锁&#xff08;Mutex&#xff09;2. 递归互斥锁&#xff08;Recursive Mutex&#xff09;3. 读写锁&#xff08;Shared Mutex&#xff09;4. 自旋锁&#xff08;Spinlock&#xff09;5. 条件变量&#…

【C语言强化训练16天】--从基础到进阶的蜕变之旅:Day1

&#x1f525;个人主页&#xff1a;草莓熊Lotso &#x1f3ac;作者简介&#xff1a;C研发方向学习者 &#x1f4d6;个人专栏&#xff1a; 《C语言》 《数据结构与算法》《C语言刷题集》《Leetcode刷题指南》 ⭐️人生格言&#xff1a;生活是默默的坚持&#xff0c;毅力是永久的…

【软考中级网络工程师】知识点之 TCP 协议深度剖析

目录一、TCP 协议简介二、TCP 协议的特点2.1 面向连接2.2 可靠性高2.3 拥塞控制2.4 全双工通信2.5 高效性2.6 支持多种应用协议2.7 可靠的错误恢复三、TCP 协议的工作机制3.1 三次握手建立连接3.2 数据传输3.3 四次挥手关闭连接四、TCP 协议的数据包格式五、TCP 协议在实际应用…

操作系统1.5:操作系统引导

目录 总览 什么是操作系统引导&#xff1f; 磁盘里边有哪些相关数据? 操作系统引导(开机过程&#xff09; 总览 什么是操作系统引导&#xff1f; 操作系统引导(boot)——开机的时候&#xff0c;怎么让操作系统运行起来? 磁盘里边有哪些相关数据? 一个刚买来的磁盘(硬…