#作者:张桐瑞

文章目录

  • 1 生产案例:Controller 选举在故障恢复中的关键作用
    • 1.1 问题背景
    • 1.2 核心操作原理:
  • 2 Controller 元数据全景:从 ZooKeeper 到内存的数据镜像
    • 2.1元数据核心载体:ControllerContext 类
    • 2.2核心元数据深度解析

1 生产案例:Controller 选举在故障恢复中的关键作用

1.1 问题背景

某 Kafka 集群部分核心主题分区一直处于“不可用”状态,通过kafka-toics.sh命令查看,发现分区leader一直处于Leader=-1的 “不可用” 状态。尝试重启旧 Leader 所在 Broker 无效,并且由于是生产环境,不能通过重启整个集群来随意重启,这毕竟是一个非常缺乏计划性的事情。

如何在避免重启集群的情况下,干掉已有Controller并执行新的Controller选举呢?答案就在源码中的ControllerZNode.path上,也就是ZooKeeper的/controller节点。倘若我们手动删除/controller节点,Kafka集群就会触发Controller选举。于是,我们马上实施这个方案,效果出奇得好:之前的受损分区全部恢复正常,业务数据得以正常生产和消费。

1.2 核心操作原理:

Controller 选举后会触发集群元数据全量同步(“重刷” 分区状态),原理如下:

  1. ZooKeeper 的/controller节点存储当前 Controller 信息,删除该节点会触发集群重新选举 Controller
  2. 新 Controller 上任后通过UpdateMetadataRequest向所有 Broker 同步最新元数据,修复分区状态
    注意事项:此操作需谨慎,生产环境建议通过 API 或工具触发元数据更新,避免直接操作 ZooKeeper 节点。

2 Controller 元数据全景:从 ZooKeeper 到内存的数据镜像

Kafka Controller 作为集群元数据的 “真理之源副本”,其核心作用是:

  • 缓存 ZooKeeper 元数据,避免 Broker 直接与 ZooKeeper 交互
  • 未来将替代 ZooKeeper 成为唯一元数据中心(社区规划)

2.1元数据核心载体:ControllerContext 类

类定义路径:
core/src/main/scala/kafka/controller/ControllerContext.scala
数据结构概览:
该类封装 17 项元数据,以下是核心字段解析(按重要性排序):
在这里插入图片描述

2.2核心元数据深度解析

2.2.1 ControllerStats

private[controller] class ControllerStats extends KafkaMetricsGroup {// 统计每秒发生的Unclean Leader选举次数val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)// Controller事件通用的统计速率指标的方法val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>state.rateAndTimeMetricName.map { metricName =>state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))}}.toMap
}

其中,前者是计算Controller每秒执行的Unclean Leader选举数量,通常情况下,执行Unclean Leader选举可能造成数据丢失,一般不建议开启它。一旦开启,你就需要时刻关注这个监控指标的值,确保Unclean Leader选举的速率维持在一个很低的水平,否则会出现很多数据丢失的情况。

后者是统计所有Controller状态的速率和时间信息,单位是毫秒。当前,Controller定义了很多事件,比如,TopicDeletion是执行主题删除的Controller事件、ControllerChange是执行Controller重选举的事件。ControllerStats的这个指标通过在每个事件名后拼接字符串RateAndTimeMs的方式,为每类Controller事件都创建了对应的速率监控指标。

2.2.2 offlinePartitionCount
该字段统计集群中所有离线或处于不可用状态的主题分区数量。所谓的不可用状态,就是我最开始举的例子中“Leader=-1”的情况。

ControllerContext中的updatePartitionStateMetrics方法根据给定主题分区的当前状态和目标状态,来判断该分区是否是离线状态的分区。如果是,则累加offlinePartitionCount字段的值,否则递减该值。方法代码如下:

// 更新offlinePartitionCount元数据
private def updatePartitionStateMetrics(partition: TopicPartition, currentState: PartitionState,targetState: PartitionState): Unit = {// 如果该主题当前并未处于删除中状态if (!isTopicDeletionInProgress(partition.topic)) {// targetState表示该分区要变更到的状态// 如果当前状态不是OfflinePartition,即离线状态并且目标状态是离线状态// 这个if语句判断是否要将该主题分区状态转换到离线状态if (currentState != OfflinePartition && targetState == OfflinePartition) {offlinePartitionCount = offlinePartitionCount + 1// 如果当前状态已经是离线状态,但targetState不是// 这个else if语句判断是否要将该主题分区状态转换到非离线状态} else if (currentState == OfflinePartition && targetState != OfflinePartition) {offlinePartitionCount = offlinePartitionCount - 1}}
}

