四、协议实现机制探秘

4.1 生产者协议

4.1.1 消息发送流程

Producer 在向 Kafka 集群发送消息时,首先会根据分区策略选择目标分区 。常见的分区策略有轮询、按消息键的哈希值分区以及自定义分区策略 。如果生产者在发送消息时指定了分区号,那么消息就会直接被发送到指定的分区;若未指定分区号,但指定了消息的键(key),则会根据键的哈希值对分区数量取模,得到的结果就是消息要发送到的分区号;若分区号和键都未指定,生产者会采用轮询的方式,依次将消息发送到各个分区,以实现负载均衡。例如,假设有一个包含 3 个分区的 Topic,当生产者未指定分区和键时,第一条消息会被发送到分区 0,第二条发送到分区 1,第三条发送到分区 2,第四条又回到分区 0,以此类推。

消息发送前,Producer 会对消息进行序列化处理,将消息对象转换为字节数组,以便在网络中传输 。然后,消息会被批量处理,Producer 会将多条消息组合成一个批次(batch),这样可以减少网络请求的次数,提高传输效率 。批次的大小可以通过batch.size参数进行配置,默认值为 16384 字节 。当批次中的消息大小达到batch.size,或者等待时间达到linger.ms(默认值为 0,即不等待)时,Producer 就会将批次中的消息发送出去。

在发送消息时,Producer 会与目标分区的 Leader Partition 建立 TCP 连接,并将消息发送给它 。如果当前没有可用的连接,Producer 会创建新的连接 。为了提高性能,Producer 还会对连接进行复用,避免频繁地创建和销毁连接 。在实际应用中,通过合理调整分区策略、批次大小以及连接管理参数,可以有效提升生产者的发送效率和系统的整体性能。例如,在高并发场景下,适当增大batch.size和linger.ms的值,可以减少网络开销,提高吞吐量,但同时也会增加消息的延迟;而在对消息实时性要求较高的场景下,则需要减小这些值,以降低延迟。

4.1.2 消息确认机制

Producer 发送消息后,需要等待 Broker 的确认(ACK),以确保消息已经成功发送到 Kafka 集群 。Kafka 提供了三种不同的确认级别,通过acks参数进行配置:

  • acks = 0:Producer 发送消息后,不需要等待 Broker 的确认,就认为消息已经成功发送 。这种方式的发送速度最快,但可靠性最低,因为如果在发送过程中出现网络故障或 Broker 故障,消息可能会丢失 。例如,在一些对数据准确性要求不高的场景,如日志收集,为了追求高吞吐量,可以采用这种确认级别。
  • acks = 1:Producer 发送消息后,会等待 Leader Partition 将消息写入日志文件后,才认为消息发送成功 。这种方式在一定程度上保证了消息的可靠性,但如果在 Leader Partition 将消息写入日志后,还未将消息同步给 Follower Partition 时,Leader Partition 所在的 Broker 发生故障,那么这条消息可能会丢失 。在一些对数据可靠性有一定要求,但又希望保持较高吞吐量的场景下,可以选择这种确认级别。
  • acks = -1 或 acks = all:Producer 发送消息后,会等待所有在同步副本集合(ISR)中的副本都将消息写入日志后,才认为消息发送成功 。这种方式提供了最高的可靠性,即使 Leader Partition 发生故障,也能保证消息不会丢失 。不过,由于需要等待所有副本的确认,这种方式的延迟最高,吞吐量相对较低 。在对数据可靠性要求极高的场景,如金融交易数据处理,通常会采用这种确认级别。

消息确认机制直接影响着消息的可靠性语义。在选择确认级别时,需要根据具体的业务需求,在可靠性和性能之间进行权衡 。例如,在电商订单处理系统中,订单信息的准确性至关重要,就应该选择acks = -1的确认级别,以确保订单消息不会丢失;而在一些实时监控系统中,对于偶尔丢失一些监控数据的容忍度较高,更注重系统的吞吐量和实时性,就可以选择acks = 0或acks = 1的确认级别。

4.2 消费者协议

4.2.1 消息拉取流程

Consumer 向 Kafka 集群拉取消息时,首先会向 Kafka 集群发送 Fetch 请求 。在请求中,Consumer 需要指定要拉取消息的 Topic、Partition 以及起始的偏移量(offset) 。Kafka 集群接收到 Fetch 请求后,会根据请求中的信息,找到对应的 Partition 的 Leader Partition,并从 Leader Partition 中读取消息 。

