1、Kafka概述

Apache Kafka是由LinkedIn公司于2010年开发的一款分布式消息系统,旨在解决当时传统消息队列(如ActiveMQ、RabbitMQ)在高吞吐量实时性场景下的性能瓶颈。随着LinkedIn内部对实时日志处理、用户行为追踪等需求的激增,Kafka逐渐演化为一个支持水平扩展持久化存储的流数据平台。2011年,Kafka成为Apache基金会顶级开源项目,并在全球范围内被广泛应用于大数据、实时计算和微服务架构领域。

Kafka的设计哲学源于发布-订阅模型,但其创新性地引入了分布式存储分区化处理机制,使得系统能够高效处理每秒百万级的消息吞吐。这一特性使其迅速成为现代数据管道(Data Pipeline)和流式处理(Stream Processing)的核心组件。

Kafka是一个开源的高吞吐量的分布式消息中间件,对比于其他

1、缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。

2、解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。

3、冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。

4、健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。

5、异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

2、核心组件

组件

说明

Producer

消息生产者,向Kafka发送消息

Consumer

消息消费者,从Kafka读取消息

Broker

Kafka服务器节点,组成Kafka集群

Topic

消息类别/主题,生产者发送到特定Topic,消费者订阅特定Topic

Partition

Topic的分区,实现并行处理和水平扩展

Offset

消息在分区中的唯一标识(位移)

ZooKeeper

管理Kafka集群元数据(新版本已逐步移除ZooKeeper依赖)

3、Kafka的特点与优势

1. 高吞吐量与低延迟

Kafka通过批处理顺序磁盘I/O零拷贝技术(Zero-Copy)优化数据传输效率。生产者(Producer)将消息批量发送至Broker,消费者(Consumer)按顺序拉取数据,避免了传统消息系统的频繁网络交互。实测中,单台Broker可轻松支持每秒数十万条消息的读写。

2. 水平扩展与容错性

Kafka集群由多个Broker(服务器节点)组成,支持动态扩容。每个主题(Topic)被划分为多个分区(Partition),分区可分布在不同Broker上,通过多副本(Replica)机制实现数据冗余。若某Broker宕机,其他副本会自动接管服务,确保系统的高可用性。

3. 持久化存储与回溯消费

消息在Kafka中默认保留7天(可配置为永久存储),消费者可随时重置偏移量(Offset)以重新消费历史数据。这一特性在数据重放、故障恢复等场景中至关重要。

4. 生态兼容性

Kafka与主流大数据工具(如Spark、Flink、Hadoop)深度集成,并提供了Connect API和Streams API,支持构建端到端的流处理管道。

4、Kafka 使用场景

  1. 实时流处理:用户行为追踪、实时推荐
  2. 日志收集:集中式日志系统
  3. 事件源:微服务间的事件驱动架构
  4. 消息队列:系统解耦、削峰填谷
  5. Metrics收集:监控数据聚合

5、什么是Zookeeper?

Zookeeper是一个高性能、高可靠的分布式协调服务,最初由雅虎开发,是Google Chubby的开源实现。它被广泛应用于分布式系统中,用于解决分布式应用中的协调问题,如配置管理、服务注册与发现、分布式锁等。Zookeeper的设计目标是封装复杂且容易出错的关键服务,为分布式应用提供简单易用的接口。

6、Zookeeper的应用场景

Zookeeper在分布式系统中扮演着重要的角色,其典型应用场景包括:

  • 配置管理:Zookeeper可以作为分布式系统的配置中心,集中管理配置信息,确保所有节点能够获取到最新的配置。
  • 服务注册与发现:分布式系统中的服务可以通过Zookeeper进行注册,客户端可以通过查询Zookeeper来发现所需服务。
  • 分布式锁:Zookeeper提供了一种实现分布式锁的机制,确保多个节点对共享资源的访问是互斥的。
  • 集群管理:Zookeeper能够监控集群中节点的状态,及时发现并处理节点故障。
  • 消息队列:Zookeeper可以用于实现分布式消息队列中的协调功能。

7、Zookeeper核心特性

Zookeeper具有以下关键特性:

  • 顺序一致性:客户端的更新操作按照其发送的顺序被应用到Zookeeper上,确保了操作的顺序性。
  • 原子性:所有对Zookeeper的操作都是原子的,要么全部成功,要么全部失败。
  • 单一系统映像:无论Zookeeper集群中有多少节点,客户端看到的都是一个单一的、一致的视图。
  • 可靠性:Zookeeper通过副本机制和选举算法确保系统的高可用性。
  • 实时性:Zookeeper能够实时监控节点的状态变化,并及时通知客户端。

