#作者:张桐瑞

文章目录

  • 1 问题概述
  • 2 修改方案
    • 2.1修改参数
    • 2.2配置示例
  • 3 消费者组均分脚本
    • 3.1使用说明
    • 3.2脚本内容
    • 3.3实现原理说明
  • 4 KAFKA-EXPORTER流程代码
    • 4.1KAFKA-EXPORTER拉取数据流程

1 问题概述

由于kafka-exporter获取kafka指标时间过长,无法通过curl kafka-exporter:9308/metrics 获取指标,通过查看kafka-exporter日志,发现
time=“2025-07-22T02:18:00Z” level=error msg=“Cannot get offset of group xxxx: kafka: broker not connected” source=“kafka_exporter.go:396”
的报错信息,发现kafka-exporter在查询消费者组时出现超时,通过命令
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
同样出现超时信息。
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConsumerGroups, deadlineMs=1753170317381, tries=1, nextAllowedTryMs=1753170317482) timed out at 1753170317382 after 1 attempt(s)
通过针对具体消费者组进行查看,发现返回信息较快,故计划过滤一部分groups来减少数据量。

2 修改方案

当前环境中,Kafka与Kafka Exporter共同部署在同一Pod 内,且各个Kafka Exporter未配置过滤规则,导致采集数据存在大量重复。为提升采集效率和准确性,计划通过以下措施进行优化:

  1. 基于不同的 ReplicaSet(RC)配置差异化过滤规则;
    针对每个RC单独设置专属的消费者组过滤规则,避免重复采集。
  2. 利用消费者均分脚本生成过滤正则表达式;
    通过脚本将所有消费者组按规则均分,生成适用于Kafka Exporter的正则表达式,保证每个 Exporter 只采集其分配范围内的消费者组。
  3. 将生成的消费者组过滤正则应用于不同 Exporter 配置中;
    将对应的正则表达式配置到各自 Exporter 实例中,实现消费组采集的隔离和均衡分布

2.1修改参数

group.filter .* Regex that determines which consumer groups to collect
group.filter # 正则表达式,用于确定要收集哪些消费者组(Consumer Groups)的指标。

2.2配置示例

只需为每个Kafka Exporter添加对应的YAML配置段落即可。

2.2.1KAFKA_EXPORTER_1:

  • args:
    • –kafka.server=localhost:9092
    • –web.listen-address=:9308
    • –group.filter=^(A|a|B).*

2.2.2KAFKA_EXPORTER_2:

  • args:
    • –kafka.server=localhost:9092
    • –web.listen-address=:9308
    • –group.filter=^(E|b).*

2.2.3KAFKA_EXPORTER_3:

  • args:
    • –kafka.server=localhost:9092
    • –web.listen-address=:9308
    • –group.filter=^©.*

3 消费者组均分脚本

3.1使用说明

配置参数:

  • BOOTSTRAP_SERVER: 指定 Kafka 服务地址;
  • EXPORTER_COUNT: 指定需要生成多少个 Exporter 的过滤规则。
# Bash group-distributor.sh使用消费者组均分脚本进行消费者组均匀分布,脚本执行结果如下:
bash-5.0$ bash /bitnami/kafka/data/topic_to_consumer_group.sh================ Hash by 首字母 (区分大小写) =================
Exporter 1: --group.filter=^(A|a|B).*  [3 groups]
Exporter 2: --group.filter=^(E|b).*  [2 groups]
Exporter 3: --group.filter=^(C).*  [1 groups]

可参考对应的exporter过滤正则结果。

3.2脚本内容

3.2.1Hash分组脚本

#!/bin/bashBOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3# 获取所有消费者组
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)# 初始化数组
for ((i=0; i<EXPORTER_COUNT; i++)); doprefix_sets[$i]=""count[$i]=0
donedeclare -A seen_prefix# 遍历组名,按首字母 hash 均分
while IFS= read -r group; doprefix=$(echo "$group" | cut -c1)[[ -z "$prefix" ]] && continue# 计算哈希分配位置hash_hex=$(echo -n "$prefix" | md5sum | awk '{print $1}' | cut -c1-8)hash_dec=$((16#$hash_hex))idx=$((hash_dec % EXPORTER_COUNT))# 记录唯一首字母,用于生成正则if [[ -z "${seen_prefix[$prefix]}" ]]; thenseen_prefix[$prefix]=1if [[ -z "${prefix_sets[$idx]}" ]]; thenprefix_sets[$idx]="$prefix"elseprefix_sets[$idx]+="|$prefix"fifi# 每个实际组都会计入对应 exporter 的数量count[$idx]=$((count[$idx] + 1))
done <<< "$all_groups"# 输出结果
echo "================ Hash by 首字母 (区分大小写) ================="
for ((i=0; i<EXPORTER_COUNT; i++)); doif [[ -n "${prefix_sets[$i]}" ]]; thenecho "Exporter $((i+1)): --group.filter='^(${prefix_sets[$i]}).*'  [${count[$i]} groups]"elseecho "Exporter $((i+1)): --group.filter='^()'  [${count[$i]} groups]"fi
done

