1. 预处理的核心概念:什么是 Ingest Pipeline?

想象一下数据进入 Elasticsearch 的旅程。原始数据(Raw Data)往往并不完美:格式可能混乱,字段可能缺失,或者需要被丰富和转换后才能发挥最大的价值。预处理就是在数据被索引(Indexed)到最终的数据存储位置之前,对其进行清洗、转换、丰富的一个中间加工环节。

这个加工环节在 Elasticsearch 中被称为 Ingest Pipeline(摄取管道)。管道由一系列称为 Processor(处理器) 的步骤组成,每个处理器执行一个特定的操作。数据像水一样流经这个管道,被一个个处理器依次处理,最终变成我们想要的样子存入 Elasticsearch。

架构位置:
在传统的 ETL(Extract-Transform-Load)流程中,Transform 通常由外部工具(如 Logstash)完成。而 Ingest Pipeline 将 T 的环节下沉并内嵌到了 Elasticsearch 内部,由 Ingest Node 节点负责执行。

这样做的主要优势:

  1. 简化架构:减少了对 Logstash 等外部处理组件的强依赖,降低了系统复杂度和维护成本。
  2. 高性能:处理过程在 ES 集群内部完成,避免了不必要的网络传输开销。
  3. 灵活性:可以动态创建、修改和复用管道,适应多变的数据处理需求。
  4. 原子性:预处理和索引操作是一个原子过程,保证一致性。

2. 核心组件:Processor(处理器)详解

处理器是管道的肌肉和骨骼。Elasticsearch 提供了丰富的内置处理器,以下是一些最常用和强大的:

  • grok文本解析之王。使用基于正则表达式的模式将非结构化的文本解析成结构化的字段。常用于解析日志文件(如 Nginx、Apache 日志)。
  • date:解析日期字段,并将其转换为标准的 ISO8601 时间戳,这对于基于时间序列的查询和可视化至关重要。
  • dissect:另一种文本解析工具,使用分隔符模式,比 grok 性能更高,但灵活性稍差。
  • remove / rename:删除不需要的字段或为字段重命名,保持数据整洁。
  • set / append:设置字段的值,或向数组字段追加值。
  • convert:改变字段的数据类型,如将字符串 "123" 转换为整数 123
  • enrich数据丰富神器。允许你根据当前文档的内容,去另一个索引中查询匹配的数据,并将其内容合并到当前文档中(例如,根据 IP 字段查询 GeoIP 数据库添加地理位置信息)。
  • script万能处理器。当内置处理器无法满足复杂需求时,可以使用 Painless 脚本编写自定义逻辑,功能极其强大。
  • fail:在满足特定条件时让处理过程失败,便于调试和错误处理。
  • foreach:对数组类型的字段中的每个元素执行相同的处理器操作。

3. 实战实例:解析 Nginx 访问日志

让我们通过一个完整的、真实的例子来将上述概念串联起来。

场景:我们需要将如下格式的 Nginx 访问日志导入 Elasticsearch,并进行搜索和可视化。
raw_log 字段原始数据:

192.168.1.100 - - [30/Apr/2024:10:30:01 +0800] "GET /api/v1/products?page=2 HTTP/1.1" 200 1532 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"

目标:从中提取出客户端IP、时间戳、HTTP方法、请求路径、HTTP状态码、响应体大小等结构化字段。

步骤一:设计并创建 Ingest Pipeline

我们创建一个名为 nginx_log_processing 的管道。

