目录

1、kafka消费的流程

2、kafka的消费模式

2.1、点对点模式

2.2、发布-订阅模式

3、consumer消息积压

3.1、处理方案

3.2、积压量

4、消息过期失效

5、kafka注意事项

        Kafka消费积压(Consumer Lag)是指消费者处理消息的速度跟不上生产者发送消息的速度,导致消息在Kafka主题中堆积。

关于kakfa的架构图,如下所示:

更多关于kafka的介绍,参考:关于MQ之kafka的深入研究-CSDN博客https://blog.csdn.net/weixin_50055999/article/details/148535599?spm=1011.2415.3001.5331


1、kafka消费的流程

        之前的章节中,介绍了kafka消息由producer通过hash函数存放到broker节点后,每个broker节点由多个topic主题组成,可水平扩展。

        每个topic由多个partitin组成,partition里面的内容有顺序,跨partition无序。

对于点对点模式下:

        消费组内每个消费者可以消费多个partition、同时保留offset偏移位置,保证下次消费。

对于发布订阅模式

        不同消费组内的消费者可以消费同一个patition,两个消费组不受影响,各自保留彼此的offset的偏移位置。

如图所示:

在消费者消费过程的流程如下:

由上图可知:

1、每个topic里面包含多个partition。

2、每个partition里面的内容是按顺序分布的。

3、每个消费者可以消费多个partition。

4、而partition只能被一个消费者消费。

对于不同消费者组,可以共同消费同一个topic里面的消息。


2、kafka的消费模式

Kafka 的消费订阅模式取决于消费者组的配置方式,可以分为以下两种主要模式:

2.1、点对点模式

特点:一条消息只能被一个消费者消费

实现方式

  • 所有消费者属于同一个消费者组(相同的 group.id

  • Kafka 会在组内消费者之间自动平衡分区分配

// 消费者1和消费者2使用相同的group.id
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "my-consumer-group"); // 相同的组ID
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

工作流程

  1. 假设主题有3个分区(P0, P1, P2)

  2. 如果有1个消费者,它将消费所有3个分区

  3. 如果增加第二个消费者,Kafka会重新平衡:

    • 消费者1可能获得P0和P1

    • 消费者2获得P2

  4. 消息在每个分区内有序,且只被分配给该分区的消费者消费

2.2、发布-订阅模式

特点一条消息可以被多个消费者(不同消费组)消费(本质还是点对点)

实现方式

  • 不同消费者组订阅同一个主题

  • 每个消费者组都会收到完整的消息流

// 组A的消费者
Properties propsA = new Properties();
propsA.put("group.id", "group-a"); // 不同组ID
// ...其他配置
KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(propsA);// 组B的消费者
Properties propsB = new Properties();
propsB.put("group.id", "group-b"); // 不同组ID
// ...其他配置
KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(propsB);

工作流程

  1. 生产者发送消息到主题

  2. 组A的所有消费者(作为一个组)会收到消息的一个副本

  3. 组B的所有消费者(作为另一个独立的组)也会收到消息的一个副本

  4. 在每个组内部,消息仍然遵循点对点模式(组内只有一个消费者收到)


3、consumer消息积压

        Kafka消息积压的问题,核心原因是生产太快、消费太慢,处理速度长期失衡,从而导致消息积压(Lag)的场景,积压到超过队列长度限制,就会出现还未被消费的数据产生丢失的场景。
       如果长时间不解决消息积压,可能会引发资源紧张服务延迟崩溃等问题。解决消息积压的关键是提高消费者的消费能力,并优化Kafka集群的整体处理效率。

3.1、处理方案

1. 如果是Kafka消费能力不足,则可以考虑增加 topic 的 partition 的个数(提高kafka的并行度)同时提升消费者组的消费者数量,消费数 = 分区数 (二者缺一不可)

2. 若是下游数据处理不及时,则提高每批次拉取的数量。批次拉取数量过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

方法:

1. 增大partion数量。
2. 消费者加了并发,服务, 扩大消费线程。
3. 增加消费组服务数量。
4. kafka单机升级成了集群。
5. 避免消费者消费消息时间过长,导致超时。
6. 使Kafka分区之间的数据均匀分布。

3.2、积压量

  • 生产量:Kafka Topic 在一个时间周期内各partition offset 起止时间差值之和。
  • 消费量:Kafka Topic 在一个时间周期内某个消费者的消费量。
  • 积压量:Kafka Topic 的某个Consumer Group残留在消息中间件未被及时消费的消息量。

4、消息过期失效

        产生消息堆积,消费不及时,kafka数据有过期时间,一些数据就丢失了,主要是消费不及时。