在拉取消息时,Consumer 可以通过fetch.min.bytes和fetch.max.bytes参数来控制每次拉取的最小和最大字节数 。fetch.min.bytes表示 Consumer 期望每次拉取到的最小数据量,默认值为 1 字节 。当 Broker 中可用的消息字节数小于fetch.min.bytes时,Broker 会等待,直到有足够的数据可供拉取,或者等待时间超过fetch.wait.max.ms(默认值为 500 毫秒) 。fetch.max.bytes表示 Consumer 每次拉取消息的最大字节数,默认值为 52428800 字节(50MB) 。通过合理配置这两个参数,可以优化 Consumer 的拉取性能。例如,在处理大数据量的场景下,可以适当增大fetch.max.bytes的值,减少拉取次数,提高效率;而在对实时性要求较高的场景下,可以减小fetch.min.bytes和fetch.wait.max.ms的值,降低延迟。

Consumer 在拉取消息时,还可以指定消费的起始位置(offset) 。如果 Consumer 是第一次消费某个 Partition 的消息,它可以从最早的消息开始消费(offset = 0),也可以从最新的消息开始消费(offset = -1) 。如果 Consumer 之前已经消费过该 Partition 的消息,它可以根据之前记录的 offset 继续消费 。在实际应用中,根据业务需求选择合适的起始位置非常重要。例如,在数据备份和恢复场景中,可能需要从最早的消息开始消费,以确保数据的完整性;而在实时数据分析场景中,通常只需要从最新的消息开始消费,获取最新的业务数据。

4.2.2 Offset 管理

在 Kafka 中,Offset 由 Consumer 自己维护 。Consumer 在消费消息的过程中,会定期将自己已经消费到的 offset 提交到 Kafka 集群中 。Offset 的提交方式有自动提交和手动提交两种 。

  • 自动提交:Consumer 可以通过设置enable.auto.commit参数为true来开启自动提交功能 。自动提交的时间间隔可以通过auto.commit.interval.ms参数进行配置,默认值为 5000 毫秒 。在自动提交模式下,Consumer 会每隔auto.commit.interval.ms毫秒,自动将当前消费到的 offset 提交到 Kafka 集群 。这种方式简单方便,但存在一定的风险。例如,如果在自动提交 offset 后,Consumer 还未处理完消息就发生了故障,那么下次重启后,Consumer 会从已提交的 offset 开始消费,可能会导致部分消息被重复消费。
  • 手动提交:Consumer 可以通过设置enable.auto.commit参数为false来关闭自动提交功能,然后使用commitSync()或commitAsync()方法手动提交 offset 。commitSync()方法会同步提交 offset,即等待 Kafka 集群确认提交成功后才返回 。这种方式可以确保 offset 的提交成功,但会阻塞 Consumer 的线程,影响消费效率 。commitAsync()方法会异步提交 offset,即不会等待 Kafka 集群的确认就返回 。这种方式不会阻塞 Consumer 的线程,提高了消费效率,但由于是异步提交,可能会出现提交失败的情况 。在实际应用中,为了保证数据的准确性,通常会在消息处理完成后,手动提交 offset 。例如,在处理订单消息时,当订单处理完成后,再手动提交 offset,这样可以确保不会重复处理订单。

Offset 的更新时机对消息消费语义有着重要影响 。如果在消息处理之前就提交 offset,那么当 Consumer 发生故障重启后,可能会导致部分消息未被处理就被认为已经消费,从而出现消息丢失的情况,这就是 “at most once” 的消费语义 。如果在消息处理完成后才提交 offset,那么当 Consumer 发生故障重启后,可能会导致部分消息被重复消费,这就是 “at least once” 的消费语义 。在实际应用中,需要根据业务需求选择合适的消费语义和 offset 更新时机 。例如,在一些对数据准确性要求极高的场景,如金融交易处理,通常会选择 “at least once” 的消费语义,并在消息处理完成后再提交 offset;而在一些对数据准确性要求不高,但对实时性要求较高的场景,如实时监控数据处理,可以选择 “at most once” 的消费语义,以提高处理速度。

4.3 副本同步协议

4.3.1 备份机制

Kafka 对每个 topic 的 partition 进行备份,以保证数据的可靠性和高可用性 。每个 partition 都有一个 Leader 副本和若干个 Follower 副本 。Leader 副本负责处理来自 Producer 和 Consumer 的读写请求,而 Follower 副本则负责从 Leader 副本同步消息,保持与 Leader 副本的数据一致性 。

