在Kafka数据写入流程中,Broker端负责接收客户端发送的消息,并将其持久化存储,是整个流程的关键环节。本文将深入Kafka Broker的源码,详细解析消息接收、处理和存储的具体实现。

一、网络请求接收与解析

Broker通过Processor线程池接收来自客户端的网络请求,Processor线程基于Java NIO的Selector实现非阻塞I/O,负责监听网络连接和读取数据。其核心处理逻辑如下:

public class Processor implements Runnable {private final Selector selector;private final KafkaApis kafkaApis;public Processor(Selector selector, KafkaApis kafkaApis) {this.selector = selector;this.kafkaApis = kafkaApis;}@Overridepublic void run() {while (!stopped) {try {// 轮询获取就绪的网络事件selector.poll(POLL_TIMEOUT);Set<SelectionKey> keys = selector.selectedKeys();for (SelectionKey key : keys) {if (key.isReadable()) {// 读取网络数据NetworkReceive receive = selector.read(key);if (receive != null) {// 处理接收到的请求kafkaApis.handle(receive);}}}} catch (Exception e) {log.error("Processor failed to process requests", e);}}}
}

Selector检测到有可读事件时,会从对应的SocketChannel中读取数据,并封装成NetworkReceive对象,然后传递给KafkaApis进行进一步处理。

KafkaApis是Broker处理请求的核心组件,它根据请求类型调用相应的处理器:

public class KafkaApis {private final Map<ApiKeys, RequestHandler> requestHandlers;public KafkaApis(Map<ApiKeys, RequestHandler> requestHandlers) {this.requestHandlers = requestHandlers;}public void handle(NetworkReceive receive) {try {// 解析请求头RequestHeader header = RequestHeader.parse(receive.payload());ApiKeys apiKey = ApiKeys.forId(header.apiKey());// 获取对应的请求处理器RequestHandler handler = requestHandlers.get(apiKey);if (handler != null) {// 处理请求handler.handle(receive);} else {// 处理未知请求类型handleUnknownRequest(header, receive);}} catch (Exception e) {// 处理请求解析和处理过程中的异常handleException(receive, e);}}
}

对于生产者发送的消息写入请求(ApiKeys.PRODUCE),会由ProduceRequestHandler进行处理。

二、消息写入处理与验证

ProduceRequestHandler负责处理生产者发送的消息写入请求,其核心职责包括验证请求合法性、将消息写入对应分区日志以及生成响应。关键处理逻辑如下:

public class ProduceRequestHandler implements RequestHandler {private final LogManager logManager;private final ReplicaManager replicaManager;public ProduceRequestHandler(LogManager logManager, ReplicaManager replicaManager) {this.logManager = logManager;this.replicaManager = replicaManager;}@Overridepublic void handle(NetworkReceive receive) {try {// 解析ProduceRequestProduceRequest request = ProduceRequest.parse(receive.payload());// 验证请求版本和元数据validateRequest(request);// 处理每个分区的消息Map<TopicPartition, PartitionData> partitionDataMap = new HashMap<>();for (Map.Entry<TopicPartition, MemoryRecords> entry : request.data().entrySet()) {TopicPartition tp = entry.getKey();MemoryRecords records = entry.getValue();// 获取分区日志Log log = logManager.getLog(tp);if (log != null) {// 将消息追加到日志LogAppendInfo appendInfo = log.append(records);// 记录分区数据信息partitionDataMap.put(tp, new PartitionData(appendInfo.offset(), appendInfo.logAppendTime()));} else {// 处理分区不存在的情况partitionDataMap.put(tp, new PartitionData(RecordBatch.NO_OFFSET, -1L));}}// 构建响应ProduceResponse response = new ProduceResponse(request.version(), request.correlationId(), partitionDataMap);// 发送响应sendResponse(response, receive);} catch (Exception e) {// 处理请求处理过程中的异常handleException(receive, e);}}
}

在上述代码中,validateRequest方法会对请求的版本、主题和分区的合法性进行检查;log.append方法将消息追加到对应分区的日志文件中;最后根据处理结果构建ProduceResponse响应,并发送回给生产者。

三、消息持久化存储

Kafka使用日志(Log)来持久化存储消息,每个分区对应一个日志实例。Log类负责管理日志文件、分段以及消息的读写操作,其核心的消息追加方法如下:

public class Log {private final LogSegmentManager segmentManager;// 省略其他成员变量public LogAppendInfo append(MemoryRecords records) throws IOException {try {// 获取当前活跃的日志分段LogSegment segment = segmentManager.activeSegment();long offset = segment.sizeInBytes();long baseOffset = segment.baseOffset();// 将消息追加到日志分段long appended = segment.append(records);// 更新日志元数据updateHighWatermark(segment);// 返回追加信息return new LogAppendInfo(baseOffset + offset, time.milliseconds());} catch (Exception e) {// 处理写入异常handleWriteException(e);throw e;}}
}

LogSegment类表示一个日志分段,它包含了日志文件、索引文件等,具体的消息写入操作在LogSegmentappend方法中完成:

public class LogSegment {private final FileMessageSet fileMessageSet;// 省略其他成员变量public long append(MemoryRecords records) throws IOException {// 计算写入位置long position = fileMessageSet.sizeInBytes();// 将消息写入文件long written = fileMessageSet.append(records);// 更新索引updateIndex(records.sizeInBytes(), position);return written;}
}

FileMessageSet类负责实际的文件I/O操作,它利用Java NIO的FileChannel实现高效的磁盘写入,并且支持零拷贝技术,进一步提升写入性能:

public class FileMessageSet {private final FileChannel fileChannel;// 省略其他成员变量public long append(MemoryRecords records) throws IOException {try (FileLock lock = fileChannel.lock()) {// 使用零拷贝技术写入数据long written = fileChannel.transferFrom(new ReadOnlyByteBufferChannel(records.buffer()), sizeInBytes(), records.sizeInBytes());sizeInBytes += written;return written;}}
}

通过上述一系列操作,Kafka将接收到的消息高效、可靠地持久化存储到磁盘中,保证了数据的安全性和一致性。

通过对Kafka Broker端数据写入流程的源码剖析,我们全面了解了从网络请求接收到消息持久化存储的完整过程。各组件通过严谨的设计和高效的实现,确保了Kafka在高并发场景下能够稳定、快速地处理大量消息写入请求,为整个消息系统的可靠运行提供了坚实保障。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/85045.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/85045.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/85045.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

名称 深度学习(监督学习) Iteration 一次 mini-batch 前向+反向传播更新 Epoch 所有数据集训练一遍。这两个概念不一样吗?

你的问题非常专业&#xff0c;确实容易混淆&#xff0c;下面我用科研术语 通俗比喻来清晰地区分&#xff1a; ✅ 简明对照表&#xff1a;Iteration vs. Epoch&#xff08;在监督学习中&#xff09; 名称专业术语解释通俗理解是否属于监督学习&#xff08;深度学习&#xff09…

Blender 模型下载

1.CGModel网站 https://www.cgmodel.com/ 免费模型数量多&#xff0c;国风类题材的模型多&#xff0c;中文搜索方便 2.Blender官网 3.3d溜溜网 https://3d.3d66.com/ 4.free3d.com free3d.com

Spring涉及的设计模式以及实际使用场景(含代码)

Spring涉及的设计模式以及实际使用场景(含代码) 1.工厂模式&#xff08;Factory Pattern&#xff09; 作用: 隐藏对象创建的细节&#xff0c;通过工厂类统一管理对象的实例化。 场景&#xff1a;Spring的BeanFactory和ApplicationContext是工厂模式的典型实现。 // 通过App…

ROM 只读存储器 随机存取

ROM&#xff08;Read-Only Memory&#xff0c;只读存储器&#xff09;的存取方式为&#xff1a; ✅ 随机存取方式&#xff08;Random Access&#xff09; 尽管“ROM”强调的是“只读”&#xff0c;它的数据访问方式与 RAM 类似&#xff0c;都是随机存取。 &#x1f50d; 解释如…

opensuse解决微信无法登录的问题

思路启发 https://forum.suse.org.cn/t/topic/17183/2 实际解决 https://forum.suse.org.cn/t/topic/17204/5 解决方法 先安装 sudo zypper install execstackcd /opt/wechatsudo bash -c execstack -c ./*.so

Adixen ASM380 氦气检漏仪 阿尔卡特Mobile high performance helium leak detector

Adixen ASM380 氦气检漏仪 阿尔卡特Mobile high performance helium leak detector

堆的自动管理

由于程序员必须编写出到分配和释放存储器的明确的调用&#xff0c;所以用m a l l o c和f r e e完成指针的动态分配和重新分配是管理堆的手工( m a n u a l )方法。相反地&#xff0c;运行时栈则是由调用序列自动地( a u t o m a t i c a l l y )管理。在一种需要完全动态的运行…

智能出入库管理系统:自动化管控平台

部队装备库室智能管控系统是集智能化、集成化、网络化于一体的综合管理系统&#xff0c;由智慧营区库室综合管控平台、出入口控制子系统、智能QD柜子系统、装备物资管理子系统、视频监控系统、入侵报警子系统、环境监测子系统等七大核心子系统构成。各子系统通过数据自动交互&a…

归并排序:高效分治的艺术

归并排序(Merge Sort)原理详解 归并排序是一种基于分治法(Divide and Conquer)的高效排序算法,由冯诺依曼于1945年提出。它的核心思想是将大问题分解为小问题,解决小问题后再合并结果。 核心原理 1. 分治策略(Divide and Conquer) 分(Divide):将无序数组递归地拆…

知识库建设方案有哪些?全面解析

知识库建设方案主要包括本地部署方案、云端在线方案、混合部署方案。其中&#xff0c;云端在线方案以其灵活性、实时更新能力和低维护成本&#xff0c;逐渐成为大多数企业的首选方案。云端在线方案可随时随地提供实时更新的知识内容&#xff0c;确保企业员工和客户始终获得最新…

政务大厅智能引导系统:基于数字孪生的技术架构与实践

本文面向政务信息化开发者、系统集成工程师、智能导视领域技术人员。解析政务大厅智能引导系统的技术实现路径&#xff0c;提供从定位导航到数据驱动的技术方案&#xff0c;助力解决传统导视系统效率低下、体验不佳的技术痛点。 一、技术架构全景&#xff1a;从物理空间到数字映…

java设计模式[2]之创建型模式

文章目录 一 创建型模式1.1 单例模式的设计与实现1.1.1 饿汉式模式1.1.2 懒汉式单例模式1.1.3 懒汉式单例模式完善1.1.4 双重检测锁式1.1.4.1 volatile关键字1.1.4.2 在双重检查锁定中的作用 1.1.5 静态内部类式单例模式1.1.6 枚举式单例模式1.1.7 反射暴力破解解决方案1.1.8 序…

PHP设计模式实战:构建高性能API服务

在前一篇电子商务系统设计的基础上,我们将深入探讨如何运用设计模式构建高性能、可扩展的API服务。现代Web应用越来越依赖API作为前后端分离架构的核心,良好的API设计对系统性能和维护性至关重要。 仓库模式实现数据访问层 仓库模式(Repository Pattern)可以抽象数据访问逻…

ComfyUI Flux.1 ACE++ 图像编辑原理详解

关注不迷路&#xff0c;点赞走好运&#xff01;&#xff01;&#xff01; ComfyUI Flux.1 ACE 图像编辑原理详解 ——从“拼图游戏”到“魔法画笔”的技术革命 目录 ACE 的核心思想&#xff1a;用“指令”指挥图像生成 1.1 什么是上下文感知内容填充&#xff1f;1.2 条件单元&…

Datawhale-爬虫

task1-初始爬虫 爬虫用python好&#xff0c;python库多&#xff0c;功能全 反爬机制和反反爬机制 顾名思义&#xff0c;一个是防范爬虫的&#xff0c;一个是应对限制爬虫的方法 好的&#xff0c;我们来更深入地探讨反爬机制和反反爬策略的细节&#xff0c;包括具体的技术手段…

双token三验证(Refresh Token 机制​)

单token存在的问题 我们都知道&#xff0c;token是我们在前后端数据传输的时候为了保证安全从而必须需要进行设置的东西&#xff0c;他的主要作用实际上就是为了保证我们的数据安全&#xff0c;进行身份验证和授权&#xff0c;并且相对于session而言更加适合如今的分布式系统&a…

青少年编程与数学 01-011 系统软件简介 22 VMware 虚拟化软件

青少年编程与数学 01-011 系统软件简介 22 VMware 虚拟化软件 一、历史沿革&#xff08;一&#xff09;创立阶段&#xff08;1998-2003&#xff09;&#xff08;二&#xff09;快速扩张&#xff08;2004-2010&#xff09;&#xff08;三&#xff09;云时代转型&#xff08;2011…

FPGA基础 -- Verilog门级建模之奇偶校验电路

✅ 一、什么是奇偶校验&#xff08;Parity Check&#xff09; &#x1f4cc; 定义&#xff1a; 奇偶校验是一种错误检测编码方式&#xff0c;用于判断一个二进制数据在传输或存储过程中是否发生了单比特错误。 奇校验&#xff08;Odd Parity&#xff09;&#xff1a;总共有奇…

UWB协议精读:IEEE 802.15.4z-2020,15. HRP UWB PHY, STS, HRP-ERDEV, BPRF, HPRF,

跟UWB相关的IEEE标准主要有2个: 1,IEEE 802.15.4-2020 2,IEEE 802.15.4z-2020 IEEE Std 802.15.4z™ ‐ 2020 Amendment 1: Enhanced Ultra Wideband (UWB) Physical Layers (PHYs) and Associated Ranging Techniques scrambled timestamp sequence (STS): A sequence of…

6.IK分词器拓展词库

比如一些行业专业词汇、简单无意义词&#xff08;例如&#xff1a;的、得、地、是等)、网络流行词、后来形成的词、再或者一些禁忌词&#xff08;比如&#xff1a;领导人的名字、黄赌毒犯罪等词要排除的&#xff09; 在es的插件目录下查找配置文件&#xff1a; 找到IKAnalyzer…