文章目录

  • 一、日志收集所解决的问题
  • 二、`Elastic Stack` 组件介绍
    • 2.1 `Elasticsearch`
    • 2.2 `Logstash`
    • 2.3 `Kibana`
    • 2.4 `Filebeat` beats
  • 三、`ELK Stack`集群安装
    • 3.1 安装`JAVA`环境(所有`ES`节点)
    • 3.2 安装`ES`集群
      • 3.2.1 `ES`单节点部署
      • 3.2.2 `ES` `JAVA`调优:堆(`heap`)内存大小
      • 3.2.3 `ES`使用自定义`JAVA`环境配置
      • 3.2.3 部署`ES`高可用集群
    • 3.3 `Kibana`安装(`elk03`节点)
    • 3.4 `Logstash`安装(`elk02`)
    • 3.5 `Filebeat`安装(`elk01`)
      • 3.5.1 `Filebeat`架构
  • 四、`Filebeat`日志收集
    • 4.1 `Filebeat`标准输入和输出
      • 4.2.2 `Filebeat `基于`Log`类型进行输入并且自定义`Tags`
      • 4.2.8 `Filebeat`输出数据至`Elasticsearch`集群
      • 4.2.12 `FIlebeat` 收集`JSON`格式的`nginx`访问日志
  • 五、`Logstash` 日志收集
    • 5.1 `Logstash`安装
    • 5.3 `Logstash`基于`File`的形式进行`input`
    • 5.4 `Logstash`基于`Filebeat`形式进行`input`
    • 5.5 Logstash基于ES形式进行output
  • 六、`Logstash grok`插件使用
    • 6.1 `Logstash` 内置正则使用
    • 6.4 `Logstash`删除指定字段
  • 八、`Logstash useragent`插件分析用户客户端类型
  • 九、`Logstash`多`if`分支
  • 十、`Kibana`操作使用
    • 10.1 `Kibana`手动创建索引模板
  • 九、`kafka`简介和使用
    • 9.1`kafka`相关概念
      • 9.1.1 `Kafka`介绍
      • 9.1.2 `Kafka`相关专业术语
    • 9.2 `Zookeeper`使用和配置
      • 9.2.1 简介
      • 9.2.2 `Zookeeper`集群部署:三节点😀
      • 9.2.3 `Zookeeper`角色划分
      • 9.2.4 `zookeeper`配置内存堆栈
    • 9.3 `Kafka`集群安装:三节点😀
    • 9.4 `Kafka Topic`日常操作
      • 9.4.1 查看`Topic`相关信息
      • 9.4.2 创建`Topic`
    • 9.5 `Filebeat`收集日志至`Kafka`
    • 9.6 `Logstash`收集`Kafka Topic`日志
  • 十、`Elasticsearch Restful`风格`API`实战
    • 10.1 `ES`集群状态
    • 10.2 `Elasticsearch`术语

一、日志收集所解决的问题

运维工程师首要职责维护服务的正常运行,什么是正常运行状态?服务的正常运行是万千报错中的一种“特例”,维护这个特例?

  1. 生产环境出现问题后,需要查看各种⽇志进行分析排错;
  2. 日常运维工作,日志文件分散,运维工作繁琐。可以实现日志聚合;
  3. 开发人员没有登陆服务器的权限。开发人员可以通过web界面查看日志;
  4. 面对大量的访问日志,可以统计出各项指标,比如PV UV。

二、Elastic Stack 组件介绍

Elasticsearch 9部署文档:https://www.elastic.co/docs/deploy-manage/deploy/self-managed/install-elasticsearch-from-archive-on-linux-macos

2.1 Elasticsearch

Elasticsearch(简称ES)是一个分布式、**RESTful**风格的开源搜索和数据分析引擎,专为全文搜索、结构化搜索、分析、和数据可视化而设计。支持几乎实时的搜索,并且具有高扩展性,能够处理PB级别的数据。

  • Index(索引): 索引类似于关系型数据库中的表,它存储着相似结构的数据。在Elasticsearch中,每个索引包含一个或多个文档(Document)。
  • Document(文档): 文档是Elasticsearch的基本数据单位,相当于关系型数据库中的一行记录。文档以JSON格式存储,包含字段和值的对。
  • Shards(分片)和Replicas(副本)Elasticsearch将数据分为多个分片存储,每个索引可以包含一个或多个分片。分片可以提高并行查询的性能。副本是分片的备份,保证了数据的高可用性和容错性。
  • Cluster(集群) Elasticsearch集群由一个或多个节点(Node)组成。每个节点负责存储部分数据,并参与集群中的索引和搜索操作。集群有一个主节点,用于管理集群状态。
  • Node(节点): 节点是Elasticsearch的运行实例,每个节点属于某个集群。节点可以充当主节点、数据节点。
  • 使用场景:
    • 日志和事件数据分析Elasticsearch常用于集中化日志管理和分析,帮助企业在海量日志中快速定位问题。
    • 电商网站的搜索功能Elasticsearch被广泛用于电商网站的产品搜索,提供快速、相关性高的搜索结果。
    • 数据存储与检索 Elasticsearch能有效存储和检索结构化或非结构化数据,适用于大规模数据的管理。 JSON key=values

2.2 Logstash

Logstash 是一个开源的数据收集、处理和传输引擎,主要用于实时的数据管道管理。是 Elastic Stack的重要组成部分,负责将各种来源的数据收集起来,进行过滤和格式化处理,并最终将其输出到指定的目标,如 Elasticsearch、文件或其他存储系统。

2.3 Kibana

Kibana 提供强大的数据可视化能力,使用户可以在浏览器中实时查看、搜索和分析存储在 Elasticsearch 中的数据。它提供丰富的可视化选项,如图表、地图和表格,支持构建交互式仪表板。

2.4 Filebeat beats

Filebeat 是一个轻量级的日志收集和传输工具。它通常用于从各种数据源收集日志,并将这些日志传输到LogstashElasticsearch 进行进一步处理和分析。Filebeat 是一个边车代理,安装在需要监控的服务器上,用于高效地读取和转发日志数据。它旨在减少资源消耗,提供可靠的日志传输,并确保在出现故障时不会丢失日志数据。

三、ELK Stack集群安装

3.1 安装JAVA环境(所有ES节点)

#>>> 上传jdk1.8安装包
$ ll jdk-8u381-linux-x64.tar.gz 
-rw-r--r--. 1 root root 139273048 731 10:23 jdk-8u381-linux-x64.tar.gz#>>> 解压jdk到指定目录
$ tar xf jdk-8u381-linux-x64.tar.gz  -C /usr/local/#>>> 创建软链接
$ cd /usr/local/
$ ln -s jdk1.8.0_381/ java#>>> 声明Java环境变量
$ cat >> /etc/profile.d/jdk.sh <<-EOF
#!/bin/bash
export JAVA_HOME=/usr/local/java
export PATH=\$PATH:\$JAVA_HOME/bin
EOF#>>> 重新加载配置文件
$ source /etc/profile.d/jdk.sh #>>> 测试java
$ java -version
java version "1.8.0_381"
Java(TM) SE Runtime Environment (build 1.8.0_381-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.381-b09, mixed mode)