8、ZooKeeper 的作用

ZooKeeper 是一个分布式协调服务,为 Kafka 提供以下关键功能:

功能

具体说明

集群管理

记录 Kafka Broker 的节点状态(存活/下线),维护 Broker 列表。

Controller 选举

Kafka 集群需要一个主控制器(Controller)处理分区和副本管理,ZooKeeper 负责选举。

Topic 元数据存储

存储 Topic 的分区信息、副本分配、Leader 选举结果等元数据。

消费者组管理

记录消费者组的 offset(Kafka 2.8+ 已支持脱离 ZooKeeper,默认仍依赖)。

分布式锁

确保多个 Broker 或客户端操作时的数据一致性(如分区迁移)。

9、Kafka的安装

1、安装JAVA环境

yum -y install java-1.8.0-openjdk

2、安装ZooKeeper

mkdir -p /opt/kafka
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz
cd apache-zookeeper-3.8.1-bin/conf
cp zoo_sample.cfg zoo.cfg
../bin/zkServer.sh  start

3、安装Kafka

# 解压
tar xvf kafka_2.13-2.8.2.tgz
cd kafka_2.13-2.8.2
ls
# 启动
bin/kafka-server-start.sh config/server.properties#后台启动
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &查看日志
tail -f kafka.log

单机启动,并且ZooKeeper也刚好在本机,所以我们默认不需要修改任何配置就可以直接启动。如果不在一起则修改配置文件:config/server.properties文件。

4、IDEA连接Kafka

cd /opt/kafka/kafka_2.13-2.8.2/configvim server.properties# 修改内容
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.142.131:9092

安装Kafka插件进行连接

10、SpringBoot整合Kafka

1、导入jar包

<!--kafka-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.1</version>
</dependency>

2、编写配置

  # Kafka 相关配置kafka:# Kafka 服务器地址bootstrap-servers: 192.168.142.131:9092# 生产者配置producer:# 序列化器,用于将键转换为字节key-serializer: org.apache.kafka.common.serialization.StringSerializer# 序列化器,用于将值转换为字节value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 消费者组ID,用于区分不同的消费者组group-id: my-application-group# 自动偏移量重置策略,当没有初始偏移量时从最早的偏移量开始消费auto-offset-reset: earliest# 禁用自动提交功能enable-auto-commit: false# 反序列化器,用于将字节转换为键key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 反序列化器,用于将字节转换为值value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 配置监听器的确认模式为手动立即确认listener:ack-mode: manual_immediate# 主题配置topic:default: default-topicmanual: manual-commit-topic

3、Kafka消息生产者

package com.lw.mqdemo.mq.kafka;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;/*** Kafka消息生产者服务类*/
@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到指定主题* @param topic 主题名称* @param message 消息内容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}/*** 发送消息到指定主题和分区* @param topic 主题名称* @param partition 分区号* @param key 消息键* @param message 消息内容*/public void sendMessage(String topic, Integer partition, String key, String message) {kafkaTemplate.send(topic, partition, key, message);}
}

4、Kafka消息消费者

