欢迎来到啾啾的博客🐱。
记录学习点滴。分享工作思考和实用技巧,偶尔也分享一些杂谈💬。
有很多很多不足的地方,欢迎评论交流,感谢您的阅读和评论😄。

目录

  • 1 引言
  • 2 缓冲区
    • 2.1 消息在Partition内有序
    • 2.2 批次消息ProducerBatch
      • 2.2.1 内存分配
      • 2.2.2 线程安全
  • 3 发送消息Sender
  • 4 总结

1 引言

继续看Kafka源码,看其是如何批量发送消息的。

2 缓冲区

当调用producer.send(record)时,消息将先到缓冲区,在缓冲区按照目标的Topic-Partition进行组织,满足以条件后随批次发送给Broker。

// KafkaProducer.java
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// ... 省略了部分代码 ...return doSend(record, callback); // 转交给 doSend 方法
}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {// ...// 1. 等待元数据更新(如果需要的话)// ...// 2.【核心步骤】调用 RecordAccumulator 的 append 方法RecordAccumulator.RecordAppendResult result = accumulator.append(tp,timestamp, key, value, headers, interceptors, remainingWaitMs);// ...// 3. 唤醒 Sender 线程,告诉他“可能有新活儿干了”this.sender.wakeup();// ...return result.future;
}

我们看一下“缓冲区”RecordAccumulator。
![[Kafka源码P2-缓冲区.png]]

2.1 消息在Partition内有序

RecordAccumulator维护了一个数据结构:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches

append代码简化如下:

// RecordAccumulator.java
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, ...) {// 1. 获取该分区的批次队列 batches中获取,没有则创建Deque<ProducerBatch> dq = getOrCreateDeque(tp);synchronized (dq) { // 对该分区的队列加锁,保证线程安全// 2. 尝试追加到最后一个(当前活跃的)批次中ProducerBatch last = dq.peekLast();if (last != null) {FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, ...);if (future != null) {// 如果追加成功,直接返回return new RecordAppendResult(future, dq.size() > 1 || last.isFull(), false);}}// 3. 如果最后一个批次满了,或者不存在,就需要一个新的批次// 从 BufferPool 申请一块内存,大小由 batch.size 配置决定ByteBuffer buffer = free.allocate(batchSize, maxTimeToBlock);// 4. 创建一个新的 ProducerBatch (货运箱)ProducerBatch batch = new ProducerBatch(tp, memoryRecordsBuilder, now);FutureRecordMetadata future = batch.tryAppend(timestamp, key, value, headers, ...); // 把当前消息放进去// 5. 将新的批次加入到队列的末尾dq.addLast(batch);// ...return new RecordAppendResult(future, ...);}
}

可以看到batches的value类型为Deque,所以生产者可以维护发送时partition内的顺序结构。
但是在网络抖动时这样做还是不够,时序性还是难以保障,所以生产者还有别的配置:
每个连接上允许发送的未确认请求的最大数量

max.in.flight.requests.per.connection
  • 当 max.in.flight.requests.per.connection = 1 时,Sender 线程在发送完 Batch-1 后,会阻塞自己,直到 Batch-1 的请求得到响应(成功或失败),它绝不会在此期间发送 Batch-2。这样一来,即使 Batch-1 需要重试,Batch-2 也只能乖乖地在后面排队。这就从根本上杜绝了因重试导致乱序的可能。
  • 默认 max.in.flight.requests.per.connection = 5。即它允许 Producer 在还没收到 Batch-1 的 ACK 时,就继续发送 Batch-2、3、4、5。这极大地提升了吞吐量(不用傻等),但牺牲了顺序性。

一般max.in.flight.requests.per.connection还需要与生产者幂等性配合。

enable.idempotence = true

开启幂等后,Producer 会被分配一个唯一的 Producer ID (PID),并且它发送的每一批消息都会带上一个从0开始递增的序列号。Broker 端会为每个 TopicPartition 维护这个 PID 和序列号。如果收到的消息序列号不是预期的下一个,Broker 就会拒绝它。

// NetworkClient.java// 这个方法判断我们是否可以向某个节点发送更多数据
@Override
public boolean isReady(Node node, long now) {// ... 省略了连接状态的检查 ...// 检查在途请求数是否小于该连接配置的上限return !connectionStates.isBlackedOut(node.idString(), now) &&canSendRequest(node.idString(), now);
}// canSendRequest 方法内部会调用 inFlightRequests.canSendMore()
// InFlightRequests.java
public boolean canSendMore(String nodeId) {// this.requests 是一个 Map<String, Deque<NetworkClient.InFlightRequest>>// 它记录了每个节点上所有在途(已发送但未收到响应)的请求Deque<InFlightRequest> queue = requests.get(nodeId);// 如果队列为空,当然可以发送if (queue == null) {return true;}// 将在途请求数 与 从配置中读到的max.in.flight.requests.per.connection比较// this.maxInFlightRequestsPerConnection 就是你配置的那个值return queue.size() < this.maxInFlightRequestsPerConnection;
}

