作者:vivo 互联网大数据团队- Chen Jianbo

本文是《vivo Pulsar万亿级消息处理实践》系列文章第3篇。

Pulsar是Apache基金会的开源分布式流处理平台和消息中间件,它实现了Kafka的协议,可以让使用Kafka API的应用直接迁移至Pulsar,这使得Pulsar在Kafka生态系统中更加容易被接受和使用。KoP提供了从Kafka到Pulsar的无缝转换,用户可以使用Kafka API操作Pulsar集群,保留了Kafka的广泛用户基础和丰富生态系统。它使得Pulsar可以更好地与Kafka进行整合,提供更好的消息传输性能、更强的兼容性及可扩展性。vivo在使用Pulsar KoP的过程中遇到过一些问题,本篇主要分享一个分区消费指标缺失的问题。

系列文章:

  1. vivo Pulsar万亿级消息处理实践(1)-数据发送原理解析和性能调优

  2. vivo Pulsar万亿级消息处理实践(2)-从0到1建设Pulsar指标监控链路

文章太长?1分钟看图抓住核心观点👇

图片

一、问题背景

在一次版本灰度升级中,我们发现某个使用KoP的业务topic的消费速率出现了显著下降,具体情况如下图所示:

什么原因导致正常的升级重启服务器会出现这个问题呢?直接查看上报采集的数据报文:

kop_server_MESSAGE_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 3
kop_server_BYTES_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 188

我们看到,KoP消费指标kop_server_MESSAGE

_OUT、kop_server_BYTES_OUT是有上报的,但指标数据里的group标签变成了空串(缺少消费组名称),分区的消费指标就无法展示了。是什么原因导致了消费组名称缺失?

二、问题分析

1、找到问题代码

我们去找下这个消费组名称是在哪里获取的,是否逻辑存在什么问题。根据druid中的kop_subscription对应的消费指标kop_server_

MESSAGE_OUT、kop_server_BYTES_OUT,找到相关代码如下:

private void handleEntries(final List<Entry> entries,final TopicPartition topicPartition,final FetchRequest.PartitionData partitionData,final KafkaTopicConsumerManager tcm,final ManagedCursor cursor,final AtomicLong cursorOffset,final boolean readCommitted) {
....// 处理消费数据时,获取消费组名称CompletableFuture<String> groupNameFuture = requestHandler.getCurrentConnectedGroup().computeIfAbsent(clientHost, clientHost -> {CompletableFuture<String> future = new CompletableFuture<>();String groupIdPath = GroupIdUtils.groupIdPathFormat(clientHost, header.clientId());requestHandler.getMetadataStore().get(requestHandler.getGroupIdStoredPath() + groupIdPath).thenAccept(getResultOpt -> {if (getResultOpt.isPresent()) {GetResult getResult = getResultOpt.get();future.complete(new String(getResult.getValue() == null? new byte[0] : getResult.getValue(), StandardCharsets.UTF_8));} else {// 从zk节点 /client_group_id/xxx 获取不到消费组,消费组就是空的future.complete("");}}).exceptionally(ex -> {future.completeExceptionally(ex);return null;});returnfuture;});// this part is heavyweight, and we should not execute in the ManagedLedger Ordered executor threadgroupNameFuture.whenCompleteAsync((groupName, ex) -> {if (ex != null) {log.error("Get groupId failed.", ex);groupName = "";}
.....// 获得消费组名称后,记录消费组对应的消费指标decodeResult.updateConsumerStats(topicPartition,entries.size(),groupName,statsLogger);

代码的逻辑是,从requestHandler的currentConnectedGroup(map)中通过host获取groupName,不存在则通过MetadataStore(带缓存的zk存储对象)获取,如果zk缓存也没有,再发起zk读请求(路径为/client_group_id/host-clientId)。读取到消费组名称后,用它来更新消费组指标。从复现的集群确定走的是这个分支,即是从metadataStore(带缓存的zk客户端)获取不到对应zk节点/client_group_id/xxx。

2、查找可能导致zk节点/client_group_id/xxx节点获取不到的原因

有两种可能性:一是没写进去,二是写进去但是被删除了。

    @Overrideprotected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,CompletableFuture<AbstractResponse> resultFuture) {
...// Store group name to metadata store for current client, use to collect consumer metrics.storeGroupId(groupId, groupIdPath).whenComplete((stat, ex) -> {if (ex != null) {// /client_group_id/xxx节点写入失败log.warn("Store groupId failed, the groupId might already stored.", ex);}findBroker(TopicName.get(pulsarTopicName)).whenComplete((node, throwable) -> {....});});
...

从代码看到,clientId与groupId的关联关系是通过handleFindCoordinatorRequest(FindCoordinator)写进去的,而且只有这个方法入口。由于没有找到warn日志,排除了第一种没写进去的可能性。看看删除的逻辑:

protected void close(){if (isActive.getAndSet(false)) {...currentConnectedClientId.forEach(clientId -> {String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);// 删除zk上的 /client_group_id/xxx 节点metadataStore.delete(path, Optional.empty()).whenComplete((__, ex) -> {if (ex != null) {if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {if (log.isDebugEnabled()) {log.debug("The groupId store path doesn't exist. Path: [{}]", path);}return;}log.error("Delete groupId failed. Path: [{}]", path, ex);return;}if (log.isDebugEnabled()) {log.debug("Delete groupId success. Path: [{}]", path);}});});}
}

删除是在requsetHandler.close方法中执行,也就是说连接断开就会触发zk节点删除。

但有几个疑问:

  • /client_group_id/xxx 到底是干嘛用的?消费指标为什么要依赖它

  • 为什么要在handleFindCoordinatorRequest写入?

  • 节点/client_group_id/xxx为什么要删除,而且是在连接断开时删除,删除时机是否有问题?

首先回答第1个问题,通过阅读代码可以知道,/client_group_id/xxx 这个zk节点是用于在不同broker实例间交换数据用的(相当redis cache),用于临时存放IP+clientId与groupId的映射关系。由于fetch接口(拉取数据)的request没有groupId的,只能依赖加入Group过程中的元数据,在fetch消费时才能知道当前拉数据的consumer是哪个消费组的。

3、复现

若要解决问题,最好能够稳定地复现出问题,这样才能确定问题的根本原因,并且确认修复是否完成。

因为节点是在requsetHandle.close方法中执行删除,broker节点关闭会触发连接关闭,进而触发删除。假设:客户端通过brokerA发起FindCoordinator请求,写入zk节点/client_group

_id/xxx,同时请求返回brokerB作为Coordinator,后续与brokerB进行joinGroup、syncGroup等交互确定消费关系,客户端在brokerA、brokerB、brokerC都有分区消费。这时重启brokerA,分区均衡到BrokerC上,但此时/client_group_id/xxx因关闭broker而断开连接被删除,consumer消费刚转移到topic1-partition-1的分区就无法获取到groupId。

按照假设,有3个broker,开启生产和消费,通过在FindCoordinator返回前获取node.leader()的返回节点BrokerB,关闭brokerA后,brokerC出现断点复现,再关闭brokerC,brokerA也会复现(假设分区在brokerA与brokerC之间转移)。

图片

复现要几个条件:

  1. broker数量要足够多(不小于3个)

  2. broker内部有zk缓存metadataCache默认为5分钟,可以把时间调小为1毫秒,相当于没有cache

  3. findCoordinator返回的必须是其他broker的IP

  4. 重启的必须是接收到findCoordinator请求那台broker,而不是真正的coordinator,这时会从zk删除节点

  5. 分区转移到其他broker,这时新的broker会重新读取zk节点数据

到此,我们基本上清楚了问题原因:连接关闭导致zk节点被删除了,别的broker节点需要时就读取不到了。那怎么解决?

三、问题解决

方案一

既然知道把消费者与FindCoordinator的连接进行绑定不合适的,那么是否应该把FindCoordinator写入zk节点换成由JoinGroup写入,断连即删除。

图片

consumer统一由Coordinator管理,由于FindCoordinator接口不一定是Coordinator处理的,如果换成由Coordinator处理的JoinGroup接口是否就可以了,这样consumer断开与Coordinator的连接就应该删除数据。但实现验证时却发现,客户端在断连后也不会再重连,所以没法重新写入zk,不符合预期。

方案二

还是由FindCoordinator写入zk节点,但删除改为GroupCoordinator监听consumer断开触发。

因为consumer统一由Coordinator管理,它能监听到consumer加入或者离开。GroupCoordinator的removeMemberAndUpdateGroup方法是coordinator对consumer成员管理。

private void removeMemberAndUpdateGroup(GroupMetadata group,MemberMetadata member) {group.remove(member.memberId());switch (group.currentState()) {case Dead:case Empty:return;case Stable:case CompletingRebalance:maybePrepareRebalance(group);break;case PreparingRebalance:joinPurgatory.checkAndComplete(new GroupKey(group.groupId()));break;default:break;}// 删除 /client_group_id/xxx 节点deleteClientIdGroupMapping(group, member.clientHost(), member.clientId());
}

调用入口有两个,其中handleLeaveGroup是主动离开,onExpireHeartbeat是超时被动离开,客户端正常退出或者宕机都可以调用removeMemberAndUpdateGroup方法触发删除。

public CompletableFuture<Errors> handleLeaveGroup(String groupId,String memberId
) {return validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(error ->CompletableFuture.completedFuture(error)).orElseGet(() -> {return groupManager.getGroup(groupId).map(group -> {return group.inLock(() -> {if (group.is(Dead) || !group.has(memberId)) {return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);} else {...// 触发删除消费者consumerremoveMemberAndUpdateGroup(group, member);return CompletableFuture.completedFuture(Errors.NONE);}});})....});
}void onExpireHeartbeat(GroupMetadata group,MemberMetadata member,long heartbeatDeadline) {group.inLock(() -> {if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {log.info("Member {} in group {} has failed, removing it from the group",member.memberId(), group.groupId());// 触发删除消费者consumerremoveMemberAndUpdateGroup(group, member);}return null;});
}

但这个方案有个问题是,日志运维关闭broker也会触发一个onExpireHeartbeat事件删除zk节点,与此同时客户端发现Coordinator断开了会马上触发FindCoordinator写入新的zk节点,但如果删除晚于写入的话,会导致误删除新写入的节点。我们干脆在关闭broker时,使用ShutdownHook加上shuttingdown状态防止关闭broker时删除zk节点,只有客户端断开时才删除。

这个方案修改上线半个月后,还是出现了一个客户端的消费指标无法上报的情况。后来定位发现,如果客户端因FullGC出现卡顿情况,客户端可能会先于broker触发超时,也就是先超时的客户端新写入的数据被后监听到超时的broker误删除了。因为写入与删除并不是由同一个节点处理,所以无法在进程级别做并发控制,而且也无法判断哪次删除对应哪次的写入,所以用zk也是很难实现并发控制。

方案三

其实这并不是新的方案,只是在方案二基础上优化:数据一致性检查。

既然我们很难控制好写入与删除的先后顺序,我们可以做数据一致性检查,类似于交易系统里的对账。因为GroupCoordinator是负责管理consumer成员的,维护着consumer的实时状态,就算zk节点被误删除,我们也可以从consumer成员信息中恢复,重新写入zk节点。

private void checkZkGroupMapping(){  for (GroupMetadata group : groupManager.currentGroups()) {  for (MemberMetadata memberMetadata : group.allMemberMetadata()) {  String clientPath = GroupIdUtils.groupIdPathFormat(memberMetadata.clientHost(), memberMetadata.clientId());  String zkGroupClientPath = kafkaConfig.getGroupIdZooKeeperPath() + clientPath;  // 查找zk中是否存在节点metadataStore.get(zkGroupClientPath).thenAccept(resultOpt -> {  if (!resultOpt.isPresent()) {  // 不存在则进行补偿修复metadataStore.put(zkGroupClientPath, memberMetadata.groupId().getBytes(UTF\_8), Optional.empty())  .thenAccept(stat -> {  log.info("repaired clientId and group mapping: {}({})",  zkGroupClientPath, memberMetadata.groupId());  })  .exceptionally(ex -> {  log.warn("repaired clientId and group mapping failed: {}({})",  zkGroupClientPath, memberMetadata.groupId());  return null;  });  }  }).exceptionally(ex -> {  log.warn("repaired clientId and group mapping failed: {} ", zkGroupClientPath, ex);  return null;  });  }  }  
}

经过方案三的优化上线,即使是历史存在问题的消费组,个别分区消费流量指标缺少group字段的问题也得到了修复。具体效果如下图所示:

图片

四、总结

经过多个版本的优化和线上验证,最终通过方案三比较完美的解决了这个消费指标问题。在分布式系统中,并发问题往往难以模拟和复现,我们也在尝试多个版本后才找到有效的解决方案。如果您在这方面有更好的经验或想法,欢迎提出,我们共同探讨和交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913969.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913969.shtml
英文地址,请注明出处:http://en.pswp.cn/news/913969.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Marin说PCB之Allegro高亮BOM器件技巧详解

一&#xff0c;首先在原理图输出BOM的时候&#xff0c;只需要勾选器件的位号这个选项即可&#xff0c;具体操作如下所示&#xff1a;二&#xff0c;输出BOM完成后&#xff0c;打开表格选择我们器件的位号那列即可&#xff0c;然后复制到我们的TEXT文本中。三&#xff0c;接着就…

数据结构与算法——从递归入手一维动态规划【2】

前言&#xff1a; 记录一下对左程云系列算法课程--算法讲解066【必备】的剩余习题的学习。本文主要简单记录个人学习心得和提供C版本代码。如需要题目的细致讲解&#xff0c;请前往原视频。 涉及内容&#xff1a; 动态规划、三指针、 参考视频&#xff1a; 左程云--算法讲…

【理念●体系】Windows AI 开发环境搭建实录:六层架构的逐步实现与路径治理指南

【理念●体系】从零打造 Windows WSL Docker Anaconda PyCharm 的 AI 全链路开发体系-CSDN博客 Windows AI 开发环境搭建实录&#xff1a;六层架构的逐步实现与路径治理指南 ——理念落地篇&#xff0c;从路径规划到系统治理&#xff0c;打造结构化可复现的 AI 开发环境 AI…

5G标准学习笔记15 --CSI-RS测量

5G标准学习笔记15 --CSI-RS测量 前言 前面讲了&#xff0c;在5GNR中&#xff0c;CSI-RS 是支持信道状态评估、波束管理和无线资源管理&#xff08;RRM&#xff09;的关键参考信号。下面孬孬基于3GPP TS 38.331中的内容&#xff0c;详细定义了基于 CSI-RS 的测量程序&#xff0c…

第P28:阿尔茨海默病诊断(优化特征选择版)

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、进阶说明 针对于特征对模型结果的影响我们做了特征分析 特征选择 1. SelectFromModel 工作原理&#xff1a;基于模型的特征选择方法&#xff0c;使用…

AI的欧几里得要素时刻:从语言模型到可计算思维

引言 人工智能正在经历一个关键的转折点。就像欧几里得的《几何原本》为数学奠定了公理化基础一样&#xff0c;AI也正在寻找自己的"要素时刻"——一个能够将当前的语言模型能力转化为真正可计算、可验证思考的转变。 最近发表的论文《AI’s Euclid’s Elements Momen…

番外-linux系统运行.net framework 4.0的项目

基础环境&#xff1a;linux系统&#xff0c;.net framework 4.0&#xff0c;npgsql 2.2.5.0 &#xff08;版本不同&#xff0c;构建可能失败&#xff09; 方法背景&#xff1a;linux不支持运行.net framework 4.0&#xff0c;高版本mono不支持npgsql 2.x 主要使用&#xff1a…

国内AI训练都有哪些企业?:技术深耕与场景实践

国内AI训练都有哪些企业&#xff1f;当人工智能从实验室走向产业一线&#xff0c;AI 训练就像为智能系统 “施肥浇水” 的关键环节&#xff0c;让技术根系在各行业土壤里扎得更深。国内一批 AI 训练企业正各展所长&#xff0c;有的专攻技术优化&#xff0c;有的深耕场景应用。它…

微算法科技基于格密码的量子加密技术,融入LSQb算法的信息隐藏与传输过程中,实现抗量子攻击策略强化

随着量子计算技术的发展&#xff0c;传统加密算法面临被量子计算机破解的风险&#xff0c;LSQb 算法也需考虑应对未来可能的量子攻击。微算法科技基于格密码的量子加密技术&#xff0c;融入LSQb算法的信息隐藏与传输过程中&#xff0c;实现抗量子攻击策略强化。格密码在面对量子…

xAI发布Grok4+代码神器Grok4 Code,教你如何在国内升级订阅SuperGrok并使用到Grok4教程

就在今天&#xff0c;马斯克旗下xAI发布了其最新的旗舰AI模型Grok4&#xff0c;并同步推出专为开发者打造的编程利器 Grok 4 Code&#xff0c;还推出了一项全新的AI订阅计划——每月300美元的SuperGrokHeavy。 那最新发布的Grok4以及有哪些特性呢&#xff1f;以及如何才能使用…

Rust 变量遮蔽(Variable Shadowing)

在 Rust 中&#xff0c;变量遮蔽&#xff08;Variable Shadowing&#xff09; 是一种在同一作用域内重新声明同名变量的特性。它允许你创建一个新变量覆盖之前的同名变量&#xff0c;新变量与旧变量类型可以不同&#xff0c;且旧变量会被完全隐藏。核心特点允许同名变量重复声明…

【VScode | 快捷键】全局搜索快捷键(ctrl+shift+f)失效原因及解决方法

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f60e;金句分享&#x1f60e;&a…

Windows 与 Linux 内核安全及 Metasploit/LinEnum 在渗透测试中的综合应用

目录 &#x1f6e0;️ 1. 内核安全如何助力渗透测试与黑客行业 1.1 内核安全的战略价值 1.2 结合 Metasploit 与 LinEnum 的作用 &#x1f50d; 2. Metasploit 信息收集模块及其在内核安全中的应用 2.1 Windows 信息收集模块 2.2 Linux 信息收集模块 2.3 使用步骤 Wind…

京东携手HarmonyOS SDK首发家电AR高精摆放功能

在电商行业的演进中&#xff0c;商品的呈现方式不断升级&#xff1a;从文字、图片到视频&#xff0c;再到如今逐渐兴起的3D与AR技术。作为XR应用探索的先行者&#xff0c;京东正站在这场体验革新的最前沿&#xff0c;不断突破商品展示的边界&#xff0c;致力于通过创新技术让消…

瞄准Win10难民,苹果正推出塑料外壳、手机CPU的MacBook

最近有消息称&#xff0c;苹果正在研发一款定位“低价”的MacBook&#xff0c;售价可能低于800美元&#xff08;约合人民币5800元&#xff09;&#xff0c;采用的是A18 Pro芯片&#xff0c;也就是未来iPhone 16 Pro同款的“手机芯片”&#xff0c;而不是现有的M系列。这款产品预…

原子级 macOS 信息窃取程序升级:新增后门实现持久化控制

臭名昭著的 Atomic macOS Stealer&#xff08;AMOS&#xff0c;原子级 macOS 窃取程序&#xff09;恶意软件近期完成危险升级&#xff0c;全球 Mac 用户面临更严峻威胁。这款与俄罗斯有关联的窃密程序首次植入后门模块&#xff0c;使攻击者能维持对受感染系统的持久访问、执行远…

Shader面试题100道之(81-100)

Shader面试题&#xff08;第81-100题&#xff09; 以下是第81到第100道Shader相关的面试题及答案&#xff1a; 81. Unity中如何实现屏幕空间的热扭曲效果&#xff08;Heat Distortion&#xff09;&#xff1f; 热扭曲效果可以通过GrabPass抓取当前屏幕图像&#xff0c;然后在片…

C#洗牌算法

洗牌算法是一种将序列&#xff08;如数组、列表&#xff09;元素随机打乱的经典算法&#xff0c;核心目标是让每个元素在打乱后出现在任意位置的概率均等。在 C# 中&#xff0c;常用的洗牌算法有Fisher-Yates 洗牌算法&#xff08;也称 Knuth 洗牌算法&#xff09;&#xff0c;…

Python PDFplumber详解:从入门到精通的PDF处理指南

一、PDFplumber核心优势解析 在数字化办公场景中&#xff0c;PDF文档处理是数据分析师和开发者的必备技能。相较于PyPDF2、pdfminer等传统库&#xff0c;PDFplumber凭借其三大核心优势脱颖而出&#xff1a; 精准表格提取&#xff1a;采用流式布局分析算法&#xff0c;支持复杂表…

Flutter 与 Android 的互通几种方式

Flutter 与 Android 的互通主要通过以下几种方式实现&#xff0c;每种方式适用于不同的场景&#xff1a;1. 平台通道&#xff08;Platform Channels&#xff09; Flutter 与原生 Android 代码通信的核心方式&#xff0c;支持双向调用。 类型&#xff1a; MethodChannel&#xf…