在 Logstash 中配置从 Elasticsearch (ES) 读取数据并输出到 Kafka 是一个相对高级但强大的用法,通常用于数据迁移、重新索引、或构建新的数据管道。

下面我将详细解释配置文件的各个部分和细节。

核心配置文件结构 (es-to-kafka.conf)

一个完整的配置文件主要包含三个部分:input, filter (可选), 和 output

input {elasticsearch {# 输入配置:告诉Logstash如何从ES读取数据}
}filter {# 过滤配置(可选):对从ES读取的数据进行加工、清洗、转换
}output {kafka {# 输出配置:告诉Logstash如何将数据写入Kafka}
}

1. Input (Elasticsearch) 插件配置详解

用以定义数据来源

input {elasticsearch {# 【必需】ES集群的地址列表hosts => ["http://localhost:9200", "http://node2:9200"] # 【必需】要查询的索引。支持通配符(如`my-index-*`)和逗号分隔。index => "source-index-*" # 【强烈建议】查询语句。默认是 `{ "query": { "match_all": {} } }`,即查询所有。# 你可以根据需要添加时间范围过滤等,避免全量同步。query => '{"query": {"range": {"@timestamp": {"gte": "now-1h/d",  # 例如:只拉取过去1小时的数据"lte": "now/d"}}}}'# 【必需】分页大小。控制一次从ES拉取多少条数据。根据文档大小和JVM堆内存调整。size => 1000 # 【必需】滚动API的保持时间。每次滚动查询的上下文保持时间,应大于处理一批数据所需的时间。scroll => "5m" # 【可选】认证信息。如果ES有安全认证user => "your_elasticsearch_user"password => "your_password"# 【可选】SSL/TLS配置(如果ES开启了HTTPS)ssl => truecacert => "/path/to/your/ca.crt" # 或使用 `ssl_certificate_verification => false` (不推荐,仅测试用)# 【可选】调度计划。默认只运行一次。# 如果希望持续从ES拉取新数据,可以使用cron表达式,但这通常不是好主意,容易重复消费。# schedule => "* * * * *" # 每分钟运行一次(谨慎使用!)# 【可选】用于排序的字段。建议使用唯一且递增的字段,如`@timestamp`或自增ID,与`docinfo`配合实现断点续传。sort => "@timestamp:asc" # 【高级可选】启用文档元数据获取。可以将ES文档的_id, _index等信息也添加到Logstash event中。docinfo => true docinfo_target => "[@metadata][elasticsearch]" docinfo_fields => ["_index", "_type", "_id"] # 【高级可选】设置请求重试次数retry_max_attempts => 3}
}

2. Filter 插件配置(可选)

从 ES 获取的数据已经是 JSON 格式,通常不需要复杂解析,但常用来进行一些调整。

filter {# 1. 移除不必要的字段。例如,从docinfo中获取的元数据可能不需要发送到Kafka。mutate {remove_field => ["@version", "@timestamp", "[@metadata][elasticsearch]"]}# 2. 添加Kafka消息所需的Key或Header(在output中可以使用)# 例如,使用文档的_id作为Kafka消息的Key,保证同一文档始终进入同一分区。mutate {add_field => {"[@metadata][kafka_key]" => "%{[@metadata][elasticsearch][_id]}"}}mutate {rename => {"旧的字段名" => "新的字段名"# 可以同时重命名多个字段"另一个旧字段" => "另一个新字段"}}# 3. 转换数据格式或内容# json {#   source => "message" # 如果ES里的某个字段是JSON字符串,可以在这里解析它# }# date {#   match => ["timestamp", "UNIX_MS"] # 格式化时间字段#   target => "timestamp"# }
}

重要提示@metadata 字段中的内容不会在输出中显示,非常适合用来做流程中的临时变量(比如上面的 kafka_key)。

3. Output (Kafka) 插件配置详解

用以定义数据的目的地。

output {kafka {# 【必需】Kafka集群的broker列表bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"# 【必需】目标Topic的名称topic_id => "target-topic-name"# 【可选】消息的Key。常用于分区选择。这里使用filter阶段设置的metadata。# 如果没有key,Kafka会使用轮询策略分配分区。codec => "json" # 非常重要!指定消息的序列化格式为JSON。# 【可选】消息格式序列化器。`json` codec已经帮我们处理了,所以不需要单独设置。# value_serializer => "org.apache.kafka.common.serialization.StringSerializer"# 【可选】压缩算法,可以有效减少网络传输量和存储空间。compression_type => "snappy" # 可选 "gzip", "lz4", "snappy"# 【可选】生产者ACK机制,关系到数据可靠性。acks => "1" # "0"(不等待), "1"(等待Leader确认), "all"(等待所有ISR确认)# 【可选】批量发送设置,提高吞吐量。batch_size => 16384linger_ms => 1000 # 发送前等待更多消息加入batch的时间(毫秒)# 【可选】SSL/SASL认证(如果Kafka集群需要)ssl_truststore_location => "/path/to/kafka.client.truststore.jks"ssl_truststore_password => "password"sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';"sasl_mechanism => "PLAIN"security_protocol => "SASL_SSL"# 【可选】遇到错误(如Topic不存在)时重试次数retries => 3}# 强烈建议添加一个备用输出(如stdout),用于调试和查看错误信息。stdout {codec => rubydebug}
}

完整配置示例

假设我们将 app-logs-* 索引中过去 15 分钟的数据,迁移到名为 logstash-migration-topic 的 Kafka Topic 中,并使用文档 ID 作为 Kafka Message Key。

input {elasticsearch {hosts => ["http://es-node1:9200"]index => "app-logs-*"query => '{"query": {"range": {"@timestamp": {"gte": "now-15m","lte": "now"}}}}'size => 500scroll => "5m"docinfo => truedocinfo_target => "[@metadata][es_doc]"schedule => "*/5 * * * *" # 每5分钟运行一次(谨慎!可能导致数据重复)}
}filter {# 使用文档的_id作为Kafka消息的Keymutate {add_field => {"[@metadata][kafka_key]" => "%{[@metadata][es_doc][_id]}"}}# 移除一些不必要的系统字段mutate {remove_field => ["@version", "@timestamp", "[@metadata][es_doc]"]}
}output {kafka {bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"topic_id => "logstash-migration-topic"codec => "json"compression_type => "lz4"acks => "all" # 追求高可靠性}stdout {}
}

运行命令

将上述配置保存为 es-to-kafka.conf 文件,然后使用以下命令运行 Logstash:

bin/logstash -f /path/to/your/es-to-kafka.conf --config.test_and_exit # 测试配置文件语法
bin/logstash -f /path/to/your/es-to-kafka.conf # 启动运行

重要注意事项

  1. 性能与资源:这种操作对 ES 和 Logstash 都是 资源密集型 的。务必调整 size 参数,监控 JVM 内存和 CPU 使用率。
  2. 重复数据:默认情况下,每次运行 input 都会重新查询。使用 schedule 会导致数据重复。要实现增量迁移,必须在 query 中使用严格的时间范围或自增 ID,并记录上次获取的位置。
  3. 数据类型:ES 输入插件会将整个文档作为一个 Logstash event,message 字段就是原始的 JSON 文档。使用 json codec 输出可以保持其结构。
  4. 错误处理:网络中断、Kafka Topic 不存在等都可能导致任务失败。建议在测试环境充分测试,并配置好 retriesretry_max_attempts
  5. 版本兼容性:确保你的 Logstash 版本与 ES 和 Kafka 集群版本兼容。插件可能因版本不同而参数略有差异。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/95303.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/95303.shtml
英文地址,请注明出处:http://en.pswp.cn/web/95303.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在OracleLinux9.4系统上静默滚动打补丁安装Oracle19c

OracleLinux9.4系统 安装Oracle19c 文章目录OracleLinux9.4系统 安装Oracle19c一、安装准备1、yum安装预检查需要的包2、系统资源二、滚动安装一、安装准备 1、yum安装预检查需要的包 yum install libnsl yum install -y oracle-database-preinstall-19c # 最新的unzip yum i…

Android原生HttpURLConnection上传图片方案

创建上传方法object FormUploader {private val BOUNDARY "Boundary-" System.currentTimeMillis()private const val LINE_FEED "\r\n"Throws(IOException::class)fun uploadImage(url: String, imageFile: File, params: MutableMap<String?, Str…

落叶清扫机器人cad+三维图+设计说明书

摘 要 城市公共场所、校园等环境中&#xff0c;落叶的清扫一直是一个繁琐而耗时的任务。传统的人工清扫方式不仅效率低下&#xff0c;还存在人力浪费和安全隐患等问题。因此&#xff0c;研发一款能够自主完成落叶清扫任务的机器人成为了当今研究的热点之一。随着科技的不断进…

国别域名的SEO优势:是否更利于在当地搜索引擎排名?

当你盯着搜索引擎结果页发呆时&#xff0c;有没有想过——凭什么那个.jp域名的网站能排在.ca前面&#xff1f;别扯什么内容质量&#xff0c;上周帮客户优化新加坡市场时&#xff0c;亲眼见着两个内容相似度90%的页面&#xff0c;.sg域名比.com.au在Google Singapore上高出3个排…

动态配置最佳实践:Spring Boot 十种落地方式与回滚审计指南(含实操与避坑)

作为一名Spring Boot开发者&#xff0c;正在运维一个高可用微服务系统&#xff1a;业务需求变化频繁&#xff0c;需要实时调整配置如数据库连接或日志级别&#xff0c;但每次修改都得重启应用&#xff0c;造成服务中断和用户投诉。这不是小麻烦&#xff0c;而是配置管理的痛点—…

vue社区网格化管理系统(代码+数据库+LW)

摘要 随着城市化进程的加快&#xff0c;社区管理的复杂性逐渐增大&#xff0c;传统的管理模式已无法满足现代社区管理的需求。社区网格化管理系统作为一种新的管理模式&#xff0c;通过将社区划分为多个网格单元&#xff0c;使得管理更加精细化、智能化和高效化。本论文基于Sp…

使用EasyExcel实现Excel单元格保护:自由锁定表头和数据行

使用EasyExcel实现Excel单元格保护&#xff1a;锁定表头和第二行数据 前言 在日常开发中&#xff0c;我们经常需要导出Excel文件&#xff0c;有时还需要对Excel中的某些单元格进行保护&#xff0c;防止用户误修改。本文将介绍如何使用EasyExcel 4.0.3实现锁定Excel表头和第二行…

dify docker知识库topk最大值参数配置

1 问题说明 dify构建RAG知识库过程中&#xff0c;通过会遇到一些默认配置不能解决的问题。 比如topk&#xff0c;topk默认最大10&#xff0c;对语义模糊的检索&#xff0c;目标文档可能没进前10&#xff0c;出现在10-30区间。 所以&#xff0c;需要调整topk最大值参数。 # T…

SRE命令行兵器谱之一:精通top/htop - 从性能“体检”到瓶颈“解剖”

SRE命令行兵器谱之一:精通top/htop - 从性能“体检”到瓶颈“解剖” SRE的“战场”:真实故障场景 下午三点,监控系统告警:“核心API服务响应时间(P99)飙升至5秒”。用户已经开始在群里抱怨接口超时。这是一个典型的线上性能问题,每一秒的延迟都在影响用户体验和公司收…

一、Git与Gitee常见问题解答

Git与Gitee常见问题解答 Git相关问题 Q1: 什么是Git&#xff1f; A: Git是一个分布式版本控制系统&#xff0c;由Linux之父Linus Torvalds开发。它能够跟踪文件的变更历史&#xff0c;支持多人协作开发&#xff0c;是现代软件开发中不可或缺的工具。 Q2: Git的三个区域是什么&a…

kubernetes服务质量之QoS类

一、QoS类 Kubernetes的QoS&#xff08;Quality of Service&#xff09;类别允许您指定可用于应用程序的可用资源数量&#xff0c;以便更好地控制应用程序的可用性。它还允许您限制特定应用程序的资源使用率&#xff0c;以帮助保护系统的稳定性和性能。 Kubernetes 创建 Pod 时…

Redis--Lua脚本以及在SpringBoot中的使用

前言、为什么要用 Lua&#xff1f;多步操作合并为一步&#xff0c;保证原子性。减少网络通信次数。下推逻辑到 Redis&#xff0c;提高性能。一、Redis 使用 Lua 脚本的两种方式方式一&#xff1a;使用 --eval 执行脚本文件这种方式 需要先写一个 Lua 文件。&#x1f4cc; 示例&…

基于 C 语言的网络单词查询系统设计与实现(客户端 + 服务器端)

一、项目概述本文将介绍一个基于 C 语言开发的网络单词查询系统&#xff0c;该系统包含客户端和服务器端两部分&#xff0c;支持用户注册、登录、单词查询及历史记录查询等功能。系统采用 TCP socket 实现网络通信&#xff0c;使用 SQLite 数据库存储用户信息、单词数据及查询记…

《JAVA EE企业级应用开发》第一课笔记

《JAVA EE企业级应用开发》第一课笔记 文章目录《JAVA EE企业级应用开发》第一课笔记课程主题&#xff1a;三层架构与SSM框架概述一、核心架构&#xff1a;三层架构 (MVC)1. 表现层 (Presentation Layer)2. 业务逻辑层 (Business Logic Layer)3. 数据持久层 (Data Persistence …

RT-DETR网络结构

1.前言 本章主要来介绍下RT-DETR的网络结构,参考的依旧是ultralytics实现的RT-DETR-L,代码如下: ultralytics/ultralytics: Ultralytics YOLO 🚀 首先谈谈我对RT-DETR的浅显认识,他不像是YOLOv8这种纯CNN实现的网络,也不像是Vit这种以Transformer实现的网络,他是前一…

Python 文件复制实战指南:从基础操作到高效自动化的最佳实践

Python 文件复制实战指南:从基础操作到高效自动化的最佳实践 1. 引言:文件复制为何是自动化的核心能力? 在日常开发与运维工作中,文件复制是一项基础却至关重要的操作。无论是备份日志、同步配置、部署代码,还是批量迁移数据,都离不开对文件的精准复制与路径管理。而 Py…

WebSocket的基本使用方法

一. 与HTTP对比WebSocket 是一种在单个 TCP 连接上实现全双工&#xff08;双向&#xff09;通信的网络协议&#xff0c;它解决了传统 HTTP 协议 “请求 - 响应” 模式的局限性&#xff0c;让客户端&#xff08;如浏览器&#xff09;和服务器能建立持久连接&#xff0c;实现实时…

架构选型:为何用对象存储替代HDFS构建现代数据湖

在过去十余年的大数据浪潮中&#xff0c;Hadoop及其核心组件HDFS&#xff08;Hadoop分布式文件系统&#xff09;无疑是整个技术生态的基石。它开创性地解决了海量数据的分布式存储难题&#xff0c;支撑了无数企业从数据中挖掘价值。然而&#xff0c;随着数据规模的指数级增长以…

智能养花谁更优?WebIDE PLOY技术与装置的结合及实践价值 —— 精准养护的赋能路径

一、WebIDEPLOY 技术支撑下的智能养花系统核心构成在 WebIDEPLOY 技术的框架下&#xff0c;智能养花装置形成了一套精准协同的闭环系统&#xff0c;其核心在于通过技术整合实现 “监测 - 决策 - 执行 - 远程交互” 的无缝衔接&#xff0c;让植物养护更贴合城市居民的生活节奏。…

基于llama.cpp在CPU环境部署Qwen3

大家好,我是奇文王语,NLP爱好者,长期分享大模型实战技巧,欢迎关注交流。 最近两天在研究如何使用小规模参数的模型在CPU环境上进行落地应用,比如模型Qwen3-0.6B。开始使用Transformers库能够正常把模型服务进行部署起来,但是通过测试速度比较慢,用户的体验会比较差。 …