2.2 批次消息ProducerBatch

可以看到在结构private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches中,批次消息被封装为ProducerBatch。

![[Kafka源码P2-缓冲区-1.png]]

2.2.1 内存分配

这个类的核心是MemoryRecordsBuilder。
总所周知,频繁地创建和销毁对象,特别是大块的byte[]对GC非常不友好。MemoryRecordsBuilder内部管理者一个巨大的、连续的ByteBuffer。
这个 ByteBuffer 不是每次创建 ProducerBatch 时都 new 出来的。它是在 RecordAccumulator 初始化时(我们在上面的RecordAccumulator中有看到BufferPool),从一个叫 BufferPool 的内存池中借用 (allocate) 的。当 ProducerBatch 发送完毕,这块内存会归还 (deallocate) 给池子,供下一个 ProducerBatch 复用
当你调用 tryAppend 添加消息时,消息的 key, value 等内容被直接序列化成字节,并写入到这个 ByteBuffer 的末尾。它不是在发送时才做序列化,而是在追加时就完成了。

池化:对于那些需要频繁创建和销毁的、生命周期短暂的、昂贵的对象(如数据库连接、线程、大块内存),一定要使用池化技术。这能极大地降低GC压力,提升系统稳定性。

Redis和Kafka都有共同的高频内存使用的特性,也都设计了预分配和复用。Kafka生产者与其用多少申请多少,不如一次性申请一块大内存,然后通过内部的指针移动(position, limit)来管理这块内存的使用。

2.2.2 线程安全

ProducerBatch 会被多个线程访问:

  • 你的业务线程(Producer主线程):调用 tryAppend() 往里面写数据。
  • Sender 线程:检查它是否已满 (isFull)、是否超时 (isExpired),并最终把它发送出去。

ProducerBatch 内部有一个精密的“状态机”,并用 volatile 和 synchronized 保护。

// ProducerBatch.java (简化后)
private final List<Thunk> thunks;
private final MemoryRecordsBuilder recordsBuilder;// 【关键状态】volatile 保证了多线程间的可见性
private volatile boolean closed; 
private int appends; // 记录追加次数public FutureRecordMetadata tryAppend(...) {// 【关键检查】在方法入口处检查状态,快速失败if (this.closed) {return null; }// ... 将消息写入 recordsBuilder ...// ...
}// 这个方法会被 Sender 线程调用
public void close() {this.closed = true;
}// 当批次被确认后,由 Sender 线程调用
public void done(long baseOffset, long logAppendTime, RuntimeException exception) {// for-each 循环是线程安全的,因为 thunks 列表在 close 之后就不再被修改for (Thunk thunk : this.thunks) {try {// 【核心】执行每个 send() 调用对应的回调函数thunk.callback.onCompletion(metadata, exception);} catch (Exception e) {// ...}}
}

总的来说是职责分离+最小化锁的设计以保证线程安全。

3 发送消息Sender

Sender 是一个实现了 Runnable 接口的类,它在一个独立的线程里无限循环,最终发送消息。

