目录

 一、前期准备

1、查看网卡:

2、配置静态IP

3、设置主机名

4、配置IP与主机名映射

5、关闭防火墙

6、配置免密登录

二、JDK的安装

三、Zookeeper的安装

四、Kafka的安装

1、Kafka的下载安装 

2、修改配置文件

4、分发文件

5、修改其他节点broker.id及advertised.listeners

6、修改zookeeper中的 myid

7、启动ZK集群--启动停止脚本

8、启动Kafka集群--启动停止脚本

四、Kafka命令行操作

1、主题命令行操作

1.1.查看操作主题命令参数

1.2.查看当前服务器中的所有topic

1.3.创建topic

1.4.查看first主题的详情

1.5.修改分区数(注意:分区数只能增加,不能减少)

1.6.再次查看first主题的详情

1.7.删除topic(学生自己演示)

2、生产者命令行操作

2.1.查看操作生产者命令参数

2.2.发送消息

3、消费者命令行操作

3.1.查看操作消费者命令参数

3.2.消费消息


       Kafka 是基于 Java 的,必须先安装 JDK。  此次我们以 kafka_2.12-3.3.1.tgz  即 Kafka3.3.1 为案例,安装jdk1.8。

⚠️ 注意:Kafka 3.9.0 要求本地必须安装 JDK 17 或以上版本。JDK 8 和 11 已不再被官方支持。
     

 一、前期准备

1、查看网卡:

2、配置静态IP

vi /etc/sysconfig/network-scripts/ifcfg-ens32  ----  根据自己网卡设置。

3、设置主机名

hostnamectl --static set-hostname  主机名

例如:

hostnamectl --static set-hostname  hadoop001

4、配置IP与主机名映射

vi /etc/hosts

5、关闭防火墙

systemctl stop firewalld

systemctl disable firewalld

6、配置免密登录

传送门

二、JDK的安装

传送门

三、Zookeeper的安装

传送门

四、Kafka的安装

1、Kafka的下载安装 

1.1. 下载

 https://archive.apache.org/dist/kafka/3.3.1/
​下载  kafka_2.12-3.3.1.tgz 安装包

1.2 上传
使用xshell上传到指定安装路径

此处是安装路径是 /opt/module

​​

1.3 解压重命名

tar -zxvf kafka_2.12-3.3.1.tgz

mv kafka_2.12-3.3.1 kafka

​​

1.4 配置环境变量

vi  /etc/profile

export JAVA_HOME=/opt/module/java

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin

1.5 加载环境变量

source  /etc/profile

验证环境变量是否生效:

env | grep HOME

env | grep PATH

2、修改配置文件

vi /opt/module/kafka/config/server.properties

#broker的全局唯一编号,不能重复,只能是数字。

broker.id=0

#broker对外暴露的IP和端口 (每个节点单独配置)

advertised.listeners=PLAINTEXT://hadoop001:9092

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘IO的线程数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔

log.dirs=/opt/module/kafka/datas

#topic在当前broker上的分区个数

num.partitions=1

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

# 每个topic创建时的副本数,默认时1个副本

offsets.topic.replication.factor=1

#segment文件保留的最长时间,超时将被删除

log.retention.hours=168

#每个segment文件的大小,默认最大1G

log.segment.bytes=1073741824

# 检查过期数据的时间,默认5分钟检查一次是否数据过期

log.retention.check.interval.ms=300000

#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)

zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka

4、分发文件

scp -r  /etc/profile  root@hadoop002:/etc/profile

scp -r  /etc/profile  root@hadoop003:/etc/profile

scp -r  /opt/module/java root@hadoop002:/opt/module/java 

scp -r  /opt/module/java root@hadoop003:/opt/module/java 

scp -r  /opt/module/zookeeper  root@hadoop002:/opt/module/zookeeper

scp -r  /opt/module/zookeeper  root@hadoop003:/opt/module/zookeeper

scp -r  /opt/module/kafka root@hadoop002:/opt/module/kafka

scp -r  /opt/module/kafka root@hadoop003:/opt/module/kafka

让三台机器文件生效