PUT _ingest/pipeline/nginx_log_processing
{"description": "Parse and transform Nginx access logs","processors": [// 1. 使用 Grok 进行核心解析{"grok": {"field": "message", // 假设原始日志在 'message' 字段中"patterns": ["%{IP:client.ip} - - \\[%{HTTPDATE:timestamp}\\] \"%{WORD:http.method} %{URIPATHPARAM:http.request.path}(?:\\?%{URIPARAM:http.request.params})? HTTP/%{NUMBER:http.version}\" %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long}( \"%{DATA:http.referer}\")?( \"%{DATA:user.agent}\")?"],"ignore_missing": true,"on_failure": [{"set": {"field": "error","value": "{{ _ingest.on_failure_message }}"}}]}},// 2. 转换时间戳{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"timezone": "Asia/Shanghai","target_field": "@timestamp" // 转换后放入标准的时间戳字段}},// 3. 移除临时字段{"remove": {"field": ["timestamp", "message"],"ignore_missing": true}},// 4. (可选) 根据 IP 丰富地理信息 - 这里需要先有配置好的enrich policy// {//   "enrich": {//     "policy_name": "ip_geo_policy",//     "field": "client.ip",//     "target_field": "client.geo",//     "ignore_missing": true//   }// }]
}

架构师解读

  • grok 处理器是这里的核心。我们使用预定义的模式(如 %{IP:client.ip})将文本匹配并提取到命名字段中。patterns 数组允许定义多个模式以备选。on_failure 子句是一个很好的错误处理实践,它会在解析失败时将错误信息记录到一个新字段,而不是让整个文档索引失败。
  • date 处理器将解析后的、人类可读的 timestamp 转换为 Elasticsearch 内部优化的 @timestamp 字段,这是管理时序数据的最佳实践。
  • remove 处理器用于清理中间产物,保持文档干净,节省存储空间。
  • enrich 处理器被注释掉了,但它展示了如何实现更高级的数据丰富。你需要先创建一个 Enrich Policy,指向一个包含 IP 和地理位置映射的索引,才能启用它。
步骤二:使用 Pipeline 索引数据

现在,当我们索引文档时,只需在请求中指定 pipeline 参数即可。

PUT my-nginx-logs-2024.04.30/_doc/1?pipeline=nginx_log_processing
{"message": "192.168.1.100 - - [30/Apr/2024:10:30:01 +0800] \"GET /api/v1/products?page=2 HTTP/1.1\" 200 1532 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64)\""
}

Elasticsearch 在索引这个文档前,会先将其通过 nginx_log_processing 管道进行处理。

步骤三:查看处理结果

索引成功后,查询这条数据,你会看到最终存储的文档是结构化的:

{"client": {"ip": "192.168.1.100"},"@timestamp": "2024-04-30T02:30:01.000Z","http": {"method": "GET","request": {"path": "/api/v1/products"},"response": {"status_code": 200,"body_bytes": 1532},"version": "1.1"},"user_agent": {"original": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
}

原始杂乱的日志消息变成了一个完美的、嵌套结构的 JSON 文档,非常适合进行聚合、筛选和可视化分析。


4. 架构建议与最佳实践

  1. 规划与测试:在投入生产前,使用 Simulate Pipeline API 对样例数据进行测试和调试。这是避免线上问题的最重要工具。

    POST _ingest/pipeline/_simulate
    {"pipeline": { ... }, // 你的pipeline定义"docs": [ ... ]      // 你的样例文档
    }
    
  2. 性能考量

    • Ingest Node 角色:在生产集群中,最好部署专用的 Ingest Node,将其与 Master/Data Node 角色分离,避免资源竞争。
    • 处理器顺序:将最可能过滤掉数据的处理器(如drop)或计算量小的处理器放在前面,减少后续不必要的处理开销。
    • grok 性能grok 是 CPU 密集型操作,模式复杂度过高或数据量巨大时可能成为瓶颈。考虑使用 dissect 或预处理在数据源端完成。
  3. 错误处理:始终在管道中定义 on_failure 策略。可以将处理失败的文档路由到另一个索引(使用 set 处理器修改 _index),以便后续检查和重新处理,而不是直接丢弃。

  4. 复用与维护:将通用的处理逻辑(如基础的时间戳处理、通用字段清理)抽象成独立的管道,然后使用 pipeline 处理器在管道中调用其他管道,实现模块化和复用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/919969.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/919969.shtml
英文地址,请注明出处:http://en.pswp.cn/news/919969.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

我从零开始学习C语言(15)- 基本类型 PART2

开始学习第七章其余部分。7.3.4 转义序列正如在前面示例中见到的那样,字符常量通常是用单引号括起来的单个字符。然而,一些特殊符号(比如换行符)是无法采用上述方式书写的,因为它们不可见(非打印字符&#…

K8S的部署与常用管理

一、k8s的部署 1.1.集群环境初始化 1.1.1.所有主机禁用swap [rootk8s- ~]# systemctl mask dev-nvme0n1p3.swap [rootk8s- ~]# swapoff -a [rootk8s- ~]# systemctl status dev-nvme0n1p3.swap [rootk8s- ~]# vim /etc/fstab 内容: 注释swap 1.1.2.安装k8s部署工…

2025年机械工程与自动化技术国际会议(ICMEAT 2025)

2025年机械工程与自动化技术国际会议(ICMEAT 2025) 2025 International Conference on Mechanical Engineering and Automation Technology一、大会信息会议简称:ICMEAT 2025 大会地点:中国杭州 审稿通知:投稿后2-3日内…

高数 不定积分(4-3):分部积分法

文章目录写在前面分部积分法😕 一个小问题✨ 分部积分法是怎么来的?🌰 几个小例子⭐ 最终总结!后话写在前面 文章传送门:高数 不定积分(4-2):换元积分法 今天再更一篇:) 上篇文章&…

Chrome/360 浏览器 WebUI 资源底层机制解析:共享资源与专属资源的奥秘

在 Chromium 和 360 浏览器源码中,我们会发现 WebUI 页面不仅有 C 逻辑处理(如 WebUIMessageHandler),还伴随着大量 HTML、CSS 和 JS 文件。尤其是 src/ui/webui/resources 和 src/chrome/browser/360/webui 这两个目录&#xff0…

基于springboot的高校后勤保修服务系统/基于android的高校后勤保修服务系统app

基于springboot的高校后勤保修服务系统/基于android的高校后勤保修服务系统app

Qt QML 用Q_PROPERTY快捷访问c++属性

在之前我写过如何调用函数,当时的属性都是手搓的,也就是自己写成员变量、变化信号和读写函数,但其实有一个很便捷的方法,即使用Q_PROPERTY,下面给出标准结构:Q_PROPERTY(数据类型 变量名 READ 变量名 WRITE…

ubuntu中网卡的 IP 及网关配置设置为永久生效

要将 Ubuntu 中 ens33 和 ens36 网卡的 IP 及网关配置设置为永久生效(重启后不丢失),需通过 netplan 配置并禁用 cloud-init 对网络的干扰(避免重启后配置被覆盖),具体步骤如下:一、最终的永久生…

不再让Windows更新!Edge游戏助手卸载及关闭自动更新

文章目录Windows系统更新问题方法一:通过注册表手动设置1. 打开注册表编辑器2. 定位到目标路径3. 创建新的DWORD值4. 修改数值方法二:命令行设置1. 打开命令提示符2. 输入命令验证设置是否生效恢复更新Edge关闭游戏助手Edge关闭后台运行Edge关闭自动更新…

css3之flex布局

flex布局要牢记的两个知识点: 开启了flex布局的元素叫flex containerflex container里面的直接子元素叫flex items 这两点要记牢,设置属性的时候才不会搞混这个是flex布局的整体图 一、flex container上的属性 1.flex-direction 修改主轴方向的属性&…

vscode 搭建C/C++开发环境搭建(linux)

1.编译器/调试器安装首先,需要安装编译器(GCC/G)和调试器(GDB),用于编译和调试代码。1.打开终端(Ctrl Alt T)2.更新软件包获取新版本信息sudo apt update3.安装build-essential包,它包含gcc,g等必要库sudo apt install…

vue-pure-admin页面引入和功能添加流程解析

vue-pure-admin (opens new window)是一款开源完全免费且开箱即用的中后台管理系统模版。完全采用 ECMAScript 模块(ESM)规范来编写和组织代码,使用了最新的 Vue3、Vite、Element-Plus、TypeScript、Pinia、Tailwindcss 等主流技术开发 以下是…

vlc-android: 编译自己的libvlc

概述 VLC 媒体播放器作为一款由志愿者开发团队精心维护的自由、开源且跨平台的多媒体播放器,能轻松驾驭绝大多数多媒体文件,无论是本地磁盘中的视频、音频,还是来自网络的流媒体协议. VLC for Android 支持网络串流,无论是基于 H…

并联谐振与串联谐振

在LC电路中,感抗和容抗相等时对应的频率值称为谐振频率,在接收广播电视信号或无线通信信号时,使接收电路的频率与所选择的发射的信号频率相同就叫做调谐。并联谐振LC并联谐振电路是指将电感器和电容器并联形成,如图所示。在并联谐…

打印机怎么连接电脑?打印机驱动?【图文详解】USB连接打印机?wifi连接打印机?

一、问题背景 在日常办公与生活里,把电脑和打印机连接起来,是实现文档、照片等打印的基础操作。但很多人初次尝试时,会因不熟悉流程而感到无从下手。 无论是办公场景下急需打印重要文件,还是日常生活中想要打印照片留念&#xff0…

CVPR 2025 | 医学影像加速进化:深度学习×多模态,精准诊断再升级

关注gongzhonghao【CVPR顶会精选】今天聊一个医学图像领域的前沿探索:结合空间感知卷积、扩散模型与视觉语言模型,从图像配准到合成分割,再到跨模态理解,打造了一个更加智能、鲁棒且可泛化的医学影像工具链。无论是SACB-Net带来的…

[每周一更]-(第157期):深入理解Go语言的垃圾回收机制:调优与监控

Go语言以其简洁的语法和强大的并发能力而闻名,而它的垃圾回收(GC)机制则是支撑其高性能的关键组件之一。本文将深入探讨Go语言的垃圾回收原理,并介绍如何对其进行调优与监控,以提升应用程序的性能。 Go语言垃圾回收机制…

Java 学习笔记(基础篇9)

1. 综合练习题目 1 :金额转换为中文大写格式请编写一个 Java 程序,实现将数字金额转换为中文大写格式(带单位)的功能,具体要求如下:(1) 程序接收用户输入的一个整数金额(范围:0-9999…

云原生俱乐部-k8s知识点归纳(5)

写到这里,k8s的内容已经到一半了,虽然后面的内容我觉得更加玄学一点。控制器真的是个神奇的东西,虽然后面的CRD会带着大家一起做一个控制器,但是还是觉得很奇妙。控制器大概就是k8s中的精华了,通过控制器去监听k8s中ap…

C++复习2

C继承 继承的概念 继承(inheritance)机制是面向对象程序设计使代码可以复用的重要的手段,它允许程序员在保持原有类特性的基础上进行扩展,增加功能,这样产生新的类,称为派生类。 继承呈现了面向对象程序设计…