当出现这种现象的时候,可参考以下经验,进行规避:

1. 消费kafka消息时,应该尽量减少每次消费时间,可通过减少调用三方接口、读库等操作,
   从而减少消息堆积的可能性。
2. 如果消息来不及消费,可以先存在数据库中,然后逐条消费(可以保存消费记录,方便定位问题)。
3. 每次接受kafka消息时,先打印出日志,包括消息产生的时间戳。
4. kafka消息保留时间(修改kafka配置文件, 默认一周)
5. 任务启动从上次提交offset处开始消费处理


5、kafka注意事项

1. 由于Kafka消息key设置,在Kafka producer处,给key加随机后缀,使其均衡。
 
2. 数据量很大,合理的增加Kafka分区数是关键。
   Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,
   会影响Kafka consumer消费的吞吐量. 如果利用的是Spark流和Kafka direct approach方式,
   也可以对KafkaRDD进行repartition重分区,增加并行度处理.


参考文章:

1、Kafka如何处理大量积压消息_kafka消息堆积过多了怎么办-CSDN博客https://blog.csdn.net/AlbenXie/article/details/128300018?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522dcefb6fbf11572c5ef4526b40c68a37c%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=dcefb6fbf11572c5ef4526b40c68a37c&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_click~default-1-128300018-null-null.142^v102^pc_search_result_base1&utm_term=kafka%E6%B6%88%E6%81%AF%E7%A7%AF%E5%8E%8B%E6%80%8E%E4%B9%88%E5%A4%84%E7%90%86&spm=1018.2226.3001.4187

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/87437.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/87437.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/87437.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RAG实践:Routing机制与Query Construction策略

Routing机制与Query Construction策略 前言RoutingLogical RoutingChatOpenAIStructuredRouting DatasourceConclusion Semantic RoutingEmbedding & LLMPromptRounting PromptConclusion Query ConstructionGrab Youtube video informationStructuredPrompt GithubReferen…

基于python的web系统界面登录

#让我们的电脑可以支持服务访问 #需要一个web框架 #pip install Flask from flask import Flask, render_template,request from random import randint app Flask(__name__) app.route(/index) def index():uname request.args.get("uname")return f"主页&am…

MATLAB Simulink 终极入门指南:从零设计智能控制系统

为什么工程师都爱Simulink? 想象一下:不写一行代码就能设计机器人控制器、飞行算法甚至核反应堆! MATLAB Simulink正是这样的可视化神器。全球70%的汽车ECU、航天器控制系统用它开发。本文将带你从零设计一个智能温控系统,融入创新性的模糊PID控制,并生成可部署的C代码!…

vue3 javascript 复杂数值计算操作技巧

在Vue 3中处理复杂数值计算&#xff0c;你可以采用多种策略来确保代码的可读性、可维护性和性能。以下是一些实用的技巧和最佳实践&#xff1a; 1. 使用计算属性&#xff08;Computed Properties&#xff09; Vue 3的computed属性非常适合处理复杂的数值计算。它们是基于响应…

26.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--单体转微服务--角色权限管理

在现代企业级应用中&#xff0c;角色权限管理是保障系统安全和提升用户体验的核心基础功能。一个高效的角色权限系统不仅能够有效防止越权访问&#xff0c;还能简化系统的维护和扩展。本文将系统性介绍角色权限管理的核心实现思路&#xff0c;包括架构设计、性能优化、安全机制…

[VSCode] VSCode 设置 python 的编译器

VSCode 设置 python 的编译器 快捷键&#xff1a;CTRL SHIFT P 弹出 VSCode 的命令框输入 Python : select Interpretor选择自己需要的 python 环境&#xff1b;如 python 3.8 或者 python 3.10 版本

基于PEMFC质子交换膜燃料电池系统的simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序 4.系统仿真参数 5.系统原理简介 6.参考文献 7.完整工程文件 1.课题概述 本课题是一个燃料电池&#xff08;大概率是质子交换膜燃料电池&#xff0c;PEMFC &#xff09;的数学模型仿真框图&#xff0c;用于模拟燃料电池的电特…

git-build-package 工具代码详细解读

git-build-package&#xff08;gbp&#xff09;是一个用于从 Git 仓库管理 Debian 软件包的工具&#xff0c;其代码架构和实现原理体现了对 Git 版本控制系统和 Debian 打包流程的深度整合。以下是对其代码的详细解读&#xff1a; 代码架构设计 gbp 的代码架构设计围绕其核心…

如何使用ChatGPT快速完成一篇论文初稿?