3.2.2 排序分组脚本

#!/bin/bashBOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3# 获取所有消费组
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)# 按首字母分组(区分大小写)
declare -A initial_to_groups
declare -A initial_countswhile IFS= read -r group; do[[ -z "$group" ]] && continuefirst_char=${group:0:1}initial_to_groups["$first_char"]+="$group"$'\n'initial_counts["$first_char"]=$((initial_counts["$first_char"] + 1))
done <<< "$all_groups"# 将首字母按消费组数量降序排序
sorted_initials=$(for k in "${!initial_counts[@]}"; doecho -e "${initial_counts[$k]}\t$k"
done | sort -rn | awk '{print $2}')# 初始化每个 exporter 的负载、过滤项和组列表
for ((i=0; i<EXPORTER_COUNT; i++)); doexporter_filters[$i]=""exporter_loads[$i]=0exporter_groups[$i]=""
done# 分配首字母到 exporter,按消费组数量均衡
for initial in $sorted_initials; docount=${initial_counts[$initial]}# 找当前负载最小的 exportermin_index=0min_load=${exporter_loads[0]}for ((i=1; i<EXPORTER_COUNT; i++)); doif (( exporter_loads[i] < min_load )); thenmin_index=$imin_load=${exporter_loads[i]}fidone# 添加到对应 exporterif [[ -z "${exporter_filters[$min_index]}" ]]; thenexporter_filters[$min_index]="$initial"elseexporter_filters[$min_index]+="|$initial"fiexporter_loads[$min_index]=$((exporter_loads[$min_index] + count))exporter_groups[$min_index]+="${initial_to_groups[$initial]}"
done# 输出格式
echo "================ kafka-exporter group.filter 正则分片 ================="
for ((i=0; i<EXPORTER_COUNT; i++)); dogroup_count=$(echo -n "${exporter_groups[$i]}" | grep -c '^')echo "Exporter $((i+1)): ($group_count groups)"echo "  --group.filter='^(${exporter_filters[$i]}).*'"echo
done

3.3实现原理说明

3.3.1Hash分组脚本说明
该脚本通过以下核心逻辑实现消费者组的分配与正则表达式生成:

  1. 获取消费者组列表:
    利用 kafka-consumer-groups.sh --list 获取当前 Kafka 所有消费者组名。
  2. 提取首字母并去重:
    对每个消费者组名提取首字母(区分大小写),使用关联数组 seen_prefix 确保相同首字母只处理一次。
  3. Hash 均分逻辑:
    1)使用 md5sum 对首字母进行 Hash 处理;
    2)将 Hash 值转为十进制,再通过取模操作将其平均分配到 N 个 Exporter 中;
    3)每个 Exporter 收集分配到的首字母集合,组合成正则表达式。
  4. 生成过滤规则:
    每个Exporter输出一条形如–group.filter=^(A|B|C).* 的正则规则,仅匹配以指定前缀开头的消费者组。
    3.3.2排序分钟脚本说明
    在首字母提取之后,脚本还会对首字母列表进行 排序处理,其作用是:
  5. 确保稳定性
    通过排序(sort 命令),可以保证即使 Kafka 集群中的消费者组顺序发生变化,脚本对首字母的处理顺序仍保持一致,从而保证正则分配的稳定性。
  6. 提升分配公平性
    排序后的首字母经过统一的 Hash 计算和取模分配,有助于在 Exporter间更公平地分摊消费者组,提高采集性能和均衡性。
  7. 简化调试和查看
    将字母排序输出后,便于在查看分配结果时快速定位具体字母属于哪个Exporter,也有助于问题定位和正则表达式核查。

4 KAFKA-EXPORTER流程代码

4.1KAFKA-EXPORTER拉取数据流程

4.1.1拉取所有消费者组
在getConsumerGroupMetrics函数中,首先通过broker.ListGroups(&sarama.ListGroupsRequest{})向Kafka broker拉取所有的消费者组(group)列表。
这一步会返回集群中所有存在的group名称。

groupIds := make([]string, 0)for groupId := range groups.Groups {if e.groupFilter.MatchString(groupId) {groupIds = append(groupIds, groupId)}}

