目录
一、kafka简介
1.1、概述
1.2、消息系统介绍
1.3、点对点消息传递模式
1.4、发布-订阅消息传递模式
二、kafka术语解释
2.1、结构概述
2.2、broker
2.3、topic
2.4、producer
2.5、consumer
2.6、consumer group
2.7、leader
2.8、follower
2.9、partition
2.10、offset
2.11、replica
2.12、message
2.13、zookeeper
三、kafka架构
四、kafka的部署
4.1、软件下载
4.1.1、jdk的安装
4.1.2、zookeeper安装
4.1.3、kafka的安装
4.2、单机模式
4.3、集群部署
4.3.1、针对每一个节点的hosts文件添加节点的ip映射信息
4.3.2、时间同步
4.3.3、zookeeper配置
4.3.4、创建对应的服务id
4.3.5、zoo.cfg参数解析
4.3.6、集群kafka配置
一、kafka简介
1.1、概述
1.2、消息系统介绍
1.3、点对点消息传递模式

1.4、发布-订阅消息传递模式

二、kafka术语解释
2.1、结构概述
2.2、broker
2.3、topic
2.4、producer
2.5、consumer
2.6、consumer group
2.7、leader
2.8、follower
2.9、partition
2.10、offset

2.11、replica
2.12、message
2.13、zookeeper
三、kafka架构