该方法首先要判断,此分区所属的主题当前是否处于删除操作的过程中。如果是的话,Kafka就不能修改这个分区的状态,那么代码什么都不做,直接返回。否则,代码会判断该分区是否要转换到离线状态。如果targetState是OfflinePartition,那么就将offlinePartitionCount值加1,毕竟多了一个离线状态的分区。相反地,如果currentState是offlinePartition,而targetState反而不是,那么就将offlinePartitionCount值减1。

2.2.3 shuttingDownBrokerIds
顾名思义,该字段保存所有正在关闭中的Broker ID列表。当Controller在管理集群Broker时,它要依靠这个字段来甄别Broker当前是否已关闭,因为处于关闭状态的Broker是不适合执行某些操作的,如分区重分配(Reassignment)以及主题删除等。
另外,Kafka必须要为这些关闭中的Broker执行很多清扫工作,Controller定义了一个onBrokerFailure方法,它就是用来做这个的。代码如下:

private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {info(s"Broker failure callback for ${deadBrokers.mkString(",")}")// deadBrokers:给定的一组已终止运行的Broker Id列表// 更新Controller元数据信息,将给定Broker从元数据的replicasOnOfflineDirs中移除deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)// 找出这些Broker上的所有副本对象val deadBrokersThatWereShuttingDown =deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))if (deadBrokersThatWereShuttingDown.nonEmpty)info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")// 执行副本清扫工作val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)onReplicasBecomeOffline(allReplicasOnDeadBrokers)// 取消这些Broker上注册的ZooKeeper监听器unregisterBrokerModificationsHandler(deadBrokers)
}

该方法接收一组已终止运行的Broker ID列表,首先是更新Controller元数据信息,将给定Broker从元数据的replicasOnOfflineDirs和shuttingDownBrokerIds中移除,然后为这组Broker执行必要的副本清扫工作,也就是onReplicasBecomeOffline方法做的事情。

该方法主要依赖于分区状态机和副本状态机来完成对应的工作。在后面的课程中,我们会专门讨论副本状态机和分区状态机,这里你只要简单了解下它要做的事情就行了。后面等我们学完了这两个状态机之后,你可以再看下这个方法的具体实现原理。
这个方法的主要目的是把给定的副本标记成Offline状态,即不可用状态。具体分为以下这几个步骤:

  1. 利用分区状态机将给定副本所在的分区标记为Offline状态;
  2. 将集群上所有新分区和Offline分区状态变更为Online状态;
  3. 将相应的副本对象状态变更为Offline。

2.2.4 liveBrokers
该字段保存当前所有运行中的Broker对象。每个Broker对象就是一个的三元组。ControllerContext中定义了很多方法来管理该字段,如addLiveBrokersAndEpochs、removeLiveBrokers和updateBrokerMetadata等。我拿updateBrokerMetadata方法进行说明,以下是源码:

def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {liveBrokers -= oldMetadataliveBrokers += newMetadata}

每当新增或移除已有Broker时,ZooKeeper就会更新其保存的Broker数据,从而引发Controller修改元数据,也就是会调用updateBrokerMetadata方法来增减Broker列表中的对象。

2.2.5 liveBrokerEpochs
该字段保存所有运行中Broker的Epoch信息。Kafka使用Epoch数据防止Zombie Broker,即一个非常老的Broker被选举成为Controller。
另外,源码大多使用这个字段来获取所有运行中Broker的ID序号,如下面这个方法定义的那样:
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet – shuttingDownBrokerIds

liveBrokerEpochs的keySet方法返回Broker序号列表,然后从中移除关闭中的Broker序号,剩下的自然就是处于运行中的Broker序号列表了。

2.2.6 epoch & epochZkVersion
这两个字段一起说,因为它们都有“epoch”字眼,放在一起说,可以帮助你更好地理解两者的区别。epoch实际上就是ZooKeeper中/controller_epoch节点的值,你可以认为它就是Controller在整个Kafka集群的版本号,而epochZkVersion实际上是/controller_epoch节点的dataVersion值。

Kafka使用epochZkVersion来判断和防止Zombie Controller。这也就是说,原先在老Controller任期内的Controller操作在新Controller不能成功执行,因为新Controller的epochZkVersion要比老Controller的大。
另外,你可能会问:“这里的两个Epoch和上面的liveBrokerEpochs有啥区别呢?”实际上,这里的两个Epoch值都是属于Controller侧的数据,而liveBrokerEpochs是每个Broker自己的Epoch值。

2.2.7 allTopics
该字段保存集群上所有的主题名称。每当有主题的增减,Controller就要更新该字段的值。

比如Controller有个processTopicChange方法,从名字上来看,它就是处理主题变更的。我们来看下它的代码实现,我把主要逻辑以注释的方式标注了出来:

private def processTopicChange(): Unit = {if (!isActive) return // 如果Contorller已经关闭,直接返回val topics = zkClient.getAllTopicsInCluster(true) // 从ZooKeeper中获取当前所有主题列表val newTopics = topics -- controllerContext.allTopics // 找出当前元数据中不存在、ZooKeeper中存在的主题,视为新增主题val deletedTopics = controllerContext.allTopics -- topics // 找出当前元数据中存在、ZooKeeper中不存在的主题,视为已删除主题controllerContext.allTopics = topics // 更新Controller元数据// 为新增主题和已删除主题执行后续处理操作registerPartitionModificationsHandlers(newTopics.toSeq)val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)deletedTopics.foreach(controllerContext.removeTopic)addedPartitionReplicaAssignment.foreach {case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)}info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +s"[$addedPartitionReplicaAssignment]")if (addedPartitionReplicaAssignment.nonEmpty)onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)}

2.2.8 partitionAssignments
该字段保存所有主题分区的副本分配情况。在我看来,这是Controller最重要的元数据了。事实上,你可以从这个字段衍生、定义很多实用的方法,来帮助Kafka从各种维度获取数据。

比如,如果Kafka要获取某个Broker上的所有分区,那么,它可以这样定义:

partitionAssignments.flatMap {case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {case (_, partitionAssignment) => partitionAssignment.replicas.contains(brokerId)}.map {case (partition, _) => new TopicPartition(topic, partition)}}.toSet

再比如,如果Kafka要获取某个主题的所有分区对象,代码可以这样写:

partitionAssignments.getOrElse(topic, mutable.Map.empty).map {case (partition, _) => new TopicPartition(topic, partition)}.toSet

实际上,这两段代码分别是ControllerContext.scala中partitionsOnBroker方法和partitionsForTopic两个方法的主体实现代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/89474.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/89474.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/89474.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《寻北技术的全面剖析与应用前景研究报告》

一、引言 1.1 研究背景与意义 寻北,作为确定地理北极方向的关键技术,在众多领域中扮演着举足轻重的角色。在军事领域,精确的寻北对于武器系统的瞄准、导弹的精确制导以及部队的战略部署都至关重要。例如,火炮在发射前需要精确寻…

深入比较 Gin 与 Beego:Go Web 框架的两大选择

引言 在 Go 语言生态系统中,Gin 和 Beego 是两个非常受欢迎的 Web 框架。它们各自有着不同的设计理念和目标用户群体。本文将对这两个框架进行深入比较,并帮助你理解它们之间的区别,以便根据项目需求做出合适的选择。 一、Gin 概述 Gin是一…

全新大模型开源,腾讯(int4能打DeepSeek) Vs 谷歌(2GB运行多模态)

大家好,我是 Ai 学习的老章 最近除了阿里 Qwen3 模型更新了图片生成和处理能力,大家都可以玩转吉卜力风格 还有几个最近发布的大模型值得关注 1 是腾讯开源了 80B 混元 A13B 模型,亮点是精度无损的 int4 很能打 2 是谷歌开源的小参数 Gemm…

向量数据库milvus中文全文检索取不到数据的处理办法

​检查中文分词配置​ Milvus 2.5 支持原生中文全文检索,但需显式配置中文分词器: 创建集合时指定分词器类型为 chinese python schema.add_field(field_name"text", datatypeDataType.VARCHAR, max_length65535, enable_analyzerTrue, an…

Stable Diffusion 项目实战落地:从0到1 掌握ControlNet 第一篇 打造光影字形的创意秘技

大家好呀,欢迎来到 AI造字工坊! 在这篇文章中,我们将带领你走进一个神奇的世界——ControlNet。你可能听说过它,但可能还没摸清它的深奥之处。 今天,我们就来揭开它神秘的面纱,轻松带你玩转字形设计! 话说回来,相信大家对图片生成、提示词、放大操作、抽卡这些基本操…

从零用java实现 小红书 springboot vue uniapp (12)实现分类筛选与视频笔记功能

移动端演示 http://8.146.211.120:8081/#/ 管理端演示 http://8.146.211.120:8088/#/ 项目整体介绍及演示 前言 在前面的系列文章中,我们已经基本完成了小红书项目的核心框架搭建和图文笔记的发布、展示流程。为了丰富App的功能和用户体验,今天我们将在…

Python与Web3.py库交互实践

目录 Python与Web3.py库交互实践引言:连接Python与区块链的桥梁1. 环境配置与基础连接1.1 安装Web3.py1.2 连接以太坊节点2. 基础区块链交互2.1 账户与余额查询2.2 创建并发送交易3. 智能合约交互3.1 加载和部署合约3.2 与已部署合约交互4. 高级功能实践4.1 事件监听4.2 与ERC…

《汇编语言:基于X86处理器》第6章 条件处理(2)

本章向程序员的汇编语言工具箱中引入一个重要的内容,使得编写出来的程序具备作决策的功能。几乎所有的程序都需要这种能力。首先,介绍布尔操作,由于能影响CPU状态标志,它们是所有条件指令的核心。然后,说明怎样使用演绎…

深度剖析NumPy核心函数reshape()

深度剖析NumPy核心函数reshape reshape()函数基础概念reshape()函数语法与参数详解reshape()函数使用示例基本的形状重塑使用-1自动计算维度多维数组的形状重塑不同order参数的效果 reshape()函数的应用场景数据预处理机器学习模型输入算法实现 当我们使用np.array()创建好数组…

Linux平台MinGW32/MinGW64交叉编译完全指南:原理、部署与组件详解

一、MinGW是什么?为什么需要交叉编译? MinGW(Minimalist GNU for Windows)是一套在Linux上构建Windows应用程序的完整工具链。它允许开发者: 在Linux环境下编译Windows可执行文件(.exe/.dll)避…

为什么我画的频谱图和audacity、audition不一样?

文章目录 系列文章目录 目录 文章目录 前言 一、问题引入 二、使用步骤 三、分析和改进 总结 前言 我们知道audacity和audition都有频谱分析这个窗口,一般过程肯定是分帧加窗,fft变换然后呈现, 大体这个过程是没问题的,但为什…

责任链模式 Go 语言实战

责任链模式(Chain of Responsibility) 责任链模式是一种行为设计模式,它允许将请求沿着处理者链进行传递,直到有一个处理者能够处理它。这个模式的主要目的是解耦请求的发送者和接收者,使得多个对象都有机会处理这个请…

使用开源项目youlai_boot 导入到ecplise 中出现很多错误

我是使用ecplise 导入得youlai_boot 这个项目,但是导入到ecplise 中一直出现报错,然后各种maven clean 和maven install 以及update Maven 都没有效果不知道怎么办才好,怎么样解决这个问题,原来是我本地的环境中没有安装 lombok.…

06_Americanas精益管理项目_数据分析

文章目录 Americanas精益管理项目_数据分析(一)思维方法1、数据分析思维2、零售行业-万能「人货场」分析框架(二)商品分析1、品类销量分析2、销量趋势分析3、帕累托法则分析4、商品TopN分析(三)用户分析(四)场景分析Americanas精益管理项目_数据分析 数据分析与数据开…

ES6从入门到精通:类与继承

ES6 类的基本概念 ES6 引入了基于类的面向对象编程语法,通过 class 关键字定义类。类可以包含构造函数、方法和属性。 class Person {constructor(name) {this.name name;}greet() {console.log(Hello, ${this.name}!);} }const person new Person(Alice); pers…

【经验】新版Chrome中Proxy SwitchyOmega2已实效,改为ZeroOmega

1、问题描述 手欠更新了 Chrome 导致无法“上网”,原因是 Proxy SwitchyOmega2 已实效。 2、解决方法 2.1 下载 新版Chrome中Proxy SwitchyOmega2已实效,改为ZeroOmega; 想方设法去下载 ZeroOmega 的crx包,最新的为&#xff1…

在windows上设置python的环境

安装好了python,再具体说下python语言的相关环境。 #01 关于Python Python 是一个高级别的、边运行边解释的、动态类型的编程语言,以简洁的语法、强大的功能和丰富的资源库而闻名。广泛应用于 Web 开发、数据分析、人工智能、自动化脚本等多个领域。 目前 Python 语言有两…

3D 建模与点云建模:从虚拟构建到实景复刻的数字孪生双引擎

在数字化浪潮席卷全球的当下,3D 建模与点云建模如同数字世界的左膀右臂,一个以抽象化的创意构建虚拟蓝图,一个以高精度的实景数据复刻现实世界。它们不仅深刻重塑了影视娱乐、工业制造、建筑设计等传统领域,更成为数字孪生技术蓬勃…

智能检测原理和架构

大家读完觉得有帮助记得关注和点赞!!! 智能检测系统基于AI和大数据分析技术,通过主动感知、行为建模与实时响应构建动态防御体系。其核心在于将传统规则匹配升级为**多模态威胁认知**,实现对新型攻击(如AI…

2025年6月个人工作生活总结

本文为 2025年6月工作生活总结。 研发编码 某国产操作系统curl下载sftp服务器文件问题记录 场景: 某国产系统curl版本信息: # curl --version curl 7.71.1 (x86_64-koji-linux-gnu) libcurl/7.71.1 OpenSSL/1.1.1f-fips zlib/1.2.11 brotli/1.0.7 li…