// Sender.java
public void run() {while (running) {try {runOnce();} catch (Exception e) {// ...}}
}void runOnce() {// ...// 1. 【核心】找出所有可以发送的批次// linger.ms 决定了可以等待的最长时间RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);// 2. 如果有准备好的节点(分区),就发送它们if (!result.readyNodes.isEmpty()) {// ...// 从累加器中“榨干”所有准备好的批次Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, ...);// ...// 将批次转换成网络请求并发送sendProducerData(now, ... , batches);}// ...
}

RecordAccumulator.ready() 方法是决定何时发送的关键。它会遍历所有的 ProducerBatch,满足以下任意一个条件的批次,就会被认为是 “ready”(准备就绪):

  1. 批次已满:批次大小达到了 batch.size。
  2. 等待超时:批次从创建到现在,等待的时间超过了 linger.ms。
  3. 其他原因:比如 Producer 被关闭,或者有新的 Producer 加入导致需要立即发送等。

Sender 的工作模式是:
不断地问accumulator.ready()有到linger.ms时间的或者装满batch.size的批次没有。然后依据节点列表,通过NetworkClient发送ProducerBatch到Kafka Broker。

4 总结

Kafka Producer 在客户端内部通过 RecordAccumulator 维护了一个按 TopicPartition 分类的内存缓冲区。当用户调用 send() 方法时,消息并不会立即发送,而是被追加到对应分区的某个 ProducerBatch 中。一个独立的 Sender 线程在后台运行,它会持续检查 RecordAccumulator 中的批次,一旦某个批次满足了“大小达到 batch.size”或“等待时间超过 linger.ms”这两个条件之一,Sender 线程就会将这个批次以及其他所有准备好的批次一同取出,打包成一个请求,通过网络一次性发送给 Broker,从而实现批量发送,极大地提升了吞吐能力。

这个设计是经典的 “空间换时间”“攒一批再处理” 的思想,通过牺牲一点点延迟(linger.ms),换取了巨大的吞吐量提升。理解了这个机制,你就能更好地去配置 batch.size 和 linger.ms 这两个核心参数,以平衡你的业务对吞吐和延迟的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/85270.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/85270.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/85270.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣网C语言编程题:三数之和

一. 简介 本文记录力扣网上的逻辑编程题&#xff0c;涉及数组方面的&#xff0c;这里记录一下 C语言实现和Python实现。 二. 力扣网C语言编程题&#xff1a;三数之和 题目&#xff1a;三数之和 给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nu…

2.2 Windows MSYS2编译FFmpeg 4.4.1

一、安装编译工具 # 更换pacman源 sed -i "s#mirror.msys2.org/#mirrors.ustc.edu.cn/msys2/#g" /etc/pacman.d/mirrorlist* pacman -Sy# 安装依赖 pacman -S --needed base-devel mingw-w64-x86_64-toolchain pacman -S mingw-w64-x86_64-nasm mingw-w64-x86_64-ya…

驱动开发,队列,环形缓冲区:以GD32 CAN 消息处理为例

对环形缓冲区进行进一步的优化和功能扩展&#xff0c;以应对更复杂的实际应用场景&#xff0c;特别是针对 CAN 总线消息处理的场景。 一、优化点 1&#xff1a;动态配置环形缓冲区大小在原始实现中&#xff0c;我们固定了缓冲区大小为 RINGBUFF_LEN 64。这种方式虽然简单&am…

SQL基础知识,MySQL学习(长期更新)

1、基本操作&#xff0c;增删查改 INSERT INTO 表名 (字段1, 字段2, ...) VALUES (值1, 值2, ...); DELETE FROM 表名 WHERE 条件 SELECT * FROM 表名 WHERE 条件 UPDATE 表名 SET 字段1 值, 字段2 值, ... WHERE 条件; SELECT * INTO 新表 FROM 旧表 WHERE… INSERT INTO 语…

Git(一):初识Git

文章目录 Git(一)&#xff1a;初识GitGit简介核心功能分布式特性结构与操作优势与适用场景 创建本地仓库git init配置name与email--global 工作区、暂存区与版本库git addgit commitcommit后.git的变化 Git(一)&#xff1a;初识Git Git简介 Git 是一个分布式版本控制系统&…

第19天:初级数据库学习笔记3

分组函数&#xff08;多行处理函数&#xff09; 即多个输入对应一个输出。前面讲的数据处理函数是单行处理函数。&#xff08;在公司中常说单&#xff0c;多行处理函数&#xff09; 分组函数包括五个&#xff1a; max&#xff1a;最大值min&#xff1a;最小值avg&#xff1a…

Windows11下搭建Raspberry Pi Pico编译环境

1. 系统与工具要求 PC平台&#xff1a; Windows 11 专业版 Windows GCC: gcc-15.1.0-64.exe GNU Make: 4.3 Git: 2.49.0 cmake: 4.0.2 python:3.12.11 Arm GNU Toolchain Downloads – Arm Developer 2. 工具安装与验证 2.1 工具安装 winget安装依赖工具&#xff08;Windows …

【C语言极简自学笔记】重讲运算符

一、算术操作符 算术操作符描述把两个操作数相加-第一个操作数减去第二个操作数*把两个操作数相乘/分子除以分母%取模运算符&#xff0c;整除后的余数 注意&#xff1a;1.除号的两端都是整数的时候执行的是整数的除法&#xff0c;两端只要有一个浮点数&#xff0c;就执行浮点…

持续集成 CI/CD-Jenkins持续集成GitLab项目打包docker镜像推送k8s集群并部署至rancher

Jenkins持续集成GitLab项目 GitLab提交分支后触发Jenkis任务 之前是通过jar包在shell服务器上进行手动部署&#xff0c;麻烦且耗时。现通过Jenkins进行持续集成实现CI/CD。以test分支为例 提交即部署。 由于是根据自己实际使用过程 具体使用到了 gitlabjenkinsdockerharborra…

Apache Iceberg与Hive集成:非分区表篇

引言 在大数据处理领域&#xff0c;Apache Iceberg凭借其先进的表格式设计&#xff0c;为大规模数据分析带来了新的可能。当Iceberg与Hive集成时&#xff0c;这种强强联合为数据管理与分析流程提供了更高的灵活性和效率。本文将聚焦于Iceberg与Hive集成中的非分区表场景&#…

webpack 如何区分开发环境和生产环境

第一种方法: 方法出处&#xff1a;命令行接口&#xff08;CLI&#xff09; | webpack 中文文档 1.利用webpack.config.js 返回的是个函数&#xff0c;利用函数的参数&#xff0c;来区分环境 具体步骤 1&#xff09; package.json文件&#xff1a;在npm scripts 命令后面追加 …

React组件通信——context(提供者/消费者)

Context 是 React 提供的一种组件间通信方式&#xff0c;主要用于解决跨层级组件 props 传递的问题。它允许数据在组件树中"跨级"传递&#xff0c;无需显式地通过每一层 props 向下传递。 一、Context 核心概念 1. 基本组成 React.createContext&#xff1a;创建 C…

“微信短剧小程序开发指南:从架构设计到上线“

1. 引言&#xff1a;短剧市场的机遇与挑战 近年来&#xff0c;短视频和微短剧市场呈现爆发式增长&#xff0c;用户碎片化娱乐需求激增。短剧小程序凭借轻量化、社交传播快、变现能力强等特点&#xff0c;成为内容创业的新风口。然而&#xff0c;开发一个稳定、流畅且具备商业价…

RPC与RESTful对比:两种API设计风格的核心差异与实践选择

# RPC与RESTful对比&#xff1a;两种API设计风格的核心差异与实践选择 ## 一、架构哲学与设计目标差异 1. **RPC&#xff08;Remote Procedure Call&#xff09;** - **核心思想**&#xff1a;将远程服务调用伪装成本地方法调用&#xff08;方法导向&#xff09; - 典型行为…

【pytest进阶】pytest之钩子函数

什么是 hook (钩子)函数 经常会听到钩子函数(hook function)这个概念,最近在看目标检测开源框架mmdetection,里面也出现大量Hook的编程方式,那到底什么是hook?hook的作用是什么? what is hook ?钩子hook,顾名思义,可以理解是一个挂钩,作用是有需要的时候挂一个东西…

深度学习计算——动手学深度学习5

环境&#xff1a;PyCharm python3.8 1. 层和块 块&#xff08;block&#xff09;可以描述 单个层、由多个层组成的组件或整个模型本身。 使用块进行抽象的好处&#xff1a; 可将块组合成更大的组件(这一过程通常是递归) 如 图5.1.1所示。通过定义代码来按需生成任意复杂度…

NodeJS的fs模块的readFile和createReadStream区别以及常见方法

Node.js 本身没有像 Java 那样严格区分字符流和字节流&#xff0c;区别主要靠编码&#xff08;encoding&#xff09;来控制数据是以 Buffer&#xff08;二进制字节&#xff09;形式还是字符串&#xff08;字符&#xff09;形式处理。 详细解释&#xff1a; 方面JavaNode.js字节…

基于二进制XOR运算的机器人运动轨迹与对称图像自动生成算法

原创&#xff1a;项道德&#xff08;daode3056,daode1212&#xff09; 新的算法出现&#xff0c;往往能给某些行业与产业带来革命与突破。为探索机器人运动轨迹与对称图像自动生成算法&#xff0c;本人已经通过18种算法的测试&#xff0c;最终&#xff0c;以二进制的XOR运算为…

Spring AI 项目实战(七):Spring Boot + Spring AI Tools + DeepSeek 智能工具平台(附完整源码)

系列文章 序号文章名称1Spring AI 项目实战(一):Spring AI 核心模块入门2Spring AI 项目实战(二):Spring Boot + AI + DeepSeek 深度实战(附完整源码)3Spring AI 项目实战(三):Spring Boot + AI + DeepSeek 打造智能客服系统(附完整源码)4Spring AI 项目实战(四…

spring-webmvc @RequestHeader 典型用法

典型用法 基础用法&#xff1a;获取指定请求头值 GetMapping("/info") public String getInfo(RequestHeader("User-Agent") String userAgent) {return "User-Agent: " userAgent; }如果请求中包含 User-Agent 请求头&#xff0c;则其值将被…