ssh hadoop001 "source /etc/profile"
ssh hadoop002 "source /etc/profile"
ssh hadoop003 "source /etc/profile"

5、修改其他节点broker.idadvertised.listeners

修改 hadoop002 的

vi /opt/module/kafka/config/server.properties

broker.id=1

#broker对外暴露的IP和端口 (每个节点单独配置)

advertised.listeners=PLAINTEXT://hadoop002:9092

修改 hadoop003 的

vi /opt/module/kafka/config/server.properties

broker.id=2

#broker对外暴露的IP和端口 (每个节点单独配置)

advertised.listeners=PLAINTEXT://hadoop003:9092

或者使用下面脚本

ssh hadoop002 "sed -i 's/^broker.id=0$/broker.id=1/' /opt/module/kafka/config/server.properties"
ssh hadoop003 "sed -i 's/^broker.id=0$/broker.id=2/' /opt/module/kafka/config/server.properties"

ssh hadoop002 "sed -i 's/^advertised.listeners=PLAINTEXT:\/\/hadoop001:9092$/advertised.listeners=PLAINTEXT:\/\/hadoop002:9092/' /opt/module/kafka/config/server.properties"
ssh hadoop003 "sed -i 's/^advertised.listeners=PLAINTEXT:\/\/hadoop001:9092$/advertised.listeners=PLAINTEXT:\/\/hadoop003:9092/' /opt/module/kafka/config/server.properties"

查看效果

ssh hadoop001 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"

ssh hadoop002 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"

ssh hadoop003 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"

6、修改zookeeper中的 myid

修改 hadoop002 的 /opt/module/zookeeper/zkData/myid 内容为 2

vi /opt/module/zookeeper/zkData/myid

2

修改 hadoop003 的 /opt/module/zookeeper/zkData/myid 内容为 3

vi /opt/module/zookeeper/zkData/myid

3

或者使用下面脚本

ssh hadoop002 "echo '2' > /opt/module/zookeeper/zkData/myid"
ssh hadoop003 "echo '3' > /opt/module/zookeeper/zkData/myid"

查看效果

ssh hadoop001 "cat /opt/module/zookeeper/zkData/myid"

ssh hadoop002 "cat /opt/module/zookeeper/zkData/myid"

ssh hadoop003 "cat /opt/module/zookeeper/zkData/myid"

7、启动ZK集群--启动停止脚本

touch /usr/bin/zkall.sh

chmod 777 /usr/bin/zkall.sh

vi /usr/bin/zkall.sh

#!/bin/bashcase $1 in
"start"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 启动 ------------ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh start"done
};;
"stop"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 停止 ------------    ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh stop"done
};;
"status"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 状态 ------------    ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh status"done
};;
esac

启动:/usr/bin/zkall.sh start

停止:/usr/bin/zkall.sh stop

状态:/usr/bin/zkall.sh status

8、启动Kafka集群--启动停止脚本

touch /usr/bin/kfall.sh

chmod 777 /usr/bin/kfall.sh

vi /usr/bin/kfall.sh