Follower 副本通过向 Leader 副本发送 Fetch 请求来同步消息 。在 Fetch 请求中,Follower 副本会携带自己当前的日志末端偏移量(LEO,Log End Offset),表示它已经同步到的消息位置 。Leader 副本接收到 Fetch 请求后,会从自己的日志中读取从 Follower 副本的 LEO 开始的消息,并将这些消息发送给 Follower 副本 。Follower 副本收到消息后,将其追加到自己的日志中,并更新自己的 LEO 。例如,假设 Follower 副本的 LEO 为 100,Leader 副本的日志中有消息 101、102、103,那么 Leader 副本会将消息 101、102、103 发送给 Follower 副本,Follower 副本接收并写入这些消息后,将 LEO 更新为 104。

通过这种备份机制,即使某个 Broker 发生故障,导致其上的 Leader 副本不可用,Kafka 也可以从其他存活的 Follower 副本中选举出新的 Leader 副本,继续提供服务,从而保证数据的可靠性和系统的高可用性 。在实际的分布式系统中,这种备份机制能够有效应对各种硬件故障和网络问题,确保数据的安全和业务的连续性。例如,在一个大规模的电商系统中,Kafka 的备份机制可以保证订单数据、用户数据等关键信息不会因为个别服务器的故障而丢失或不可用。

4.3.2 ISR 与选举机制

Kafka 通过动态维护同步备份集合(ISR,In-Sync Replicas)来确保数据的一致性和可靠性 。ISR 集合中包含了与 Leader 副本保持同步的 Follower 副本 。只有在 ISR 集合中的副本才有资格被选举为新的 Leader 副本 。

Kafka 判断 Follower 副本是否与 Leader 副本同步的依据是replica.lag.time.max.ms参数,默认值为 10000 毫秒(10 秒) 。如果一个 Follower 副本在超过replica.lag.time.max.ms的时间内没有向 Leader 副本发送 Fetch 请求,或者虽然发送了 Fetch 请求,但在replica.lag.time.max.ms时间内没有追上 Leader 副本的消息进度,那么这个 Follower 副本就会被认为与 Leader 副本不同步,会被从 ISR 集合中移除 。例如,假设replica.lag.time.max.ms为 10 秒,某个 Follower 副本在 15 秒内都没有向 Leader 副本发送 Fetch 请求,那么它就会被移出 ISR 集合。

当 Leader 副本发生故障时,Kafka 会从 ISR 集合中选举新的 Leader 副本 。选举的过程由 Kafka 的 Controller 负责,Controller 是 Kafka 集群中的一个特殊的 Broker,它负责管理集群的元数据信息和分区的 Leader 选举等工作 。在选举新的 Leader 副本时,Controller 会优先选择 ISR 集合中 LEO 最大的 Follower 副本作为新的 Leader 副本,因为这个副本的数据与原 Leader 副本最为接近,能够最大程度地保证数据的一致性 。例如,ISR 集合中有三个 Follower 副本,它们的 LEO 分别为 100、105、103,那么 Controller 会选择 LEO 为 105 的 Follower 副本作为新的 Leader 副本。

如果在选举时 ISR 集合为空,即所有的 Follower 副本都与 Leader 副本不同步,此时 Kafka 可以选择等待 ISR 集合中的副本恢复,或者从非 ISR 集合中的副本中选举新的 Leader 副本 。从非 ISR 集合中选举新的 Leader 副本可能会导致数据丢失,因为这些副本的数据可能与原 Leader 副本不一致 。因此,在实际应用中,需要根据具体的业务需求和数据一致性要求,合理配置相关参数,确保 ISR 集合的稳定性和可靠性 。例如,在对数据一致性要求极高的金融领域,通常会严格控制 ISR 集合的成员,避免从非 ISR 集合中选举 Leader 副本,以防止数据丢失;而在一些对数据一致性要求相对较低,但对系统可用性要求较高的场景,如一些实时监控系统,可以适当放宽 ISR 集合的条件,允许在 ISR 集合为空时从非 ISR 集合中选举 Leader 副本,以保证系统的持续运行。

五、总结与展望

通过对 Kafka 消息存储与协议实现的深入剖析,我们全面了解了其内部的工作原理和机制 。在消息存储方面,Kafka 采用了独特的物理存储结构,通过合理的分区分配策略和高效的文件管理机制,实现了海量消息的可靠存储和快速检索 。其文件格式设计以及索引文件机制,为消息的读写操作提供了坚实的支持,使得 Kafka 在高并发场景下依然能够保持出色的性能 。