package com.lw.mqdemo.mq.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;/*** Kafka 消费者服务*/
@Service
public class KafkaConsumerService {/*** 监听指定主题的消息(自动提交)* @param message 消息内容*/@KafkaListener(topics = "${spring.kafka.topic.default}", groupId = "${spring.kafka.consumer.group-id}")public void consumeAutoCommit(String message) {System.out.println("接收到自动提交消息: " + message);// 业务处理逻辑}/*** 监听指定主题的消息(手动提交)* 偏移量(Offset) 是 Kafka 为分区(Partition)中的每条消息分配的唯一序号(从 0 开始递增),用于标识消息在分区中的位置。* @param record 消息记录(包含元数据)* @param ack 确认对象*/@KafkaListener(topics = "${spring.kafka.topic.manual}", groupId = "${spring.kafka.consumer.group-id}")public void consumeManualCommit(ConsumerRecord<String, String> record, Acknowledgment ack) {try {System.out.println("接收到手动提交消息: key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());// 业务处理逻辑// 手动提交偏移量ack.acknowledge();} catch (Exception e) {// 处理异常,可以选择不提交偏移量以便重试System.err.println("消息处理失败: " + e.getMessage());}}
}

5、测试类

package com.lw.mqdemo.controller;import com.lw.mqdemo.mq.kafka.KafkaProducerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** Kafka 控制器*/
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {private final KafkaProducerService producerService;public KafkaController(KafkaProducerService producerService) {this.producerService = producerService;}/*** 发送消息到指定的Kafka主题*/@PostMapping("/send")public String sendMessage(@RequestParam("topic") String topic,@RequestParam("message") String message) {producerService.sendMessage(topic, message);return "消息已发送到主题: " + topic;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/85366.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/85366.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/85366.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能指针 c++

C 智能指针详解 智能指针是 C11 引入的内存管理工具&#xff0c;位于 <memory> 头文件中&#xff0c;用于自动管理动态分配的内存&#xff0c;防止内存泄漏。主要类型如下&#xff1a; 1. std::unique_ptr (独占所有权) 特点&#xff1a;唯一拥有所指对象&#xff0c;不…

Python应用八股文

大家好!在 Python 学习的道路上&#xff0c;掌握一些基础知识要点至关重要&#xff0c;这些要点常被称为“Python 八股”。以下是对它们的简易总结&#xff0c;帮助你快速回顾和巩固 Python 的核心概念。 一、数据结构 列表&#xff08;List&#xff09;&#xff1a;有序可变序…

【技术深度】领码SPARK破解微服务数据依赖困局:架构设计与实践指南

——深度解析分布式数据冗余与异步消息机制&#xff0c;驱动企业数字化转型加速 ✨ 核心摘要 本文从技术架构与工程实现的角度&#xff0c;系统讲解领码SPARK融合平台如何精准解决微服务架构下数据依赖“卡脖子”问题。通过设计高效的数据冗余模型和完善的异步消息更新机制&am…

关于前端的防抖和节流

给我解释下 前端开发中的防抖和节流 并举个具体的例子 防抖&#xff08;Debounce&#xff09;与节流&#xff08;Throttle&#xff09;详解 在前端开发中&#xff0c;防抖&#xff08;Debounce&#xff09; 和 节流&#xff08;Throttle&#xff09; 是两种优化高频触发事件的…

React-router 多类型历史记录栈

react-router 为了满足开发者更多路由历史存储场景&#xff0c;提供了以下几种模式&#xff1a; 浏览器原生历史记录 浏览器 hash 内存型 服务端记录 以上实现分别对应于一下 API 实现&#xff1a; createBrowserRouter&#xff1a;浏览器提供的历史管理。 createHashRou…

java设计模式[3]之结构型模式

文章目录 一 代理模式1.1 静态代理1.1.1 静态代理的结构1.1.2 静态代理的特点1.1.3 静态代理的应用场景1.1.4 静态代理的案例代码 1.2 JDK动态代理1.2.1 JDK动态代理概述1.2.2 JDK动态代理案例代码1.2.3 JDK动态代理的应用场景1.2.4 JDK动态代理的特点1.2.5 与创建型模式的区别…

鸿蒙Harmony测试-wukong稳定性工具(类似Android的Monkey测试)

一、功能介绍 wukong是系统自带的一种命令行工具&#xff0c;支持Ability的随机事件注入、控件注入、异常捕获、报告生成和对Ability数据遍历截图等特性。通过模拟用户行为&#xff0c;对系统或应用进行稳定性压力测试。wukong分为随机测试、专项测试和专注测试。 随机测试是指…

从零学起VIM

前言 笔者早年刚入行的时候就接触过Vim,当时还是真正的菜鸟&#xff0c;带我的师父是一个华为骨干员工&#xff0c;犹记得他给我指导如何保存并关闭文本&#xff1a;按Esc&#xff0c;然后输入:wq。还记得自己打开Vim编辑器&#xff0c;一个字符都敲不进去&#xff0c;然后问旁…

不依赖rerank 模型排序通过使用 PostgreSQL 中的 pgvector 与 tsearch2 函数进行混合搜索提高召回率

前言 在向量搜索中&#xff0c;召回率是一个关键指标&#xff0c;它衡量搜索结果的相关性。然而&#xff0c;提高召回率往往会牺牲其他指标&#xff0c;如索引大小或查询延迟。为了平衡这些权衡&#xff0c;混合搜索技术应运而生。本文将介绍如何在 PostgreSQL 中结合 pgvecto…

Uniapp 跨平台开发框架全面解析:一次开发,多端运行

在移动互联网时代&#xff0c;开发者面临着一个重要挑战&#xff1a;如何高效地开发出能在多个平台&#xff08;iOS、Android、Web、小程序等&#xff09;上运行的应用&#xff1f;传统的原生开发方式需要为每个平台单独编写代码&#xff0c;导致开发周期长、维护成本高。而 Un…

ios如何把H5网页变成主屏幕webapp应用

一、将 H5 页面添加到主屏幕的步骤 打开 Safari 浏览器 在 iPhone 上打开 Safari 浏览器&#xff0c;访问目标网页&#xff08;H5 页面&#xff09;。 点击分享按钮 在 Safari 浏览器底部点击 “分享” 图标&#xff08;箭头向上的按钮&#xff09;。 添加到主屏幕 在分享菜单…

Node.js 项目启动命令大全 (形象版)

文章目录 Node.js 项目启动命令大全 &#x1f31f;✨&#xff08;形象版&#xff09;一、&#x1f50d; 如何查看项目启动命令&#xff08;魔法书目录&#xff09;package.json scripts 参数详解开发相关脚本测试相关脚本构建相关脚本代码质量相关脚本最佳实践 二、&#x1f68…

爱普特APT32F1104C8T6单片机 高抗干扰+硬件加密双保障

爱普特APT32F1104C8T6单片机深度解析 1. 产品定位 APT32F1104C8T6 是爱普特半导体&#xff08;APT&#xff09;推出的 32位高性能经济型单片机&#xff0c;基于 ARM Cortex-M0内核&#xff0c;采用 LQFP48封装&#xff0c;主打 高性价比、低功耗、强抗干扰&#xff0c;是替代进…

使用uni-app ios 打包流程

配置几个步骤即可 1、打包ios需要BundleID ID 2、证书私钥密码 3、信任文件证书文件 4、私钥证书 5、打包 6、获取打包后的ipa文件 7、通过爱思助手安装到iso手机上 8、完成 1、下载&#xff1a;App Uploader去获取我们想要的证书私钥等文件 2、下载完成解压后的文件如下打…

仿muduo库实现并发服务器

1.实现目标 仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器&#xff1a; 通过实现高并发服务器的组件&#xff0c;可以快速实现一个高并发服务器的搭建&#xff0c;并且&#xff0c;通过组内不同应用层协议的支持&#xff0c;可以快速完成高性能服务器的搭建…

迭代器模式:集合遍历的统一之道

引言&#xff1a;集合遍历的演进之路 在软件开发中&#xff0c;集合遍历是我们每天都要面对的基础操作。从最初的数组索引遍历到现代的流式处理&#xff0c;我们经历了&#xff1a; #mermaid-svg-KwTr9k8JgbwRTDhU {font-family:"trebuchet ms",verdana,arial,sans-…

Spring Security OAuth2 组件

我们来系统地讲解一下 Spring Security OAuth2 这个强大的组件。我会从概念、作用、核心组件&#xff0c;以及实际应用场景来为你剖析。 1. 什么是 Spring Security OAuth2&#xff1f; 简单来说&#xff0c;Spring Security OAuth2 是 Spring Security 框架的一个模块&#…

Redis的持久化功能

Redis的持久化功能能够将内存中的数据保存到磁盘&#xff0c;从而在重启后恢复数据。下面为你详细介绍Redis的两种主要持久化方式及其配置方法。 RDB&#xff08;Redis Database&#xff09;持久化 RDB持久化是通过生成某个时间点的数据集快照来实现的。它具有高性能的特点&a…

Chrome 将成为下一个 IE6

最近在技术圈刷到一个帖子&#xff0c;说&#xff1a;“Chrome 就快变成新的 IE6 了。” 乍一看有点危言耸听&#xff0c;但你一细品&#xff0c;发现还真挺像回事。 想当年&#xff1a;IE6 是怎么垮的&#xff1f; IE6 当年多风光&#xff1f;全球市场份额一度超过 90%&#…

Redis 配置文件详解redis.conf 从入门到实战

一、redis.conf 是什么&#xff1f; Redis 的配置文件&#xff08;默认命名为 redis.conf&#xff0c;Redis 8.0 之后改为 redis-full.conf&#xff09;控制着服务运行的各项参数。该文件采用以下结构&#xff1a; 指令名 参数1 参数2 ... 参数N例如&#xff1a; replicaof …