4.1.2 过滤消费者组
拉取到所有group后,遍历每个groupId,用e.groupFilter.MatchString(groupId)判断该group是否匹配过滤条件。
只有匹配的groupId才会被加入到后续的处理流程。

 groupIds := make([]string, 0)for groupId := range groups.Groups {if e.groupFilter.MatchString(groupId) {groupIds = append(groupIds, groupId)}}

4.1.3 DESCRIBE消费者组
只对通过过滤的group进行DescribeGroups、FetchOffset等详细指标采集。

describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})if err != nil {glog.Errorf("Cannot get describe groups: %v", err)return}for _, group := range describeGroups.Groups {offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}if e.offsetShowAll {for topic, partitions := range offset {for partition := range partitions {offsetFetchRequest.AddPartition(topic, partition)}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92469.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92469.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/92469.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AT32的freertos下modbus TCP移植

1.准备模板 打开雅特力官网&#xff0c;也就是带有LwIP的示例。 下载官方源码&#xff1a;modbus 2.移植 我这里是在这里新建两个文件夹&#xff0c;分别是modbus与port&#xff0c;这个任意&#xff0c;只需要将必要的文件加入项目即可。 将源码中的modbus这些都移植过来&a…

Redis面试精讲 Day 16:Redis性能监控与分析工具

【Redis面试精讲 Day 16】Redis性能监控与分析工具 开篇 欢迎来到"Redis面试精讲"系列第16天&#xff0c;今天我们将深入探讨Redis性能监控与分析工具。在大型分布式系统中&#xff0c;Redis作为关键的数据存储和缓存组件&#xff0c;其性能指标直接影响整个系统的…

vue3+vue-flow制作简单可拖拽可增删改流程图

实现效果实现代码 准备工作 安装依赖 npm install vue-flow/core npm install vue-flow/minimap //小地图 npm install vue-flow/controls //自带的缩放、居中、加锁功能我这里只用到上述三个&#xff0c;还有其余的可根据实际情况配合官方文档使用。 npm install vue-flow/bac…

itextPdf获取pdf文件宽高不准确

正常情况下我们通过下面方式获取宽高PdfReader reader new PdfReader(file.getPath()); float width reader.getPageSize(1).getWidth(); float height reader.getPageSize(1).getHeight();但是这样获取的宽高是不准确的&#xff0c;永远都是 宽 > 高&#xff0c;也就是横…

NodeJs学习日志(2):windows安装使用node.js 安装express,suquelize,mysql,nodemon

windows安装使用node.js 安装express&#xff0c;suquelize&#xff0c;mysql&#xff0c;nodemon 系统是win10&#xff0c;默认已经安装好nodejs与npm包名作用expressWeb应用框架suquelize数据库ORMmysql数据库nodemon代码热重载安装express 添加express生成器 npm add expres…

VueCropper 图片裁剪组件在Vue项目中的实践应用

VueCropper 图片裁剪组件在Vue项目中的实践应用 1. 组件介绍 VueCropper 是一个基于 Vue.js 的图片裁剪组件&#xff0c;它提供了丰富的图片裁剪功能&#xff0c;包括&#xff1a; 图片缩放、旋转、移动固定比例裁剪高质量图片输出多种裁剪模式选择 2. 安装与引入 首先需要安装…

给同一个wordpress网站绑定多个域名的实现方法

在WordPress网站上绑定多个域名&#xff0c;可以通过以下几种方法实现&#xff1a; 1. 修改wp-config.php文件 在wp-config.php文件中&#xff0c;找到define(‘WP_DEBUG’, false);&#xff0c;在其下方添加以下代码&#xff1a; define(WP_SITEURL, http:// . $_SERVER[HT…

HarmonyOS分布式开发实战:打造跨设备协同应用

&#x1f4d6; 文章目录 第一章&#xff1a;HarmonyOS分布式架构揭秘第二章&#xff1a;跨设备协同的核心技术第三章&#xff1a;开发环境搭建与配置第四章&#xff1a;实战项目&#xff1a;智能家居控制系统第五章&#xff1a;数据同步与状态管理第六章&#xff1a;性能优化与…

用 Enigma Virtual Box 把 Qt 程序压成单文件 EXE——从编译、收集依赖到一键封包

关键词&#xff1a;Qt、windeployqt、Enigma Virtual Box、单文件、绿色软件 为什么要打成单文件&#xff1f; 传统做法&#xff1a;用 windeployqt 把依赖拷进 release 目录&#xff0c;发给用户一个文件夹&#xff0c;文件又多又乱。理想做法&#xff1a;把整个目录压成一个…

unity中实现选中人物脚下显示圆形标识且完美贴合复杂地形(如弹坑) 的效果

要实现人物脚下圆形 完美贴合复杂地形&#xff08;如弹坑&#xff09; 的效果&#xff0c;核心思路是 「动态生成贴合地面的 Mesh」 —— 即根据地面的高度场实时计算环形顶点的 Y 坐标&#xff0c;让每个顶点都 “贴” 在地面上。核心逻辑&#xff1a;确定环形范围&#xff1a…

引领GameFi 2.0新范式:D.Plan携手顶级财经媒体启动“龙珠创意秀”

在GameFi赛道寻求新突破的今天&#xff0c;一个名为Dragonverse Plan&#xff08;D.Plan&#xff09;的项目正以其独特的经济模型和宏大愿景&#xff0c;吸引着整个Web3社区的目光。据悉&#xff0c;D.Plan即将联合中文区顶级加密媒体金色财经与非小号&#xff08;Feixiaohao&a…

通信算法之307:fpga之时序图绘制

时序图绘制软件 一. 序言 在FPGA设计过程中&#xff0c;经常需要编写设计文档&#xff0c;其中&#xff0c;不可缺少的就是波形图的绘制&#xff0c;可以直接截取Vivado或者Modelsim平台实际仿真波形&#xff0c;但是往往由于信号杂乱无法凸显重点。因此&#xff0c;通过相应软…

计网学习笔记第3章 数据链路层(灰灰题库)

题目 11 单选题 下列说法正确的是______。 A. 路由器具有路由选择功能&#xff0c;交换机没有路由选择功能 B. 三层交换机具有路由选择功能&#xff0c;二层交换机没有路由选择功能 C. 三层交换机适合异构网络&#xff0c;二层交换机不适合异构网络 D. 路由器适合异构网络&…

SQL的LEFT JOIN优化

原sql&#xff0c;一个base表a,LEFT JOIN三个表抽数 SELECT ccu.*, ctr.*, om.*, of.* FROM ods.a ccu LEFT JOIN ods.b ctr ON ccu.coupon_code ctr.coupon_code AND ctr.is_deleted 0 LEFT JOIN ods.c om ON ctr.bill_code om.order_id AND om.deleted 0 LEFT JOIN ods.…

Redis 核心概念、命令详解与应用实践:从基础到分布式集成

目录 1. 认识 Redis 2. Redis 特性 2.1 操作内存 2.2 速度快 2.3 丰富的功能 2.4 简单稳定 2.5 客户端语言多 2.6 持久化 2.7 主从复制 2.8 高可用 和 分布式 2.9 单线程架构 2.9.1 引出单线程模型 2.9.2 单线程快的原因 2.10 Redis 和 MySQL 的特性对比 2.11 R…

【Day 18】Linux-DNS解析

目录 一、DNS概念 1、概念和作用 2、域名解析类型 3、 软件与服务 4、DNS核心概念 区域 记录 5、查询类型 6、分层结构 二、DNS操作 配置本机为DNS内网解析服务器 &#xff08;1&#xff09;修改主配置文件 &#xff08;2&#xff09;添加区域 正向解析区域&#xff1a; …

Python 中 OpenCV (cv2) 安装与使用介绍

Python 中 OpenCV (cv2) 安装与使用详细指南 OpenCV (Open Source Computer Vision Library) 是计算机视觉领域最流行的库之一。Python 通过 cv2 模块提供 OpenCV 的接口。 一、安装 OpenCV 方法 1&#xff1a;基础安装&#xff08;推荐&#xff09; # 安装核心包&#xff0…

微软WSUS替代方案

微软WSUS事件回顾2025年7月10日&#xff0c;微软最新确认Windows Server Update Services&#xff08;WSUS&#xff09;出现了问题&#xff0c;导致IT管理员无法正常同步和部署Windows更新。WSUS是允许管理员根据策略配置&#xff0c;将更新推送到特定计算机&#xff0c;并优化…

Minio 分布式集群安装配置

目录创建 mkdir -p /opt/minio/run && mkdir -p /etc/minio && mkdir -p /indata/disk_0/minio/datarun&#xff1a;启动脚本及二进制文件目录/etc/minio&#xff1a;配置文件目录data&#xff1a;数据存储目录下载 minio wget https://dl.min.io/server/minio…

Spring Boot + ShardingSphere 实现分库分表 + 读写分离实战

&#x1f680; Spring Boot ShardingSphere 实现分库分表 读写分离&#xff08;涵盖99%真实场景&#xff09; &#x1f3f7;️ 标签&#xff1a;ShardingSphere、分库分表、读写分离、MySQL 主从、Spring Boot 实战 分库分表 vs 读写分离 vs 主从配置与数据库高可用架构区别 …