#! /bin/bashcase $1 in
"start"){for i in hadoop001 hadoop002 hadoop003doecho " --------启动 $i Kafka-------"ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop001 hadoop002 hadoop003doecho " --------停止 $i Kafka-------"ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac

启动:/usr/bin/kfall.sh start

停止:/usr/bin/kfall.sh stop

四、Kafka命令行操作

1、主题命令行操作

1.1.查看操作主题命令参数

        bin/kafka-topics.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

--create

创建主题。

--delete

删除主题。

--alter

修改主题。

--list

查看所有主题。

--describe

查看主题详细描述。

--partitions <Integer: # of partitions>

设置分区数。

--replication-factor<Integer: replication factor>

设置分区副本。

--config <String: name=value>

更新系统默认的配置。

1.2.查看当前服务器中的所有topic

在任意一台安装zk的机器上都可以启动客户端

cd /opt/module/kafka/

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --list

1.3.创建topic

cd /opt/module/kafka/

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

1.4.查看first主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --describe --topic first

1.5.修改分区数(注意:分区数只能增加,不能减少)

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --alter --topic first --partitions 3

1.6.再次查看first主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --describe --topic first

1.7.删除topic(学生自己演示)

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --delete --topic first

2、生产者命令行操作

2.1.查看操作生产者命令参数

bin/kafka-console-producer.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

2.2.发送消息

bin/kafka-console-producer.sh --bootstrap-server hadoop001:9092 --topic first

>hello world

>hello beijing

>hello china

>welecome to china

3、消费者命令行操作

3.1.查看操作消费者命令参数

bin/kafka-console-consumer.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

--from-beginning

从头开始消费。

--group <String: consumer group id>

指定消费者组名称。

3.2.消费消息

(1)消费first主题中的数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic first

2把主题中所有的数据都读取出来(包括历史数据)

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --from-beginning --topic first

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/87189.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/87189.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/87189.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opencv入门(6) TrackBar调整图片和键盘响应

文章目录 1 创建trackbar2 使用userdata传入函数3 键盘响应 1 创建trackbar 1.trackbar名称 2.创建在哪个窗口上 3.拖动trackbar改变的值 4.trackBar的最大值 5.trackbar改变时的回调函数 6. 带入回调函数的数据&#xff0c;可以不用带,是一个void指针 createTrackbar(“Value …

QT<33> 修改窗口标题栏背景颜色

前言&#xff1a; 在做项目或者开发QT软件时&#xff0c;如果想要修改窗口标题栏背景颜色&#xff0c;发现没有代码可以直接设置&#xff0c;目前有两种方法可以设置。 第一种&#xff0c;自定义一个界面类&#xff0c;用QLabelQWidget实现&#xff0c;QLabel当作标题栏。 第…

JavaEE-博客系统项目

项目介绍 准备工作 创建数据表 创建项目 添加依赖 创建对应目录 除了基本的数据层mapper&#xff0c;业务层service&#xff0c;交互层controller&#xff0c;还创建了公共类的层common&#xff0c;枚举类层enums&#xff0c;异常类层&#xff0c;和实体类层pojo。 配置项目配…

Java项目:基于SSM框架实现的软件工程项目管理系统【ssm+B/S架构+源码+数据库+毕业论文+开题报告】

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本项目管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&am…

[按键手机安卓/IOS脚本插件开发] 按键插件调试与判断循环结构辅助工具

实现按键插件的核心原理 通过一个table类型的QMPlugin变量实现按键精灵调用Lua函数&#xff0c;例如 -- Lua代码 -- 实现两数相加求和 function QMPlugin.Add(a, b) return a b end 将以上代码保存成.lua文件&#xff0c;例如test.lua后&#xff0c;放入按键精灵手机助手的p…

提示词框架(9)--CARE

提示词框架不止是AI的框架&#xff0c;也可以是我们的思考框架&#xff0c;拆解问题的方法&#xff01;&#xff01;&#xff01; CARE框架是一种用于优化提示词设计的方法&#xff0c;它帮助用户更有效地与AI进行交互&#xff0c;特别是在需要获取特定信息或实现某些任务时。…

uniapp+vue2 input不显示明文密码,点击小眼睛显示或隐藏密码

<u-input placeholder"请输入密码" prefixIcon"lock" :password"showPassword" v-model"formData.password"prefixIconStyle"font-size: 25px;color: #3C9CFF" border"none"><template slot"suffix…

时间序列的类增量学习:基准与评估

论文地址&#xff1a;https://dl.acm.org/doi/abs/10.1145/3637528.3671581 论文源码&#xff1a;https://github.com/zqiao11/TSCIL 会议&#xff1a;SIGKDD 2024&#xff08;CCF-A&#xff09; 现实环境本质上是非平稳的&#xff0c;随着时间推移常会引入新类别。这在时间序…

智能攻击原理和架构

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 智能攻击系统基于**人工智能&#xff08;AI&#xff09;与大语言模型&#xff08;LLM&#xff09;技术**&#xff0c;通过**环境感知→自主决策→动态执行→对抗进化**的闭环架构实现高效网络入侵。以下…

Beamer-LaTeX学习(教程批注版)【4】

众所周知&#xff0c;随着显示器普及宽屏化&#xff0c;PPT排版需要多列化来充分利用页面的横向空间。 这节课我们来学习如何创建列&#xff0c;即在beamer中增加列内容&#xff0c;以及如何对齐列。 1 使用不同的宽度来创建列 在beamer中我们通过在页面上使用columns环境来创…

通俗理解JVM细节-面试篇

文章目录 前言JVM概述JVM是什么&#xff1f;解决了什么问题&#xff1f;JVM运行流程JVM 与 JRE&#xff0c;JDK的关系 JVM内存结构JVM区域划分程序计数器栈堆方法区 类加载机制五个阶段加载验证准备解析初始化总结双亲委派模型 垃圾回收内存管理什么是GC&#xff1f;如何判定谁…

意识边疆保卫战:22:47深圳AI-BioFab人机融合危机全息实录

前言 前些天发现了一个巨牛的人工智能免费学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站 &#x1f9e0;《意识边疆保卫战&#xff1a;22&#xff1a;47深圳AI-BioFab人机融合危机全息实录》 副标题&#xff1a;机械义…

langchain从入门到精通(三十四)——RAG优化策略(十)父文档检索器实现拆分和存储平衡

1. 拆分文档与检索的冲突 在 RAG 应用开发中&#xff0c;文档拆分 和 文档检索 通常存在相互冲突的愿望&#xff0c;例如&#xff1a; 我们可能希望拥有小型文档&#xff0c;以便它们的嵌入可以最准确地反映它们的含义&#xff0c;如果太长&#xff0c;嵌入/向量没法记录太多…

Javaweb - 7 xml

XML 是EXtensible Markup Language 的缩写&#xff0c;翻译过来就是可扩展标记语言。即&#xff0c;XML 和 HTML 一样&#xff0c;都是标记语言&#xff0c;也就是说&#xff0c;它们的基本语法都是标签。 特点&#xff1a; 1. 可扩展&#xff1a;即 XML 是允许自定义格式的。但…

qml实现 裁剪进度条

我们需要实现一个垂直进度条效果&#xff0c;但使用图片裁剪的方式。具体需求&#xff1a; 1. 图片位置固定&#xff0c;容器对齐其左边和下边&#xff0c;宽度和图片相同。 2. 容器背景透明&#xff0c;调整裁剪容器的高度&#xff0c;使其有高度进度条的感觉&#xff08;从下…

Dify 工作流全栈解析:从零构建你的 AI 应用流程引擎

AI工作流的导言&#xff1a; 工作流系统&#xff08;Workflow System&#xff09;是 Dify 的核心组成部分&#xff0c;它通过可视化编程界面支持创建复杂的 AI 应用程序。用户可以将不同的功能块连接起来&#xff0c;从而设计出用于处理数据、与 AI 模型交互、管理条件以及执行…

上下位机通讯规则

0&#xff1a;事由 最近开发&#xff0c;上位机Qt与下位机通讯的时候发现通讯规则有些不一样&#xff0c;这里简单记录一下 。所有代码基于元宝生成&#xff0c;属于伪代码不保证真实可用&#xff0c;啊但是逻辑是这么个逻辑。 1&#xff1a;底层通讯规则 以STM32向上位机通讯…

创建平衡二叉树C++

给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵 平衡 二叉搜索树。 平衡二叉树&#xff1a;每个节点的左右子树高度差不超过1 class Solution { public:TreeNode* dfs(vector<int>& nums, int left, int right){if(l…

海光の初体验

背景 八张K100的风扇已经将近一年没转过了…早在今年4月29日&#xff0c;Qwen3正式发布并全部开源8款「混合推理模型」。作为Qwen系列中的最新一代大型语言模型&#xff0c;Qwen3在推理、指令遵循、工具调用、多语言能力等方面进行了全面增强。海光DCU&#xff08;Deep Comput…

场外交易(OTC)财富管理系统开发及解决方案报告

——跨境金融科技赋能机构客户新增长 一、OTC市场现状与机构业务痛点 1. 政策机遇与市场扩容 “北向互换通”期限延长&#xff1a;2025年7月1日&#xff0c;中国外汇交易中心联合香港交易所将利率互换合约期限延长至30年&#xff0c;首日交易规模达15.3亿元&#xff0c;填补超…