在协议实现方面,生产者协议、消费者协议和副本同步协议协同工作,保障了消息在 Kafka 集群中的高效传输、准确消费以及数据的一致性和高可用性 。生产者通过灵活的分区策略和消息确认机制,确保消息能够可靠地发送到 Kafka 集群;消费者通过精确的消息拉取流程和可控的 Offset 管理方式,实现了对消息的有序消费;副本同步协议则通过备份机制和 ISR 与选举机制,保证了数据在集群中的安全性和可恢复性 。

深入理解这些机制对于优化 Kafka 性能和解决生产问题具有重要意义 。在实际应用中,我们可以根据业务需求和场景特点,合理调整 Kafka 的配置参数,如分区数量、副本因子、消息确认级别、Offset 提交方式等,以达到最佳的性能表现 。同时,当遇到诸如消息丢失、重复消费、集群性能瓶颈等问题时,能够依据对内部机制的理解,快速定位问题根源并找到解决方案 。

展望未来,随着大数据和分布式系统技术的不断发展,Kafka 在消息队列领域有望继续保持领先地位并不断演进 。一方面,Kafka 可能会在性能优化、扩展性提升以及功能增强等方面持续创新,以满足不断增长的业务需求 。例如,进一步优化消息存储和传输机制,提高集群的吞吐量和低延迟性能;增强对新的存储介质和硬件架构的支持,提升系统的整体效率 。另一方面,随着云计算、容器化技术的普及,Kafka 与这些新兴技术的融合也将成为发展趋势,实现更加便捷的部署、管理和运维 。此外,面对日益复杂的业务场景和数据处理需求,Kafka 可能会不断拓展其应用领域,如在实时数据处理、人工智能模型训练数据传输等方面发挥更大的作用 。总之,Kafka 作为消息队列领域的佼佼者,未来充满着无限的可能和发展空间 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/911857.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/911857.shtml
英文地址,请注明出处:http://en.pswp.cn/news/911857.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue.js 与 TypeScript:最佳实践

1. 引言 Vue.js 是一个渐进式、灵活的 JavaScript 框架,广泛用于构建用户界面和单页应用(SPA)。而 TypeScript 是 JavaScript 的一个超集,添加了静态类型和其他高级特性。将两者结合使用,可以帮助开发者构建更具可维护…

webpack5 css-loader:从基础到原理

webpack 处理样式 webpack本身是不能识别样式资源的,需要借助Loader来帮助webpack解析样式资源,样式资源包括但不限于css/less/sass/scss/styl 未使用样式处理加载器前 运行webpack打包命令 bash npx webpack报错信息如图,提示无法识别css…

【GESP】C++三级练习 luogu-B2096 直方图

GESP C三级练习,一维数组练习,难度★★☆☆☆。 题目题解详见:【GESP】C三级练习 luogu-B2096 直方图 | https://www.coderli.com/gesp-3-luogu-b2096/ 【GESP】C三级练习 luogu-B2096 直方图 | OneCoderGESP C三级练习,一维数组…

【网站内容安全检测】之2:从网站所有URL页面中提取所有外部及内部域名信息

还没写成Go的,用Python吧,稍微慢一点 依赖内容(安装命令pip install -r requirements.txt) requirements.txt aiohttp beautifulsoup44.12.2 tqdm4.66.1 redis5.2.1 motor3.3.1 pymongo4.6.0 chardet提取域名的程序 domain_extractor.py …

【LLaMA-Factory 实战系列】四、API 篇 - 部署推理服务与批量调用实战

【LLaMA-Factory 实战系列】四、API 篇 - 部署推理服务与批量调用实战 1. 引言2. 推理后端的选择与对比3. 部署 API 推理服务3.1 创建 API 配置文件3.2 启动 API 服务3.3 探索交互式 API 文档 4. 编写 Python 脚本进行批量调用4.1 准备工作4.2 批量调用脚本4.3 运行脚本并查看结…

C++工厂模式的作用(工厂方法、Factory Method、Factory Pattern)

文章目录 代码示例工厂的作用1. 对象创建的封装 🏭2. 解耦客户端和具体类 🔗3. 统一的创建入口 🚪4. 隐藏实现细节 🎭 在这个项目中的具体体现总结 代码示例 https://gitee.com/arnold_s/my-learning-test/tree/master/20250610_…

9-C#修改任务管理的名称

C#修改任务管理的名称

Fisco Bcos学习 - 搭建第一个区块链网络