四、kafka的部署
4.1、软件下载
4.1.1、jdk的安装
自带JDK,这种JDK可以使用java -version检查,如果使用javac就不行了,所以进行安装sudo yum install java-1.8.0-openjdk-devel -y
4.1.2、zookeeper安装
Apache ZooKeeper
选择3.5.7版本
上传服务器,安装
解压
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin zookeeper3.5.7
mv zookeeper3.5.7/ /opt创建软链接
ln -s /opt/zookeeper3.5.7/ /opt/zookeeper配置环境变量
vim /etc/profile添加
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/binsource /etc/profile将Zookeeper提供的配置文件复制一份,复制成Zookeeper默认寻找的文件
cd /opt/zookeeper/conf
ls
cp zoo_sample.cfg zoo.cfg
cd ..创建数据存放目录
mkdir data
chmod 755 /opt/zookeeper/data修改数据存放位置
cd conf/
vim zoo.cfg##修改以下配置
dataDir=/opt/zookeeper/data启动 Zookeeper,Zookeeper的bin目录下
cd ..
./bin/zkServer.sh start zoo.cfg
检测zookeeper是否正常
jps # 看到控制台成功输出 QuorumPeerMain,表示启动成功./bin/zkServer.sh status zoo.cfg ## Mode: standalone表示ok
4.1.3、kafka的安装
https://kafka.apache.org/downloads
选择 kafka_2.12-3.8.0.tgz
进行下载,Scala 2.12 和 Scala 2.13 主要是使用Scala编译的版本不同,两者皆可
上传服务器,安装
解压
tar -zxvf kafka_2.12-2.7.0.tgz
mv kafka_2.12-2.7.0 /opt
cd /opt创建软链接
ln -s /opt/kafka_2.12-2.7.0/ /opt/kafka
ls配置环境变量
vim /etc/profile添加
export KAFKA_HOME=/opt/kafka
export PATH=:$PATH:${KAFKA_HOME}source /etc/profile
4.2、单机模式
在Kafka的config目录下存在相关的配置信息——本次我们只想让Kafka快速启动起来只关注server.properties文件即可cd ${KAFKA_HOME}/config
ls
#connect-console-sink.properties connect-file-source.properties consumer.properties server.properties
#connect-console-source.properties connect-log4j.properties kraft tools-log4j.properties
#connect-distributed.properties connect-mirror-maker.properties log4j.properties trogdor.conf
#connect-file-sink.properties connect-standalone.properties producer.properties zookeeper.properties打开配置文件,并主要注意以下几个配置
vim server.propertiesbroker.id=0 #kafka服务节点的唯一标识,这里是单机不用修改
# listeners = PLAINTEXT://host1:9092 别忘了设置成自己的主机名
listeners=PLAINTEXT://host1:9092 #kafka底层监听的服务地址,注意是使用主机名,不是ip。
# log.dirs 指定的目录 kafka启动时可以自动创建,因此不要忘了让kafka可以有读写这个目录的权限。
log.dirs=/opt/kafka/data ##kafka的分区以日志的形式存储在集群中(其实就是broker数据存储的目录)log.retention.hours=168 #日志的留存策略,默认168小时也就是一周
# zookeeper 的连接地址 ,别忘了设置成自己的主机名,单机情况下可以使用 localhost
zookeeper.connect=host1:2181
启动kafka
./bin/kafka-server-start.sh -daemon config/server.properties #后台启动kafka使用 jps 查看是否成功启动kafka
jps
34843 QuorumPeerMain
21756 Jps
116076 Kafka
4.3、集群部署
4.3.1、针对每一个节点的hosts文件添加节点的ip映射信息
vim /etc/hosts
192.168.157.80 host1
192.168.157.81 host2
192.168.157.82 host3
4.3.2、时间同步
yum install ntp -y
ntpdate cn.pool.ntp.org | ntp[1-7].aliyun.com #两个时钟同步地址选择一个就行
4.3.3、zookeeper配置
vim /opt/zookeeper/conf/zoo.cfg
##额外添加以下配置
server.1=host1:2888:3888 #数据同步端口:领导选举时服务器监听的端口
server.2=host2:2888:3888
server.3=host3:2888:3888
4.3.4、创建对应的服务id
# host1
echo 1 > /opt/zookeeper/data/myid #在这个文件中写入自己服务的id号
# host2
echo 2 > /opt/zookeeper/data/myid
# host3
echo 3 > /opt/zookeeper/data/myid
4.3.5、zoo.cfg参数解析
tickTime=2000: 通信心跳数,用于设置Zookeeper服务器与客户端之间的心跳时间间隔,单位是毫秒。这个时间间隔是Zookeeper使用的基本时间单位,用于服务器之间或客户端与服务器之间维持心跳的时间间隔。initLimit=10: LF初始通信时限,用于设置集群中的Follower跟随者服务器与Leader领导者服务器之间启动时能容忍的最多心跳数。如果在这个时限内(10个心跳时间)领导和根随者没有发出心跳通信,就视为失效的连接,领导和根随者彻底断开。syncLimit=5: LF同步通信时限,用于设置集群启动后,Leader与Follower之间的最大响应时间单位。假如响应超过这个时间(syncLimit * tick Time -> 10秒),Leader就认为Follower已经死掉,会将Follower从服务器列表中删除。dataDir: 数据文件目录+数据持久化路径,主要用于保存Zookeeper中的数据。dataLogDir: 日志文件目录,用于存储Zookeeper的日志文件。clientPort=2181: 客户端连接端口,用于监听客户端连接的端口
4.3.6、集群kafka配置
server.properties
配置文件
cd ${KAFKA_HOME}/config
vim server.propertiesbroker.id=0 #kafka服务节点的唯一标识
# listeners = PLAINTEXT://your.host.name:9092 别忘了设置成自己的主机名
listeners=PLAINTEXT://host1:9092 #集群中需要设置成每个节点自己的
# log.dirs 指定的目录 kafka启动时可以自动创建,因此不要忘了让kafka可以有读写这个目录的权限。
log.dirs=/opt/kafka/data ##kafka的分区以日志的形式存储在集群中(其实就是broker数据存储的目录)
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168 #日志的留存策略,默认168小时也就是一周
# zookeeper 集群的连接地址
zookeeper.connect=host1:2181,host2:2181,host3:2181
其余配置:
##修改差异配置
cd ${KAFKA_HOME}/config
vim server.properties# host2节点
broker.id=1
listeners=PLAINTEXT://host2:9092
# host3节点
broker.id=2
listeners=PLAINTEXT://host3:9092
kafka集群即可正常启动
kafka其余命令./bin/kafka-server-stop.sh #关闭kafka
kafka-console-consumer.sh #消费命令
kafka-console-producer.sh #生产命令
kafka-consumer-groups.sh #查看消费者组,重置消费位点等
kafka-topics.sh #查询topic状态,新建,删除,扩容
kafka-acls.sh #配置,查看kafka集群鉴权信息
kafka-configs.sh #查看,修改kafka配置
kafka-mirror-maker.sh #kafka集群间同步命令
kafka-preferred-replica-election.sh #重新选举topic分区leader
kafka-producer-perf-test.sh #kafka自带生产性能测试命令
kafka-reassign-partitions.sh #kafka数据重平衡命令
kafka-run-class.sh #kafka执行脚本