3.2 安装ES集群

3.2.1 ES单节点部署

#>>> 解压es安装包到指定目录
$ tar xf elasticsearch-7.17.11-linux-x86_64.tar.gz  -C /usr/local#>>> 创建es软链接
$ cd /usr/local/
$ ln -s elasticsearch-7.17.11/ es#>>> 声明ES环境变量
$ cat >> /etc/profile.d/es.sh <<-EOF
#! /bin/bash
export ES_HOME=/usr/local/es
export PATH=\$PATH:\$ES_HOME/bin
EOF#>>> 重新加载环境变量
$ source /etc/profile.d/es.sh #>>> 启动es实例
$ elasticsearch

此时启动会有报错信息,记住详细看报错信息哟!报错信息如下:此服务不能以超级管理员的身份运行,需要单独创建es普通用户。

解决方法如下操作:

#>>> 创建用于启动ES的用户
$ useradd es#>>> 查看用户是否创建
$ id es
uid=1000(es) gid=1000(es)=1000(es)#>>> 修改elasticsearch属主和数组
$ chown  -R es.es /usr/local/elasticsearch-7.17.11/
#>>> 查看是否修改成功
$ ll /usr/local/elasticsearch-7.17.11/
#>>> 切换es工作目录
$ cd /usr/local/elasticsearch-7.17.11/config/#>>> 备份es配置文件
$ cp elasticsearch.yml{,.bak}#>>> 修改文件相关参数
$ egrep -v  "^(#|$)" elasticsearch.yml
···
# es集群名称
cluster.name: study-elk-cluster
# 节点名称(一般以当前主机名一致)
node.name: elk01	
# 监听地址,,配置IP则知允许基于IP地址进行访问
network.host: 0.0.0.0	
# 自动发现节点IP
discovery.seed_hosts: ["192.168.100.160"]	
# 初始化master节点
cluster.initial_master_nodes: ["192.168.100.160"] 
# 添加以下参数关闭geoip数据库的更新,减少对带宽和存储的消耗,禁用自动更新。GeoIP根据IP地址确定地理位置的技术,用于日志分析、用户活动追踪等场景。
ingest.geoip.downloader.enabled: false
···#>>> 测试启动ES
$ su -c "elasticsearch" es

虽然es能够正常启动,但是启动日志会出下报错信息,需要我们修改启动的内核参数,第一个为修改系统最大的文件描述符,第二个修改系统的虚拟内存。

#>>> 修改es需要的limits参数(重新连接会话框才能成功加载参数)
$ cat >>/etc/security/limits.d/elk.conf <<EOF
* soft nofile 65535
* hard nofile 131070
EOF# 参数解释:soft nofile 65535表示将文件描述符的软限制设置为65535。hard nofile 131070表示将文件描述符的硬限制设置为131070。#>>> 查看limits参数是否加载
$ ulimit -Sn
65535
$ ulimit -Hn
131070#>>> 修改内核参数
$ cat >> /etc/sysctl.d/elk.conf <<'EOF'
vm.max_map_count = 262144
EOF# 参数解释vm.max_map_count = 262144 用于设置单个进程可以拥有的内存映射的最大数量。#>>> 加载内核参数
$ sysctl -f /etc/sysctl.d/elk.conf
vm.max_map_count = 262144
$ sysctl -q vm.max_map_count
vm.max_map_count = 262144#>>> 测试启动ES
$ su -c "elasticsearch" es
#>>> 或者使用一下的参数实现后台运行
$ su -c "elasticsearch -d" es#>>> 测试节点
$ curl  192.168.100.160:9200
{"name" : "elk01",  # 节点的名称,一般以当前主机名命名。"cluster_name" : "study-elk-cluster",		#  Elasticsearch集群的名称,多个节点可以组成一个集群,共同提供搜索和存储功能。"cluster_uuid" : "EyOTgfNNSHOvUsQNLQ9f7Q", # 集群的唯一标识符(UUID)。每个Elasticsearch集群都有一个唯一的cluster_uuid,用于在集群内识别和管理节点之间的关系。"version" : {"number" : "7.17.11",  # Elasticsearch 实例的版本"build_flavor" : "default",  # 示这个版本没有特殊的自定义修改或变化。"build_type" : "tar",  # Elasticsearch 是通过 tar 包安装的。"build_hash" : "eeedb98c60326ea3d46caef960fb4c77958fb885",  # Git提交哈希。"build_date" : "2023-06-23T05:33:12.261262042Z", # 版本的构建日期和时间。"build_snapshot" : false,  # 此版本适用于生产"lucene_version" : "8.11.1",  # Lucene版本"minimum_wire_compatibility_version" : "6.8.0", # elasticsearch实例能够与运行 6.8.0 及以上版本的其他节点进行通讯。"minimum_index_compatibility_version" : "6.0.0-beta1"  # Elasticsearch兼容的最低索引版本。},"tagline" : "You Know, for Search" #  Elasticsearch的标语或宣传口号。
}

端口解释:

  • 9200 对外暴露端口。使用的HTTP协议, ElasticsearchREST API 服务的默认端口。开发者和应用程序通过HTTP协议与 Elasticsearch 进行交互,发送查询、索引文档、管理集群等操作。
  • 9300 集群内部通讯端口。使用TCP协议。 用于集群内的节点间同步和协调。节点之间通过这个端口交换数据,包括集群管理信息、索引和搜索操作等。

3.2.2 ES JAVA调优:堆(heap)内存大小

#>>> 查看运行的JAVA程序
$ jps  
14917 Jps
14713 Elasticsearch#>>> 查看ES堆内存大小
$ jmap -heap 14713(pid)

