Apache Kafka 简介


一、什么是 Kafka?

Apache Kafka 是一个高吞吐量、分布式、可扩展的流处理平台,用于构建实时数据管道和流应用程序。它最初由 LinkedIn 开发,并于 2011 年开源,目前由 Apache 软件基金会进行维护。

Kafka 具备以下核心特性:

  • 发布-订阅消息系统:支持生产者向主题(Topic)发送消息,消费者从主题中读取消息。
  • 高吞吐量与低延迟:可处理百万级每秒消息,延迟低于几毫秒。
  • 持久化存储:消息以日志形式存储在磁盘上,可设定保留时间。
  • 可水平扩展:通过分区(Partition)机制轻松扩展读写能力。
  • 高容错性:副本机制保障在节点故障时依旧能够正常运行。

Kafka 不仅是一个消息队列,更是一个用于流数据处理的统一平台


二、Kafka 的应用场景

Kafka 在大数据和分布式系统领域具有广泛应用,主要包括:

1. 日志收集与传输

Kafka 可统一收集来自不同服务或服务器的日志,作为日志系统的核心组件,将数据传输至后端处理系统(如 Hadoop、Elasticsearch 等)。

2. 实时数据分析

结合 Apache Flink、Spark Streaming 等流处理框架,Kafka 可用于构建实时分析平台,实现实时用户行为分析、实时监控等。

3. 事件驱动架构(EDA)

Kafka 作为微服务架构中的事件总线,使服务之间通过事件解耦,从而提高系统灵活性与可维护性。

4. 数据管道(Data Pipeline)

Kafka 能将数据从数据库、日志系统等源系统传输到数据仓库或数据湖,是构建高效可靠数据管道的核心工具。

5. 替代传统消息队列

在对吞吐量、可扩展性有更高要求的系统中,Kafka 可替代传统消息中间件(如 RabbitMQ、ActiveMQ)作为消息传递通道。


三、Kafka 的诞生背景

Kafka 的诞生源于 LinkedIn 内部对于日志处理和数据传输系统的性能瓶颈

  • LinkedIn 的业务快速增长,系统需要处理海量的用户行为数据与日志。
  • 传统的消息队列系统难以满足高吞吐量与高可用性的要求。
  • 工程团队设计了一种新的架构,将“分布式日志”作为核心思想,构建出一个同时支持日志收集、传输与处理的统一平台。

Kafka 在设计上融合了以下理念:

  • 以持久化日志为核心数据结构:每条消息即为一条日志记录,可重复读取。
  • 分布式架构支持高可用性与高扩展性:通过集群部署和分区副本机制实现。
  • 支持批处理和流处理双模式:既可用于数据采集与离线分析,也适合实时流处理。

这一创新架构为 Kafka 后来的广泛应用打下了坚实基础,也推动了现代数据架构的演进。

好的!以下是 Kafka Java 快速入门指南,适合初学者快速了解如何在 Java 程序中使用 Kafka 实现消息的生产与消费。


明白了!以下是使用 Mermaid 格式图形 重新整理的 Kafka 集群搭建指南,清晰展示了 Kafka + ZooKeeper 的集群结构和搭建步骤。


Kafka 集群搭建指南(ZooKeeper 模式)


一、Kafka 集群架构图(Mermaid 格式)

Kafka集群
ZooKeeper集群
Kafka Broker 1
192.168.1.101:9092
Kafka Broker 2
192.168.1.102:9092
Kafka Broker 3
192.168.1.103:9092
ZooKeeper 节点1
192.168.1.101:2181
ZooKeeper 节点2
192.168.1.102:2181
ZooKeeper 节点3
192.168.1.103:2181
Kafka 客户端

Topic 分区布局
Leader
Replica
Replica
Leader
Replica
Replica
Leader
Replica
Replica
Partition 0
Leader: Kafka-1
Replica: Kafka-2, Kafka-3
Partition 1
Leader: Kafka-2
Replica: Kafka-1, Kafka-3
Partition 2
Leader: Kafka-3
Replica: Kafka-1, Kafka-2
Kafka Broker 1
192.168.1.101:9092
Kafka Broker 2
192.168.1.102:9092
Kafka Broker 3
192.168.1.103:9092

二、准备工作

1. 系统要求

  • Linux/CentOS/Ubuntu(或容器)
  • Java 8+(推荐 Java 11)
  • 至少 3 台机器或虚拟节点

2. 下载 Kafka 安装包(每台机器)

wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0

三、配置 ZooKeeper 集群

1. 修改 config/zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

2. 设置每个节点的 myid

echo 1 > /tmp/zookeeper/myid  # 节点1
echo 2 > /tmp/zookeeper/myid  # 节点2
echo 3 > /tmp/zookeeper/myid  # 节点3

3. 启动 ZooKeeper(每台执行)

bin/zookeeper-server-start.sh config/zookeeper.properties

四、配置 Kafka Broker

每台机器修改 config/server.properties,示例:

broker.id=1                           # 每个节点唯一(如 1、2、3)
listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181