2小时写完论文初稿&#xff0c;学境思源&#xff0c;听起来是不是有点不真实&#xff1f;一键生成论文初稿&#xff01;但如果你有一个清晰的框架、良好的写作节奏&#xff0c;acaids.com。再配合像ChatGPT这样的写作助手——真的可以做到。 这篇文章就是手把手告诉你&#xf…

Docker PowerJob

1. Docker PowerJob 1. 拉取PowerJob服务端镜像 docker pull tjqq/powerjob-server:4.3.92. 创建数据卷目录用于持久化数据 mkdir -p /home/docker/powerjob/logs mkdir -p /home/docker/powerjob/data mkdir -p /home/docker/powerjob/server mkdir -p /home/docker/powerjob…

Python数据可视化:NumPy生成与Matplotlib折线图绘制

一、数据生成与可视化概述 在数据分析和科学计算领域,Python已成为最受欢迎的编程语言之一。这主要得益于其丰富的数据处理库和强大的可视化工具。数据可视化是将抽象数据转化为直观图形表示的过程,它能够帮助我们发现数据中的模式、趋势和异常值,从而做出更明智的决策。 …

26.多表查询

1.笛卡尔集 创建俩表&#xff1a; -- 创建部门表&#xff08;dept&#xff09; use mysql_learn CREATE TABLE dept (deptno INT PRIMARY KEY, dname VARCHAR(50) NOT NULL, loc VARCHAR(50) );-- 创建员工表&#xff08;emp&#xff09; CREATE TABLE emp (em…

深度学习题目(仅供参考)

一、注意力和transformer 一、选择题 注意力机制的核心步骤不包括&#xff1f; A. 计算注意力分布 B. 加权平均输入信息 C. 随机丢弃部分输入 D. 打分函数计算相关性 答案&#xff1a;C&#xff08;硬性注意力虽随机选择输入&#xff0c;但核心步骤仍为分布计算与加权&#xf…

WebWorker:提升前端性能的多线程利器

简介 在现代Web开发中&#xff0c;随着应用越来越复杂&#xff0c;JavaScript的单线程模型开始显现其局限性。Web Workers的出现为解决这一问题提供了优雅的方案&#xff0c;它允许开发者在后台线程中运行脚本&#xff0c;而不会影响主线程的性能。 Web Workers是HTML5标准的…

milvus教程:collection和scheme

环境配置&#xff1a;可以看上一节 一.数据库使用 连接 Milvus Standalone创建数据库 my_database_1&#xff08;无额外属性&#xff09;创建数据库 my_database_2&#xff08;设置副本数为 3&#xff09;列出所有数据库查看默认数据库&#xff08;default&#xff09;详情修…

14:00开始面试,14:06就出来了,问的问题有点变态。。。

从小厂出来&#xff0c;没想到在另一家公司又寄了。 到这家公司开始上班&#xff0c;加班是每天必不可少的&#xff0c;看在钱给的比较多的份上&#xff0c;就不太计较了。没想到6月一纸通知&#xff0c;所有人不准加班&#xff0c;加班费不仅没有了&#xff0c;薪资还要降40%…

Electron(01)

Electron Electron是什么 electron可以使用前端技术开发桌面应用&#xff0c;跨平台性&#xff0c;开发一套应用&#xff0c;可以打包到三个平台。 electron结合Chromium&#xff08;谷歌内核&#xff09;和 Node.js 和Native Api 当使用 Electron 时&#xff0c;很重要的一…

Kafka 拦截器深度剖析:原理、配置与实践

引言 在构建高可用、可扩展的消息系统时&#xff0c;Kafka以其卓越的性能和稳定性成为众多企业的首选。而Kafka拦截器作为Kafka生态中强大且灵活的功能组件&#xff0c;能够在消息的生产和消费过程中实现自定义逻辑的注入&#xff0c;为消息处理流程带来极大的扩展性和可控性。…

Flutter 与原生技术(Objective-C/Swift,java)的关系

在 iOS 开发中&#xff0c;Flutter 与原生技术&#xff08;Objective-C/Swift&#xff09;的关系 一、技术定位与核心差异 Flutter 语言&#xff1a;使用Dart 语言开发&#xff0c;通过 AOT&#xff08;提前编译&#xff09;将代码转换为原生 ARM 指令&#xff0c;无需依赖 iOS…

最新期刊影响因子,基本包含全部期刊

原文链接&#xff1a;2024年期刊最新影响因子&#xff08;IF&#xff09; 2024年期刊最新影响因子&#xff08;IF&#xff09; BioinfoR生信筆記 &#xff0c;注于分享生物信息学相关知识和R语言绘图教程。