#>>> 修改堆内存大小(最大设置为32G,要么内存的一半)
$ vim /usr/local/elasticsearch-7.17.11/config/jvm.options
···
-Xms256m
-Xmx256m
···#>>> 删除数据、日志
$ rm -rf /usr/local/es/{data,logs}/*
#>>> 删除缓存数据
$ rm -rf /tmp/*#>>> 准备ES启动脚本并启动(加入systemd)
$ cat >> /usr/lib/systemd/system/es.service <<EOF
[Unit]
# 描述服务的简短说明,这里描述为 "ELK"。
Description=ELK
# 指定服务的启动顺序。表示该服务在网络目标 (network.target) 之后启动,确保网络服务已启动并可用。
After=network.target[Service]
# 指定服务的启动类型为 forking,表示服务启动后会产生一个子进程,并且主进程会在启动完成后退出。这通常用于后台运行的守护进程。
Type=forking
# 指定启动服务的命令
ExecStart=/usr/local/es/bin/elasticsearch -d
# 指定服务在退出后不重启。可以根据需要将其更改为 always 或 on-failure,以确保服务在失败后自动重启。
Restart=no
# 指定以 es 用户的身份运行服务。
User=es
# 指定服务所属的组为 es。
Group=es
# 设置进程打开文件的最大数量(文件描述符限制)
LimitNOFILE=131070[Install]
# 指定服务的目标,表示该服务在多用户模式下可用。
WantedBy=multi-user.target
EOF#>>> 重新加载systemd配置文件
$ systemctl daemon-reload#>>> 启动es
$ systemctl enable --now es

3.2.3 ES使用自定义JAVA环境配置

由于默认情况启动ES时,使用的ES家目录下的JAVA环境。如果需要自定义JAVA环境启动ES。请配置一下参数:

# Step 1:切换至ES命令存放目录
$ cd /usr/local/es/bin/# Step 2:修改ES默认启动脚本
$ vim elasticsearch# 脚本内添加以下参数:# 设置 Java 路径export JAVA_HOME=/usr/local/javaexport PATH=$JAVA_HOME/bin:$PATH# Java 选项:配置堆栈大小export JVM_OPTIONS="-Xms1g -Xmx4g"
image-20250209153240919
# Step 3:重启ES
$ systemctl restart es# Step 4:查看es进程信息
$ ps -aux | grep java

image-20250209153725259

3.2.3 部署ES高可用集群

#>>> 创建用于启动ES的用户
$ useradd es 
$ id es
uid=1000(elasticsearch) gid=1000(elasticsearch)=1000(elasticsearch)#>>> 创建ES数据目录和日志目录存放目录
$ mkdir -p /opt/{data,logs}
$ install -d /opt/{data,logs}/es -o es -g es#>>> 解压es安装包到指定目录
$ tar xf elasticsearch-7.17.11-linux-x86_64.tar.gz  -C /opt/#>>> 更改目录名
$ cd /opt/ && mv  elasticsearch-7.17.11  es#>>> 创建ES环境变量
$ vim  >> /etc/profile.d/es.sh <<-EOF
#! /bin/bash
export ES_HOME=/opt/es
export PATH=\$PATH:\$ES_HOME/bin
EOF#>>> 重新加载环境变量
$ source /etc/profile.d/es.sh #>>> 修改elasticsearch属主和数组
$ chown -R  es,es  /opt/es#>>> 修改es需要的limits参数(重新连接会话框才能成功加载参数)
$ cat >> /etc/security/limits.d/elk.conf <<-EOF
* soft nofile 65535
* hard nofile 131070
EOF#>>> 查看limits参数是否加载
$ ulimit -Sn
65535
$ ulimit -Hn
131070#>>> 修改内核参数
$ cat > /etc/sysctl.d/elk.conf <<EOF
vm.max_map_count = 262144
EOF#>>> 加载内核参数
$ sysctl -f /etc/sysctl.d/elk.conf
vm.max_map_count = 262144
$ sysctl -q vm.max_map_count
vm.max_map_count = 262144#>>> 修改堆内存大小(最大设置为32G,要么内存的一半)
$ vim /opt/elasticsearch-7.17.11/config/jvm.options
···
-Xms256m
-Xmx256m
···
#>>> elk01修改配置文件
$ egrep -v "^(#|$)" /opt/es/config/elasticsearch.yml 
cluster.name: study-elk-cluster
node.name: elk01
path.data: /opt/data/es	# 指定数据目录
path.logs: /opt/logs/es	# 指定日志目录
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> elk02修改配置文件
$ egrep -v "^(#|$)" /opt/es/config/elasticsearch.yml 
cluster.name: study-elk-cluster
node.name: elk02
path.data: /opt/data/es
path.logs: /opt/logs/es
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> elk03修改配置文件
$ egrep -v "^#|^$" /opt/es/config/elasticsearch.yml 
cluster.name: study-elk-cluster
node.name: elk03
path.data: /opt/data/es
path.logs: /opt/logs/es
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> 所有节点添加elk启动脚本
$ cat > /usr/lib/systemd/system/es.service <<EOF
[Unit]
Description=ELK
After=network.target
[Service]
Type=forking
ExecStart=/opt/es/bin/elasticsearch -d
Restart=no
User=es
Group=es
LimitNOFILE=131070
[Install]
WantedBy=multi-user.target
EOF#>>> 所有节点重新加载并启动
$ systemctl daemon-reload
$ systemctl restart es#>>> 测试集群
[root@elk01 ~]# curl 192.168.100.160:9200/_cat/nodes
192.168.100.160 40 59  8 0.14 0.10 0.06 cdfhilmrstw * elk01
192.168.100.161 54 30 11 0.55 0.29 0.11 cdfhilmrstw - elk02
192.168.100.162 48 28  8 0.29 0.16 0.06 cdfhilmrstw - elk03# 参数解释:第一列:每个节点的IP地址;第二列:每个节点的CPU使用率;第三列:每个节点的内存的使用率;第四列:每个节点的活跃分片数量;第五列:每个节点的1分钟、5分钟、15分钟平均负载;第六列:每个节点在集群中的角色;第七列:*代表主节点;主节点负责管理集群的元数据、分片分配和集群状态等任务。第八列:节点的名称

3.3 Kibana安装(elk03节点)

#>>> 安装kibana
$ yum localinstall -y kibana-7.17.11-x86_64.rpm #>>> 备份配置文件
$ cd /etc/kibana/
$ cp kibana.yml kibana.yml.bak#>>> 修改kibana配置文件
$ egrep -v "^(#|$)" kibana.yml 
# 服务端口
server.port: 5601
# 主机地址或者主机名
server.host: "0.0.0.0"
# 服务名称
server.name: "study-elk-kibana"
# ES主机组地址
elasticsearch.hosts: ["http://192.168.100.160:9200","http://192.168.100.161:9200","http://192.168.100.162:9200"]
# 修改语言
i18n.locale: "zh-CN"#>>> 启动Kibana
$ systemctl enable --now kibana#>>> 游览器IP+5601访问

image-20250208162459343

image-20250208162536494

image-20250208162559343

3.4 Logstash安装(elk02

#>>> 安装Logstash
$ yum localinstall -y logstash-7.17.11-x86_64.rpm #>>> 创建软连接
$ ln -s /usr/share/logstash/bin/logstash   /usr/bin/logstash#>>> 编写测试文件
$ mkdir conf-logstash
$ cat  conf-logstash/01-stdin-to-stdout.conf 
input {stdin {}
}
output{stdout {}
}#>>> 测试Logstash
$ logstash -f ~/conf-logstash/01-stdin-to-stdout.conf 

image.png

3.5 Filebeat安装(elk01

边车代理,需要安装在每台服务器上,并且通过tail -f 读取指定目录的日志文件。“收割机” Beats

​ 帮助文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/index.html

# Step 1:elk01主机安装Filebeat
$ yum localinstall -y filebeat-7.17.11-x86_64.rpm # Step 2:elk01主机测试Filebeat
$ mkdir /root/filebeat-config
$ cp /etc/filebeat/filebeat.yml  /root/filebeat-config/01-test.yml
$ cat /root/filebeat-config/filebeat-config/01-test.yml
filebeat.inputs:
- type: stdinenabled: trueoutput.console:pretty: true    # 启动美观格式输出# Step 2:elk01主机启动Filebeat实例
$ filebeat -e -c /opt/filebeat/filebeat-config/01-test.yml 

3.5.1 Filebeat架构

架构图:
Input(数据源)收集方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuration-filebeat-options.html
Output(输出地)推送方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuring-output.html

四、Filebeat日志收集

Input方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-stdin.html

4.1 Filebeat标准输入和输出

​ 标准输入官方文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-stdin.html
​ 标准输出官方文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/console-output.html
简介:使用终端输入从标准输入读取事件。

# Step 1:elk01主机创建Filebeat配置文件测试目录
$ mkdir filebeat-config# Step 2:elk01主机创建配置文件
$ vim filebeat-config/01-stdin-on-stdout.yml 
# 指定输入类型
filebeat.inputs:# 终端标准输入
- type: stdin
# 标准输出类型:终端输出output.console:#如果设置为 true,则写入 stdout 的事件将采用良好的格式。默认值为 false。pretty: true# Step 3:elk01主机测试
$ filebeat  -e -c /root/filebeat-config/01-stdin-on-stdout.yml 

4.2.2 Filebeat 基于Log类型进行输入并且自定义Tags

$ vim  filebeat-config/02-log-on-stdout.yml 
filebeat.inputs:
- type: logpaths:- /var/log/messages# 对当前的输入日志文件,指定独有的标签tags: ["system logs"]fields:log_type: system- type: logpaths:- /var/log/nginx/access.log# 对当前的输入日志文件,指定独有的标签 tags: ["nginx access_log"]fields:log_type: nginx_accessoutput.console:pretty: true#>>> 清除偏移量
$ > /var/lib/filebeat/registry/filebeat/log.json #>>> 启动Filebeat实例
$ filebeat  -e -c /root/filebeat-config/02-log-on-stdout.yml 

对于Tags的设定,将Filebeat收集到的日志转发至ES集群时,可以针对不同的Tag设置不同的索引,方便查阅。

4.2.8 Filebeat输出数据至Elasticsearch集群

​ 官方链接:https://www.elastic.co/guide/en/beats/filebeat/7.17/elasticsearch-output.html#elasticsearch-output
​ 简介:Elasticsearch输出使用Elasticsearch HTTP API将事件直接发送到Elasticearch

#>>> 编写Filebeat配置文件
$ vim filebeat-config/06-log-nginx-access-es.yml 
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["nginx_access"]fields:log_type: nginx_accessfields_under_root: true# 输出至ES集群
output.elasticsearch:# ES集群地址hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]#>>> 启动Filebeat实例
$ filebeat  -e -c /root/filebeat-config/06-log-nginx-access-es.yml 


Kibana查看ES数据

通过索引模式来匹配索引

4.2.12 FIlebeat 收集JSON格式的nginx访问日志

#>>> 安装nginx
$ vim /etc/yum.repos.d/nginx.repo
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=0
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true$ yum install -y nginx#>>> 启动nginx
$ systemctl enable --now nginx#>>> 修改nginx访问日志格式
$ vim /etc/nginx/nginx.conf 
···log_format  test_nginx_json '{"@timestamp":"$time_iso8601",''"host":"$server_addr",''"clientip":"$remote_addr",''"SendBytes":$body_bytes_sent,''"responsetime":$request_time,''"upstreamtime":"$upstream_response_time",''"upstreamhost":"$upstream_addr",''"http_host":"$host",''"uri":"$uri",''"domain":"$host",''"xff":"$http_x_forwarded_for",''"referer":"$http_referer",''"tcp_xff":"$proxy_protocol_addr",''"http_user_agent":"$http_user_agent",''"status":"$status"}';access_log  /var/log/nginx/access.log  test_nginx_json;
···#>>> 检查语法格式
$ nginx -t#>>> 重启nginx
$ systemctl reload nginx#>>> 测试
$ curl localhost
$ cat /var/log/nginx/access.log 

#>>> 修改Filebeat配置(识别json格式日志,生成相关字段)
$ vim filebeat-config/10-log-nginx-log-shard-json-es.yml 
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/nginx/access.log*tags: ["access"]
# 启动json格式  json.keys_under_root: trueoutput.elasticsearch:hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]#>>> 清除filebeat的指针偏移,用于从第一行重新读取日志文件
$ rm -rf /var/lib/filebeat/*#>>> 清楚以前的ES nginx索引,防止出现脏数据#>>> 重启filebeat
$ filebeat -e -c ~/filebeat-config/10-log-nginx-log-shard-json-es.yml 

五、Logstash 日志收集

​ 官网:https://www.elastic.co/guide/en/logstash/7.17/
​ 简介:Logstash是一个具有实时流水线功能的开源数据收集引擎。Logstash可以动态地统一来自不同来源的数据,并将数据规范化为您选择的目的地。为各种高级下游分析和可视化用例清理和民主化您的所有数据。虽然Logstash最初推动了日志收集的创新,但其功能远远超出了该用例。任何类型的事件都可以通过广泛的输入、过滤和输出插件进行丰富和转换。

5.1 Logstash安装

#>>> 安装
$ yum install -y logstash-7.17.11-x86_64.rpm #>>> 声明软连接
$ ln -s /usr/share/logstash/bin/logstash   /sbin/#>>> 创建Logstash 测试文件目录
$ mkdir logstash-config

5.3 Logstash基于File的形式进行input

​ 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-file.html

#>>> 编写配置文件
$ vim 02-file-input.conf 
input {file {# 指定日志文件收集的路径path => ["/tmp/*.txt"]# 从日志文件的第一行进行读取,只会在第一次启动时从头读取start_position => "beginning" # 从日志文件的尾行读取日志,且偏移指针文件对原日志文件偏移量未记录# start_position => "end" }
}output {stdout {}
}#>>> 准备测试日志文件
$ echo 1111 > /tmp/1.txt#>>> 启动Logstash实例
$ logstash -f /root/stduy-logstash-config/02_input_file.conf 
#>>> 查看偏移量
$ cat /usr/share/logstash/data/plugins/inputs/file/.sincedb_820ddbbd098cfece4b56f4fcbf67a9bb
16789540 0 64768 4 1717082577.147625 /tmp/1.txt#>>> 查看日志文件
$ ll -i /tmp/1.txt 
16789540 -rw-r--r-- 1 root root 4 530 23:20 /tmp/1.txt

5.4 Logstash基于Filebeat形式进行input

Filebeat output Logstash官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/logstash-output.html
Logstash input Fliebeat官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-beats.html

#>>> 修改Filebeat配置文件
$ vim /root/filebeat-config/output-logstash.yml filebeat.inputs:
- type: logpaths:- /tmp/test.logtags: ["test"]fields:log_type: testfields_under_root: trueoutput.logstash:# Logstash主机地址hosts: ["192.168.174.142:5044"]#>>> 启动Filebeat实例
$ filebeat -e -c /root/filebeat-conifg/output-logstash.yml 
#>>> 修改Logstash配置文件
$ vim input-beats.conf 
input {beats {# 通讯端口号,默认5044port => 5044}
}output {stdout {}
}#>>> 启动Logstash
$ logstash -rf /root/logstash-config/input-beats.conf

5.5 Logstash基于ES形式进行output

​ 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-outputs-elasticsearch.html

#>>> 修改Logstash配置文件
$ cat 08_output_es.conf 
input {beats {# 通讯端口号,默认5044port => 5044}
}output {elasticsearch {# ES集群IPhosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] # 索引模式名称index => "test-log-%{+yyyy.MM.dd}"}
}#>>> 启动Logstash
$ logstash -f /root/stduy-logstash-config/08_output_es.conf 

六、Logstash grok插件使用

​ 官网: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
​ 简介:解析任意文本并对其进行结构化。Grok 是将非结构化日志数据解析为结构化和可查询内容的好方法。该工具非常适合系统日志,apache和其他Web服务器日志,mysql 日志,以及通常为人类编写的任何日志格式。

6.1 Logstash 内置正则使用

​ 简介:将非结构化的日志格式通过Grok内置的正则转化为结构化。

#>>> 日志格式
192.168.174.1 - - [01/Jun/2024:10:37:16 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0" "-"
192.168.174.1 - - [01/Jun/2024:10:37:16 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0" "-"#>>> 编写Filebeat配置文件
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]
output.logstash:hosts: ["192.168.174.142:5044"]#>>> 清除偏移量
$ rm -rf /var/lib/filebeat/*
#>>> 启动Filebeat实例             
$ filebeat  -e -c ~/filebeat-out-logstash.yml #>>> 编写Logstash配置文件
$ vim filter_grok.yml 
input {beats {port => 5044}
}filter {grok {match => {"message" => "%{HTTPD_COMMONLOG}" }}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] index => "test-%{+yyyy.MM.dd}"}
}#>>> 启动Logstash
$ logstash  -rf ~/filter_grok.yml 

6.4 Logstash删除指定字段

​ 简介:如果此筛选器成功,请从此事件中删除任意字段。

​ 未删除字段终端打印如下:

#>>> Filebeat配置文件编写
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]
output.logstash:hosts: ["192.168.174.142:5044"]#>>> 删除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 启动Filebeat实例
$ filebeat  -e -c ~/filebeat-out-logstash.yml #>>> Logstash配置文件编写
$ vim filter_grok.yml 
input {beats {port => 5044}
}filter {grok {match => {"message" => "%{HTTPD_COMMONLOG}"}# You can also remove multiple fields at once: remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "test-%{+yyyy.MM.dd}"}
}#>>> 启动Logstash实例
$ logstash -rf ~/filter_grok.yml 

删除指定字段后终端数据显示如下:

八、Logstash useragent插件分析用户客户端类型

​ 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-useragent.html

#>>> Filebeat配置文件编写
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]json.keys_under_root: true  output.logstash:hosts: ["192.168.174.142:5044"]#>>> 删除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 启动Filebeat实例
$ filebeat  -e -c ~/filebeat-out-logstash.yml #>>> Logstash配置文件编写
$ vim filter_grok.yml 
input {beats {port => 5044}
}filter {grok { remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]}useragent {source => "http_user_agent"target => "test_user_agent"remove_field => [ "agent","@version","tags","ecs","log","offset","type" ]}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "test-%{+yyyy.MM.dd}"}
}#>>> 启动Logstash实例
$ logstash -rf ~/filter_grok.yml 

九、Logstashif分支

​ 官网:https://www.elastic.co/guide/en/logstash/7.17/event-dependent-configuration.html#metadata
​ 简介: 针对不同的日志类型。filter去做不同的处理

#>>> Filebeat配置文件
$ vim  filebeat-out-logstash.yml 
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logfields:log_type: nginx_accessfields_under_root: truejson.keys_under_root: true   - type: logpaths:- /var/log/nginx/error.logfields:log_type: nginx_errorfields_under_root: true- type: logpaths:- /var/log/messagesfields:log_type: system_logfields_under_root: trueoutput.logstash:hosts: ["192.168.174.142:5044"]#>>> 删除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 启动Filebeat实例
$ filebeat  -e -c ~/filebeat-out-logstash.yml #>>> Logstash 配置文件准备
$ vim filter_grok.yml 
input {beats {port => 5044}
}filter {mutate {remove_field => ["agent","ephemeral_id","ecs","@version","tags","input","log","offest"]  }useragent {source => "http_user_agent" target => "study_user_agent"}
}output {stdout {}if [log_type] == "nginx_access" {  elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] index => "web-access-%{+yyyy.MM.dd}"} } else if [log_type] == "nginx_error" {elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "web-error-%{+yyyy.MM.dd}"  	 }}  else if [log_type] == "system_log" {elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "system-log-%{+yyyy.MM.dd}"     } }
}#>>> 启动Logstash
$ logstash  -rf ~/filter_grok.yml 

十、Kibana操作使用

10.1 Kibana手动创建索引模板




{
"number_of_replicas":3,
"number_of_shards":3
}

九、kafka简介和使用

9.1kafka相关概念

官网:https://kafka.apache.org/

9.1.1 Kafka介绍

Apache Kafka是一个开源的分布式事件流平台,为数以千计的公司提供高性能数据管道、流分析、 数据集成和任务关键型应用程序。

  • 高吞吐量:使用延迟低至2ms的机器集群以网络有限的吞吐量传递消息。

  • 可伸缩:将生产集群扩展到1000个代理、每天数万亿条消息、PB的数据和数十万个分区。弹性扩展和收缩存储和处理。

  • 永久存储器:将数据流安全地存储在分布式、持久、容错的集群中。

  • 高可用性:在可用性区域上高效地扩展集群,或者跨地理区域连接单独的集群。

9.1.2 Kafka相关专业术语

  • 主题(Topic:主题是Kafka中用于对消息进行分类的逻辑分组,每个主题可以看作是消息的分类器。主题是多订阅者模式,即一个主题可以有多个消费者订阅。

  • 分区(Partition:每个主题被分成一个或多个分区。分区是消息存储的基本单元。分区内的消息是有序的,但不同分区之间无序。每个分区可以分布在不同的Kafka服务器上,从而实现水平扩展。

    • leader partition 负责对kafka集群的读写操作,和客户端进行交互。
    • follower partition 负责去leader partition 同步数据,不可以和客户端进行交互。
  • 偏移量(Offset:偏移量是分区中每条消息的唯一标识符。它是一个递增的整数,记录了消息在分区中的位置。消费者使用偏移量来跟踪读取消息的位置。

  • 生产者(Producer:生产者是负责向Kafka主题发布消息的客户端应用程序。生产者将消息发送到指定的主题和分区。

  • 消费者(Consumer:消费者是负责从Kafka主题读取消息的客户端应用程序。消费者通过订阅一个或多个主题来读取消息,并使用偏移量来跟踪读取进度。

  • 消费者组(Consumer Group:消费者组是一组消费者实例,共同消费一个或多个主题的消息。

  • 代理(Broker:代理是Kafka集群中的一个服务器节点,负责存储和传输消息数据。一个Kafka集群由多个代理组成,每个代理可以处理多个分区。

  • 复制(ReplicationKafka中的复制机制将分区数据复制到多个代理上,以确保数据的高可用性和容错性。每个分区有一个领导副本(Leader)和若干个跟随副本(Follower)。所有的读写操作都由领导副本处理,跟随副本只需同步领导副本的数据。但是LeaderFollwer都属于副本。创建时副本数不能为0

9.2 Zookeeper使用和配置

​ 官网链接:https://dlcdn.apache.org/zookeeper/

​ 下载链接:https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz

9.2.1 简介

ZookeeperApache 旗下的一个开源分布式协调服务,用于管理和协调分布式应用程序中的各种服务和组件,它提供了一系列高效且可靠的分布式数据一致性和协调机制。

9.2.2 Zookeeper集群部署:三节点😀

#>>> 解压安装包到指定目录
$ tar xf apache-zookeeper-3.8.2-bin.tar.gz  -C /opt/ && cd /opt/#>>> 更改目录名称
$ mv apache-zookeeper-3.8.2-bin/   zookeeper#>>> 配置zk环境变量
$ cat >> /etc/profile.d/zookeeper.sh <<-EOF
#!/bin/bash
export ZK_HOME=/opt/zookeeper
export PATH=\$PATH:\$ZK_HOME/bin
EOF#>>> 重新加载环境变量 
$ source /etc/profile.d/zookeeper.sh# 创建zk数据存放目录
$ mkdir -p /opt/data/zk#>>> 修改zk的配置文件(三节点)
$ cp /opt/zookeeper/conf/zoo_sample.cfg  /opt/zookeeper/conf/zoo.cfg$ egrep -v "^(#|$)" /opt/zookeeper/conf/zoo.cfg # Zookeeper的心跳间隔时间。服务器和客户端会通过心跳包维持连接状态。
tickTime=2000# 从节点最多可以等待10 个tickTime(即 10 * 2000 毫秒 = 20 秒)来与领导者同步数据状态。否则它将被认为是不可用的。
initLimit=10# 定义Zookeeper领导者和从节点之间的心跳和同步请求的最大允许延迟时间。领导者与从节点之间的响应时间如果超过了这个限制,从节点将被认为失去同步状态。(即 5 * 2000 毫秒 = 10 秒)。
syncLimit=5# 定义zk数据目录
dataDir=/opt/data/zk# 定义Zookeeper集群的客户端连接端口
clientPort=2181
server.140=192.168.174.140:2888:3888
server.141=192.168.174.141:2888:3888
server.142=192.168.174.142:2888:3888# 参数解释:# dataDir		zk数据目录# clientPort	端口号# server.140	zk节点唯一标识# 192.168.100.160	zk节点主机地址# 2888				集群内部通讯端口# 3888				leader选举端口# 创建ID文件(位置存放在zk的数据存放路径下)101节点
$ echo "140" > /opt/data/zk/myid102节点
$ echo "141" > /opt/data/zk/myid103节点
$ echo "142" > /opt/data/zk/myid#>>> 所有节点启动zk
$ zkServer.sh start#>>> 查看zk状态
$ zkServer.sh status

myid文件:在Zookeeper集群中,myid文件是每个Zookeeper服务器节点的重要配置文件之一,用于唯一标识集群中的每个服务器。

9.2.3 Zookeeper角色划分

Zookeeper集群中,不同的服务器节点可以承担不同的角色,以确保集群的高可用性、数据一致性和故障恢复能力。主要角色包括:server.140=192.168.174.140:2888:3888:角色

  1. 领导者(Leader

    • 处理所有写操作(如创建、更新、删除节点)并确保数据的一致性。

    • 管理并协调集群中其他节点的活动。

    • 负责为客户端请求生成唯一的事务IDzxid)。

    • 在集群启动或领导者失效时,会进行领导者选举,选出新的领导者。

    • 定期发送心跳给跟随者,确保自己处于活动状态。

  2. 跟随者(Follower

    • 处理所有读取操作(如读取节点数据)。
    • 接收并转发客户端的写请求给领导者进行处理。
    • 将领导者的事务日志同步到本地,确保数据一致性。
    • 参与领导者选举。
    • 与领导者保持同步,接收并应用领导者的事务。
  3. 观察者(Observer

    • 处理读取操作,减轻领导者和跟随者的负担。
    • 不参与领导者选举和事务投票,只同步领导者的事务日志。
    • 提高集群的读取扩展能力,适合需要高读取吞吐量的场景。
  4. 客户端(Client

    • 连接到集群中的任一节点进行读取或写入操作。
    • 自动处理节点故障并重新连接到其他可用节点。
    • 通过客户端APIZookeeper集群进行交互。

9.2.4 zookeeper配置内存堆栈

#>>> 查看zk进程
$ jps
1367 Elasticsearch
5400 QuorumPeerMain
5593 Jps#>>> 查看zk的堆栈大小
$ jmap -heap 5400

zookeeper默认堆内存大小为1GB,一般设置为2GB或者4GB

#>>> 调节zookeeper推内存大小为256MB
$ vim /opt/zookeeper/conf/java.env
#! /bin/bash # 指定JDK安装路径 
export JAVA_HOME=/usr/local/java # 指定zookeeper的堆内存大小 
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS" #>>> 同步文件
$ scp /opt/zookeeper/conf/java.env  elk02:/opt/zookeeper/conf/
$ scp /opt/zookeeper/conf/java.env  elk03:/opt/zookeeper/conf/#>>> 所有节点重启zk
$ zkServer.sh restart#>>> 验证堆内存
$ jmap -heap `jps | grep QuorumPeerMain | awk '{print $1}'`

9.3 Kafka集群安装:三节点😀

​ 下载链接:https://kafka.apache.org/downloads
​ 官网:https://kafka.apache.org/

#>>> 解压安装包到指定目录
$ tar xf kafka_2.13-3.2.1.tgz  -C /opt/ &&  cd /opt/#>>> 修改目录名称
$ mv kafka_2.13-3.2.1/  kafka#>>> 配置kafka环境变量
$ cat >> /etc/profile.d/kafka.sh <<-EOF
#!/bin/bash
export KAFKA_HOME=/opt/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF#>>> 重新加载环境变量
$ source /etc/profile.d/kafka.sh #>>> 创建数据目录
$ mkdir /opt/data/kafka -p#>>> 修改kafka配置文件# 1.kafka101节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties
broker.id=140
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka# 2.kafka102节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties  
broker.id=141
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka	# /kafka为zk中的znode# 3.kafka103节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties  
broker.id=142
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka#>>> 所有节点启动kafka
$ kafka-server-start.sh -daemon /opt/kafka/config/server.properties #>>> zk节点查看kafka注册信息
$ zkCli.sh ls /kafka/brokers/ids | grep  "^\["
[140, 141, 142                                    #>>> kafka停止脚本
$ kafka-server-stop.sh

9.4 Kafka Topic日常操作

9.4.1 查看Topic相关信息

#>>> 查看集群中所有Topic
$ kafka-topics.sh \--bootstrap-server  192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--list#>>> 查看所有Topic详细信息
$ kafka-topics.sh   \--bootstrap-server  192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--describe#>>> 查看某个Topic信息
$ kafka-topics.sh \--bootstrap-server  192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--describe  --topic  Topice名称

9.4.2 创建Topic

#>>> 创建test-elk Topic
$ kafka-topics.sh  \--bootstrap-server  192.168.174.140:9092 \--create --topic test-elk#>>> 创建Topic,并指定副本数量
$ kafka-topics.sh   \--bootstrap-server  192.168.174.140:9092 \--create --topic test-elk-02  \--partitions 10 \--replication-factor 1#>>> 查看Topic的详细信息
$ kafka-topics.sh  \--bootstrap-server  192.168.174.140:9092  \--describe --topic test-elk-02

#>>> 创建Topic,并指定副本数量
$ kafka-topics.sh \--bootstrap-server  192.168.174.140:909 \--create --topic test-elk-03 \--partitions 10 \--replication-factor 2 $ kafka-topics.sh   \--bootstrap-server  192.168.174.140:9092  \--describe --topic test-elk-03

注意:副本数量不能大于Broker的数量

9.5 Filebeat收集日志至Kafka

官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/kafka-output.html

$ cat filbeat-out-kafka.yml 
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logjson.keys_under_root: true   #- type: log
#  paths:
#    - /var/log/nginx/error.logoutput.kafka:# initial brokers for reading cluster metadatahosts: ["192.168.174.140:9092", "192.168.174.141:9092", "192.168.174.142:9092"]# message topic selection + partitioningtopic: "test-kafka-topic"$ filebeat  -e -c ~/filbeat-out-kafka.yml #>>>  kafka测试拉取数据
$ kafka-console-consumer.sh   \--topic test-kafka-topic \--bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--from-beginning

9.6 Logstash收集Kafka Topic日志

​ 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-kafka.html

input {kafka {bootstrap_servers => "192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092"  # Kafka集群IPtopics => ["test-kafka-topic"]      # 指定Topic进行消费数据group_id => "test-kafka"     		# 指定消费者组codec => json {                     # 指定消费的数据是JSON格式。charset => "UTF-8"}}
}filter {mutate {remove_field => ["agent","ephemeral_id","ecs","@version","tags","input","log","offest"]  }if [log_type] == "nginx_access" {geoip {source => "clientip"}useragent {source => "http_user_agent" target => "study_user_agent"}
} 
}output {stdout {}#  if [log_type] == "nginx_access" {  
#    elasticsearch {
#      hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] 
#      index => "web-access-%{+yyyy.MM.dd}"
#    } 
#  } else if [log_type] == "nginx_error" {
#        elasticsearch {
#          hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
#          index => "web-error-%{+yyyy.MM.dd}"   
#    }
#  }  else if [log_type] == "system_log" {
#       elasticsearch {
#         hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
#         index => "system-log-%{+yyyy.MM.dd}"     
#    } 
#   }
}#>>> 启动Logstash实例
$ logstash  -rf ~/filter_grok.yml 

十、Elasticsearch Restful风格API实战

10.1 ES集群状态

  1. 绿色(Green

    • 含义:集群健康状态正常,所有的主分片和副本分片都已分配。
    • 解释:绿色状态表示集群中的所有数据都可以访问,所有分片(包括主分片和副本分片)都分配到集群中的节点上。
    • 示例:假设集群有3个主分片,每个主分片有1个副本分片,那么绿色状态下这6个分片都已成功分配且正常运行。
  2. 黄色(Yellow

    • 含义:集群健康状态部分正常,所有的主分片都已分配,但有一个或多个副本分片未分配。
    • 解释:黄色状态表示集群中的所有主分片都可以访问,但一些副本分片由于某种原因(例如节点故障或资源不足)未能分配。这意味着数据是安全的,但没有高可用性,因为如果某个节点失败,它可能会导致数据无法访问。
    • 示例:假设集群有3个主分片和每个主分片1个副本分片,如果有3个主分片和2个副本分片已分配,但1个副本分片未能分配,则集群为黄色状态。
  3. 红色(Red

    • 含义:集群健康状态不正常,有一个或多个主分片未分配。

    • 解释:红色状态表示集群中有一些数据不可访问,因为主分片未能分配。此时,可能存在数据丢失的风险,需要立即采取措施来修复问题。

    • 示例:假设集群有3个主分片和每个主分片1个副本分片,如果有2个主分片和所有副本分片未能分配,则集群为红色状态。

10.2 Elasticsearch术语

Document文档,用户存储在ES的数据,ES最小单元,文档不可被拆分。文档使用JSON的对象存储类型。filed相当于数据表的字段,对文档数据根据不同属性进行分类标识。index索引,一个索引就是拥有相似特征文档的集合。shard分片,存储数据的地方,每个底层对应的使一个Lucene库,一个索引至少有一个或多个分片。replica副本,一个分片可以有0个或者多个副本。作用是对数据进行备份,一旦副本数量不为0,就会引入主分片(primary shard)和副本分片(replica shard)的概念。主分片(primary shard)实现数据的读写操作。副本分片(replica shard)可以实现数据的读操作,需要主分片同步数据,当主分片挂掉时,副本分片会变为主分片。Allocation分配。将分片分配给某个节点的过程,包括主分片和副本分片。如果副本分片,还包含从主分片复制数据的过程,此过程由Master节点调度完成。
  1. 上家公司每天所产生的日志量(5GB左右)
  2. ELastic stack技术栈有哪些组件 (beats Logstash elasticsearch Kibana)
  3. elasticsearch 使用场景?(日志收集场景:elk技术栈。搜索引擎(IK分词器))
  4. elasticsearch 数据迁移?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/88223.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/88223.shtml
英文地址,请注明出处:http://en.pswp.cn/web/88223.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大腾智能国产 3D CAD:设计自由度拉满,数据安全锁死

在智能制造与数字化转型的浪潮中&#xff0c;大腾智能CAD作为一款自主研发的三维计算机辅助设计软件&#xff0c;凭借其从概念设计到制造落地的全流程覆盖能力&#xff0c;正成为国产工业设计软件领域的新锐力量。软件深度融合先进建模技术与工程实践需求&#xff0c;为机械制造…

ubuntu 操作记录

1&#xff1a;安装minicom 1: sudo apt-get install minicom minicom -s 2&#xff1a;Ctrl Z C 的区别 ctrlz的是将任务中断,但是此任务并没有结束,他仍然在进程中他只是维持挂起的状态,用户可以使用fg/bg操作继续前台或后台的任务,fg命令重新启动前台被中断的任务,bg命令…

深度剖析:向70岁老系统植入通信芯片——MCP注入构建未来级分布式通信

> 如何让老旧系统重获新生?协议注入技术是关键。 ## 一、当遗留系统遇上分布式未来:一场艰难的对话 想象一下:你负责维护一套诞生于20年前的单体式银行核心系统,它像一位固执的70岁老人,使用着陈旧的TCP自定义协议。这时业务部门要求实现与云原生风险分析引擎的实时…

针对 SSD 固态硬盘的安全擦除 Secure Erase

SSD 的安全擦除&#xff08;Secure Erase&#xff09;用于永久删除存储介质上的数据&#xff0c;以及在驱动器性能开始明显下降至低于标称值时恢复其速度。Secure Erase 可以解决的问题核心当 SSD 开始运行缓慢&#xff08;读写数据变差&#xff09;时&#xff0c;这里有许多可…

Three.js搭建小米SU7三维汽车实战(3)轨道控制器

往期内容&#xff1a; Three.js搭建小米SU7三维汽车实战&#xff08;1&#xff09;搭建开发环境 Three.js搭建小米SU7三维汽车实战&#xff08;2&#xff09;场景搭建 轨道控制器 轨道控制器可以改变相机在空间坐标系中的位置 进而方便从不同的角度观察物体 1. 轨道控制器响…

C++树状数组详解

C树状数组深度解析 第1章 引言&#xff1a;为什么需要树状数组 1.1 动态序列处理的挑战 在现代计算机科学中&#xff0c;我们经常需要处理动态变化的序列数据&#xff0c;这类数据具有以下特点&#xff1a; 实时更新&#xff1a;数据点会随时间不断变化频繁查询&#xff1a;需要…

TeamT5-ThreatSonar 解决方案:构建智能动态的 APT 与勒索软件防御体系

一、核心功能深度解析&#xff1a;从威胁狩猎到自动化响应的闭环能力 &#xff08;一&#xff09;威胁狩猎&#xff1a;主动挖掘潜伏性攻击的 “数字侦探” 多层级威胁识别引擎&#xff1a; 静态特征匹配&#xff1a;内置超 1000 种 APT 后门签名&#xff08;如 Regin、Duqu 等…

C#基础篇(10)集合类之列表

C# 中的列表(List)详解列表(List)概述在C#中&#xff0c;List<T>是System.Collections.Generic命名空间中的一个泛型集合类&#xff0c;它提供了动态大小的数组功能&#xff0c;可以存储指定类型的元素。列表的创建与初始化// 创建一个空列表 List<int> numbers n…

SpringBoot订单模块核心接口设计与实现

目录 一、 管理端接口实现 (后台管理系统) 一、订单搜索 (高权重 - 核心管理功能) 1.Controller (OrderController): 2.Service (OrderService): 3.ServiceImpl (OrderServiceImpl): 1.使用MyBatis分页插件PageHelper 2.基础数据查询 4.Mapper (OrderMapper): 5.Mapper …

EXCEL链接模板无法自动链接到PowerBI?试试这个方法

在使用EXCEL链接模板连接PowerBI时&#xff0c;你有没有遇到如图所示的提示呢&#xff1a;下面我来分享一下&#xff0c;出现弹框的原因及解决方法&#xff1a;首先我们先看一下这个英文翻译&#xff0c;意思就是说&#xff0c;我们只能使一个PowerBI文件处于打开的状态&#x…

最新全开源礼品代发系统源码/电商快递代发/一件代发系统

简介&#xff1a;最新全开源礼品代发系统源码/电商快递代发/一件代发系统测试环境&#xff1a;Nginx PHP7.2 MySQL5.6图片&#xff1a;

Android 事件分发机制深度解析

一、事件分发机制核心概念1. 事件分发三要素要素作用关键方法事件(Event)用户触摸动作的封装MotionEvent分发者负责将事件传递给下级dispatchTouchEvent()拦截者决定是否截断事件传递&#xff08;仅ViewGroup&#xff09;onInterceptTouchEvent()消费者最终处理事件的组件onTou…

从威胁检测需求看两类安全监测平台差异

在网络安全领域&#xff0c;针对不同场景的威胁检测需求&#xff0c;衍生处了多种技术架构的安全监测平台。尽管它们的目标均为“识别异常行为、阻断潜在威胁”&#xff0c;但根据其核心引擎的配置的技术侧重点&#xff0c;可大致分为两类&#xff1a;聚焦基础入侵检测的平台与…

useContext:React 跨组件数据共享的优雅解决方案

关键点 useContext&#xff1a;React 提供的 Hook&#xff0c;用于在组件树中共享全局状态&#xff0c;简化跨组件数据传递。应用场景&#xff1a;主题切换、用户认证、语言设置和全局配置管理。实现方式&#xff1a;结合 createContext 和 useContext&#xff0c;实现灵活的状…

20250706-8-Docker快速入门(下)-Dockerfile介绍与基本使用_笔记

一、Dockerfile构建镜像1. Dockerfile概述&#xfeff;定义&#xff1a;Dockerfile是一个用于自动构建镜像的文本文件&#xff0c;由一条条指令组成工作原理&#xff1a;指令逐步执行&#xff0c;每个指令完成不同功能典型指令示例&#xff1a;FROM centos:latest&#xff1a;基…

Git系列--3.分支管理

目录 一、理解分支 1.1图示 1.2 打印仓库下有哪些分支 1.3创建分支 1.4HEAD与切换分支 1.5合并分支 1.6流程图理解 二、删除分支 ​ 三、合并分支冲突 3.1.问题导入 3.2.解决 3.3合并图示 四、合并模式 4.1合并​编辑 4.2变基 五、bug分支 5.1背景建立 5.2解决步骤 5.2.1…

Vue.js TDD开发深度指南:工具链配置与精细化测试策略

“TDD不是测试优先的开发&#xff0c;而是设计优先的开发。” —— Robert C. Martin 引言 在Vue.js项目中实施测试驱动开发&#xff08;TDD&#xff09;是构建健壮应用的关键路径。但许多开发者在实践中常遇到&#xff1a; 工具链配置复杂导致放弃不同类型组件测试策略混淆测…

基于物联网的智能家居控制系统设计与实现

标题:基于物联网的智能家居控制系统设计与实现内容:1.摘要 随着物联网技术的飞速发展&#xff0c;智能家居逐渐成为人们关注的焦点。本文旨在设计并实现一个基于物联网的智能家居控制系统&#xff0c;以提高家居的智能化水平和用户的生活便利性。通过采用先进的传感器技术、通信…

Vue 中使用 Cesium 实现可拖拽点标记及坐标实时显示功能

在 Cesium 地图开发中&#xff0c;实现点标记的拖拽交互并实时显示坐标信息是一个常见的需求。本文将详细介绍如何在 Vue 框架中使用 Cesium 的 Primitive 方式创建点标记&#xff0c;并实现拖拽功能及坐标提示框跟随效果。先看效果图功能实现概述我们将实现的功能包括&#xf…

HTML 插件:构建网页的强大工具

HTML 插件:构建网页的强大工具 引言 HTML 插件是网页设计中不可或缺的一部分,它们为网页增添了丰富的交互性和动态效果。本文将深入探讨 HTML 插件的概念、类型、应用及其在网页开发中的重要性。 什么是 HTML 插件? HTML 插件,也称为 HTML 组件或 HTML 控件,是指嵌入到…