启动 Kafka:

bin/kafka-server-start.sh config/server.properties

五、验证集群

1. 创建主题

bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--replication-factor 3 --partitions 3 \
--topic test-topic

2. 查看主题分布

bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 192.168.1.101:9092

六、发送与消费消息

生产者:

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.1.101:9092

消费者:

bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server 192.168.1.102:9092 --from-beginning

七、常见问题

问题原因
Kafka 启动失败broker.id 重复或端口冲突
消息无法消费ZooKeeper 未正常连接,主题未正确创建
节点日志混乱或冲突log.dirs 配置重复,broker.id 没有区分
ZooKeeper 单点故障节点不足,推荐部署奇数个节点(3/5)

Kafka Java 快速入门指南


一、准备工作

1. 添加 Maven 依赖

在你的项目的 pom.xml 中添加以下依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency>
</dependencies>

二、Kafka Producer 示例(生产者)

1. 编写 KafkaProducerDemo.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka 地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 5; i++) {String message = "Hello Kafka " + i;producer.send(new ProducerRecord<>("test-topic", message));System.out.println("Sent: " + message);}producer.close();}
}

三、Kafka Consumer 示例(消费者)

1. 编写 KafkaConsumerDemo.java

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group"); // 消费组props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 从头开始消费// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, offset=%d%n",record.key(), record.value(), record.offset());}}}
}

四、运行顺序建议

  1. 启动 Kafka 服务(本地或远程)
  2. 先运行消费者 KafkaConsumerDemo(等待监听)
  3. 再运行生产者 KafkaProducerDemo(发送消息)

五、调试小技巧

  • 确保 Kafka 服务正常运行,端口默认为 9092
  • 主题 test-topic 必须提前创建,或在 Kafka 开启 auto.create.topics.enable=true 的情况下自动创建。
  • 消费者默认是“只消费一次”,再次运行需更改 group.id 或开启重复读取逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/79319.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/79319.shtml
英文地址,请注明出处:http://en.pswp.cn/web/79319.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Blender 初学者指南 以及模型格式怎么下载

glbxz.com glbxz.com 可以直接下载Blender格式模型 第 1 步&#xff1a;打开 这就是 blender 打开时的样子。 您面对的是左侧和右侧的工具栏&#xff0c;顶部是文件作&#xff0c;底部是时间轴&#xff0c;中间是 3D 视图。 Blender 的默认起始网格是一个立方体&#xff0c…

RV1126 ROS2环境交叉编译及部署(基于官方Docker)

RV1126 ROS2环境交叉编译及部署(基于官方Docker) 0 前言1 SDK源码更新1.1 启动Docker容器1.2 更新SDK源码1.3 SDK更新问题2 ROS2编译配置3 Buildroot rootfs编译ROS2的依赖包3.1 编译问题解决4 使用Docker交叉编译ROS24.1 准备Linux(Ubuntu) PC机的依赖环境4.1.1 Ubuntu PC机…

Go 面向对象,封装、继承、多态

Go 面向对象&#xff0c;封装、继承、多态 经典OO&#xff08;Object-oriented 面向对象&#xff09;的三大特性是封装、继承与多态&#xff0c;这里我们看看Go中是如何对应的。 1. 封装 封装就是把数据以及操作数据的方法“打包”到一个抽象数据类型中&#xff0c;这个类型…

无线网络设备中AP和AC是什么?有什么区别?

无线网络设备中AP和AC是什么&#xff1f;有什么区别&#xff1f; 一. 什么是AP&#xff1f;二. 什么是AC&#xff1f;三. AP与AC的关系 前言 肝文不易&#xff0c;点个免费的赞和关注&#xff0c;有错误的地方请指出&#xff0c;看个人主页有惊喜。 作者&#xff1a;神的孩子都…

Android SDK

Windows纯净卸载Android SDK 1.关闭所有安卓相关的程序 Android StudioEmulators 如模拟器Command prompts using SDK 如appium服务 2.移除SDK相关目录 # Delete your SDK directory F:\android_sdk\android-sdk-windows# Also check and remove if present: $env:LOCALAPP…

Android耗电优化全解析:从原理到实践的深度治理指南

引言 在移动应用性能优化体系中&#xff0c;耗电优化是用户体验的核心指标之一。据Google官方统计&#xff0c;超过60%的用户会因为应用耗电过快而选择卸载应用。本文将从耗电统计原理、监控手段、治理策略三个维度展开&#xff0c;结合Android系统源码与实际代码示例&#xf…

QMK自定义4*4键盘固件创建教程:最新架构详解

QMK自定义4*4键盘固件创建教程&#xff1a;最新架构详解 前言 通过本教程&#xff0c;你将学习如何在QMK框架下创建自己的键盘固件。QMK是一个强大的开源键盘固件框架&#xff0c;广泛用于DIY机械键盘的制作。本文将详细介绍最新架构下所需创建的文件及其功能。 准备工作 在…