文章目录 一、前言二、环境准备三、安装依赖在 macOS 上安装依赖在 Ubuntu 上安装依赖在 CentOS 上安装依赖 四、创建操作目录并下载安装脚本五、搭建单群组 4 节点联盟链六、启动 FISCO BCOS 链七、检查进程八、检查日志输出 在数字化时代,区块链技术正逐渐成为推动…

可视化图解算法53:表达式求值

牛客网 面试笔试 TOP 101 1. 题目 描述 请写一个整数计算器,支持加减乘三种运算和括号。 数据范围:0≤∣s∣≤100,保证计算结果始终在整型范围内 要求:空间复杂度: O(n),时间复杂度 O(n) 示例1 输入…

小白成长之路-Nginx配置(二)

文章目录 一、localtion配置1.匹配规则2.匹配优先级3.配置案例 二、rewrite1、 语法2、 可写入字段3 配置案例4 if 指令5.sutoindex6. nginx配置中的常用变量 三、配置Nginx状态统计1.下载vts模块2.编译nginx 提示:以下是本篇文章正文内容,下面案例可供参…

Qt的第一个程序

Qt的第一个程序 1.hello world2.使用图形化拖拽方式3.使用C代码的方式3.1.头文件3.2.setText3.3.对象树 4.设计MyLabel5.乱码问题 🌟🌟hello,各位读者大大们你们好呀🌟🌟 🚀🚀系列专栏&#xff…

图书数据接口

基本说明: 接口地址:http://data.isbn.work/openApi/getInfoByIsbn?isbn{isbn}&appKey{appkey}返回格式:json请求方式:get请求示例:http://data.isbn.work/openApi/getInfoByIsbn?isbn9787513159074&appKey…

MongoDB原理

目录 一、概念 二、架构 2.1 逻辑结构 2.2 数据模型 2.3 存储引擎:WiredTiger 三、事务 一、概念 MongoDB是文档数据库,基本存储单元是 文档(Document),以BSON格式(一种类json的二进制形式&#xff…

《解码音频:从基础到未来的听觉探索》

音频:开启声音世界的大门 在生活的每一个角落,音频如影随形,编织出丰富多彩的听觉体验。清晨,第一缕阳光尚未完全照进房间,手机里温柔的闹钟铃声,将我们从睡梦中轻轻唤醒,开启活力满满的一天。通…

web安全之h2注入系统学习

起初是在N1 Junior 2025 上面碰到一题,考点是h2的sql注入。由于之前没有见过,趁此机会系统学习一番 实验代码 public class H2Inject {public static void main(String[] args) throws Exception{JdbcDataSource dataSource new JdbcDataSource();dataS…

AWS认证系列:考点解析 - cloud trail,cloud watch,aws config

🎯一句话总览: 服务名类比/角色主要功能CloudTrail监控摄像头录像回放记录“谁在什么时候做了什么操作”CloudWatch护士测体温 护士喊医生实时监控系统状态,并能报警/自动应对AWS Config保安巡逻 记录资产变更历史记录 AWS 资源的“配置状…

Java八股文——数据结构「数据结构篇」

了解哪些数据结构? 面试官您好,我了解并使用过多种数据结构。在我的理解中,数据结构可以分为几个大的类别,每一类都有其独特的优势和适用场景。 1. 线性结构 (Linear Structures) 这类结构的特点是数据元素之间存在一对一的线性…

C#测试调用EPPlus根据批注设置excel单元格内容

EPPlus也是常用的Excel文件操作库,但不同于ClosedXML,使用EPPlus前需要设置授权信息,商业应用需要设置商业授权,个人使用或非商业应用也需要设置授权(测试的时候只需设置全名,保存excel文件时会保存到文件详…

windows本地搭建skywalking, 线程池中traceId不丢失

1.从官网下载9.0.0版本 Downloads | Apache SkyWalking 其它历史版本的 下载地址 Index of /dist/skywalking 这个页面 可以下载 apm服务: apache-skywalking-apm-9.0.0.tar.gz agent的包: apache-skywalking-java-agent-9.0.0.tgz 2.解压后, (看情况去config路径下 appli…

多模态大语言模型arxiv论文略读(135)

Agent S: An Open Agentic Framework that Uses Computers Like a Human ➡️ 论文标题:Agent S: An Open Agentic Framework that Uses Computers Like a Human ➡️ 论文作者:Saaket Agashe, Jiuzhou Han, Shuyu Gan, Jiachen Yang, Ang Li, Xin Eric…