DAMA第10章深度解析:参考数据与主数据管理的核心要义与实践指南

引言 在数字化转型的浪潮中&#xff0c;数据已成为企业的核心资产。然而&#xff0c;数据孤岛、冗余和不一致问题严重制约了数据价值的释放。DAMA&#xff08;数据管理协会&#xff09;提出的参考数据&#xff08;Reference Data&#xff09;与主数据&#xff08;Master Data&…

力扣题解:2、两数相加

个人认为&#xff0c;该题目可以看作合并两个链表的变种题&#xff0c;本题与21题不同的是&#xff0c;再处理两个结点时&#xff0c;对比的不是两者的大小&#xff0c;而是两者和是否大于10&#xff0c;加法计算中大于10要进位&#xff0c;所以我们需要声明一个用来标记是否进…

深度学习部署包含哪些步骤?

深度学习部署包含哪些步骤&#xff1f; 阶段说明示例工具模型导出把 .pt、.h5 等格式模型导出为通用格式&#xff08;如ONNX&#xff09;PyTorch, TensorFlow, ONNX推理优化减小模型体积、加速推理&#xff08;量化、剪枝&#xff09;TensorRT, ONNX Runtime系统集成将模型嵌入…

路由策略和策略路由的区别以及配置案例

区别 路由策略&#xff1a;路由策略是通过ACL等方式控制路由发布&#xff0c;让对方学到适当路由条目&#xff0c;比如有20条路由&#xff0c;只想让某个路由器学到10条&#xff0c;可以通过路由策略进行过滤。 策略路由&#xff1a;策略路由是通过定义策略和应用&#xff0c…

LeetCode 热题 100 64. 最小路径和

LeetCode 热题 100 | 64. 最小路径和 大家好&#xff0c;今天我们来解决一道经典的动态规划问题——最小路径和。这道题在 LeetCode 上被标记为中等难度&#xff0c;要求找到从网格的左上角到右下角的路径&#xff0c;使得路径上的数字总和为最小。 问题描述 给定一个包含非负…

JavaSE核心知识点02面向对象编程02-06(泛型)

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 JavaSE核心知识点02面向对象编程02-06&#…

LVGL对象的盒子模型和样式

文章目录 &#x1f9f1; LVGL 对象盒子模型结构&#x1f50d; 组成部分说明&#x1f3ae; 示例代码&#x1f4cc; 总结一句话 &#x1f9f1; 一、样式的本质&#xff1a;lv_style_t 对象&#x1f3a8; 二、样式应用的方式&#x1f9e9; 三、样式属性分类&#xff08;核心&#…

Github上如何准确地搜索开源项目

Github上如何准确地搜索开源项目&#xff1a; 因为寻找项目练手是最快速掌握技术的途径&#xff0c;而Github上有最全最好的开源项目。 就像我的毕业设计“机器翻译”就可以在Github上查找开源项目来参考。 以下搜索针对&#xff1a;项目名的关键词&#xff0c;关注数限制&a…

正点原子IMX6U开发板移植Qt时出现乱码

移植Qt时出现乱码 1、前言2、问题3、总结 1、前言 记录一下正点原子IMX6U开发板移植Qt时出现乱码的解决方法&#xff0c;方便自己日后回顾&#xff0c;也可以给有需要的人提供帮助。 2、问题 用正点原子IMX6U开发板移植Qt时移植Qt后&#xff0c;sd卡里已经存储了Qt的各种库&…

python-django项目启动寻找静态页面html顺序

目录结构 settings模块 urls模块 views模块 1.settings文件下没有DIR目录,按照各app注册顺序寻找静态页面 启动效果&#xff0c;直接返回注册的app即app01下的templates文件夹下的html页面 2.settings文件添加上DIR目录 启动效果&#xff0c;会优先去找项目下的templates文件…

MySQL索引详解(上)(结构/分类/语法篇)

一、索引概述 索引本质是帮助MySQL高效获取数据的排序数据结构&#xff08;类似书籍目录&#xff09;&#xff0c;通过减少磁盘I/O次数提升查询效率。其核心价值体现在大数据量场景下的快速定位能力&#xff0c;但同时带来存储和维护成本。 核心特点&#xff1a; 优点&#…

数据集-目标检测系列- 烟雾 检测数据集 smoke >> DataBall

数据集-目标检测系列- 消防 浓烟 检测数据集 smoke>> DataBall 数据集-目标检测系列- 烟雾 检测数据集 smoke &#xff1e;&#xff1e; DataBall * 相关项目 1&#xff09;数据集可视化项目&#xff1a;gitcode: https://gitcode.com/DataBall/DataBall-detections-10…

docker + K3S + Jenkins + Harbor自动化部署

最近公司在研究自动化部署的一套流程&#xff0c;下面记录一下配置流程 需要提前准备好Jenkins Harbor Git(其他管理工具也可以) 我这里的打包编译流程是Jenkins上配置打包任务-->自动到git目录下找打包文件---->项目编译后打镜像包------>打完镜像包将镜像上传到…