在这里插入图片描述

第七章:Kafka消息系统(实时流处理)

欢迎回到数据探索之旅!

  • 在前六章中,我们构建了强大的**批量处理流水线**。

  • 通过Airflow DAG(批量任务编排)协调Spark作业(数据处理),数据从MySQL数据库(源系统)经数据层(青铜、白银、黄金)存入MinIO存储(数据湖),并通过数据质量检查确保数据可靠性。这种批量处理非常适合生成日报和分析。

  • 但若需要即时响应数据变化呢?例如客户完成订单后立即推荐相关商品,此时无法等待每日批量作业。

这就需要**实时数据流处理——这正是Kafka消息系统**的核心价值

实时流处理解决的问题

批量处理存在固有延迟(数小时至数天),无法满足以下场景:

  • 欺诈检测(即时拦截可疑交易)
  • 个性化推荐(购物后实时推荐)
  • 实时仪表盘(销售数据动态更新
  • 事件驱动告警

Kafka:高速数据传送带

Apache Kafka是构建实时数据管道的分布式流平台

其核心设计犹如高速传送带,具备:

  • 高吞吐(每秒百万级消息处理)
  • 低延迟(毫秒级响应)
  • 持久化存储(消息可回溯)
  • 水平扩展能力

核心概念解析

  • 消息(事件):数据变更的最小单元,例如MySQL表的新增记录
  • 主题(Topic):数据分类通道(如mysql.coffee_shop.order_details存储订单明细变更)
  • 生产者(Producer):数据写入端(如监控MySQL的Debezium连接器)
  • 消费者(Consumer):数据读取端(如推荐服务)
  • 代理(Broker):Kafka服务节点,组成高可用集群

实时推荐系统工作流

订单数据实时处理流程:

在这里插入图片描述

整个过程可在500ms内完成,实现秒级响应

数据摄取:Kafka Connect与Debezium

**变更数据捕获(CDC)**是实现实时数据摄取的关键技术

# docker-compose.yaml配置片段
connect:image: confluentinc/cp-kafka-connectcommand:- bash- -c- |# 安装Debezium MySQL连接器confluent-hub install debezium/debezium-connector-mysql/etc/confluent/docker/runenvironment:CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"  # Kafka集群地址

Debezium通过解析MySQL二进制日志(binlog),将数据变更转化为标准事件格式。示例事件消息:

{"payload": {"after": {"order_id": "ORD_20230619_001","product_id": "COFFEE_BEAN_ESPRESSO","quantity": 2},"op": "c",  // 操作类型:c=新增,u=更新,d=删除"ts_ms": 1687189200000  // 事件时间戳}
}

数据消费:Python消费者实现

实时推荐服务的消费者核心代码:

# kafka_client.py消费者工作线程
def consumer_worker(worker_id: int):# 初始化Kafka连接handler = KafkaHandler(["kafka-1:9092", "kafka-2:9092"])consumer = handler.get_consumer(topic="mysql.coffee_shop.order_details",group_id="realtime-recs")producer = handler.get_producer()while True:# 批量拉取消息(每秒轮询)messages = consumer.poll(timeout_ms=1000)for msg in messages.items():for record in msg.value:# 处理事件(调用推荐逻辑)process_recommendation(record, producer)def process_recommendation(record, producer):# 从Redis获取用户画像(详见第八章)user_profile = redis.get(f"user:{record['user_id']}")if user_profile["tier"] == "DIAMOND":# 生成推荐并发送至下游主题suggestion = {"order_id": record["order_id"],"suggested_product": "COFFEE_GRINDER"}producer.send("order_suggestions", suggestion)

该实现包含以下关键技术点

  1. 消费者组(group_id)实现负载均衡
  2. 自动提交偏移量(enable_auto_commit=True)
  3. 批量消息处理提升吞吐量

基础设施部署

Docker Compose定义的核心服务:

services:kafka-1:image: bitnami/kafka:3.5.1ports:- 29092:29092  # 外部访问端口environment:KAFKA_CFG_NODE_ID: 1  # 节点标识KAFKA_CFG_LISTENERS: PLAINTEXT://:9092kafka-ui:image: provectuslabs/kafka-uiports:- 8000:8080  # 监控界面端口environment:KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: kafka-1:9092

关键组件说明:

  • 双节点Kafka集群(kafka-1/kafka-2)保障高可用
  • Kafka UI提供可视化监控(http://localhost:8000)
  • 初始化服务(init-kafka)自动创建主题和分区

价值总结

Kafka实时流处理系统与批量处理管道形成互补:

批量处理实时流处理
延迟小时级毫秒级
吞吐极高
用例历史分析即时响应
存储数据湖持久化短期事件保留

这种混合架构同时满足企业对历史数据分析和实时决策的需求$CITE_14 $CITE_17。

下一章:Redis缓存/存储


第八章:Redis缓存/存储

详细专栏:Redis文档学习

在第七章:Kafka消息系统(实时流处理)中,我们了解到Kafka如何实现实时数据流动以支持商品推荐服务。

实时推荐需要极速访问用户等级、支付方式和商品信息,这正是Redis缓存/存储的核心价值

Redis核心特性

Redis是开源的内存数据结构存储系统,具备以下关键能力:

  • 亚毫秒级响应:数据存储在内存而非磁盘,访问速度比传统数据库快100倍
  • 丰富数据结构:支持字符串、哈希、集合、有序集合等数据结构
  • 数据持久化:支持RDB快照AOF日志两种持久化方式
  • 高可用架构:支持主从复制集群部署

项目中的Redis应用

在我们的咖啡销售数据管道中,Redis承担两大核心角色:

1. 查找数据缓存(静态数据加速)

通过lookup_data_cache.py脚本定时从MySQL加载三类核心数据到Redis

# 来源: scripts/database/lookup_data_cache.py
# 钻石客户ID存储为集合
r.sadd("diamond_customers", customer_id)# 支付方式ID存储为集合
r.sadd("bank_acb_payment", payment_method_id)# 商品详情存储为哈希
r.hset(f"product:{product_id}", mapping={"name": "浓缩咖啡", "unit_price": 25})

实时服务通过redis_static连接访问这些数据

# 检查钻石客户(时间复杂度O(1))
is_diamond = redis_static.sismember("diamond_customers", "CUST_202306001")# 获取商品详情(哈希全量读取)
product_info = redis_static.hgetall("product:COFFEE_BEAN_001")

2. 订单状态管理(动态数据暂存)

使用redis_dynamic连接处理实时订单流

# 订单计数器递增(原子操作保证线程安全)
current_count = redis_dynamic.incr(f"message_count:ORDER_20230619_001")# 存储已购商品集合(自动去重)
redis_dynamic.sadd(f"ordered_products:ORDER_20230619_001", "COFFEE_GRINDER")# 设置订单状态(带90秒过期时间)
redis_dynamic.setex(f"order_status:ORDER_20230619_001", 90, "completed")

Redis架构优势

维度传统数据库Redis
响应时间10-100ms0.1-1ms
QPS~1k~1M
数据结构固定表结构多种灵活结构
持久化强持久化可配置持久化

Docker部署配置

Redis服务在docker-compose.yaml中的定义

services:redis:image: redis:7.0.12ports:- "6379:6379"volumes:- ./redis_data:/data  # 数据持久化目录command: ["redis-server", "--save 60 1000", "--appendonly yes"]

关键配置说明:

  • --save 60 100060秒内有1000次写入则触发RDB快照
  • --appendonly yes:启用AOF日志记录所有写操作

数据流可视化

Redis在实时推荐中的交互流程:

在这里插入图片描述

总结

Redis通过内存存储和高效数据结构,在实时推荐中实现:

  1. 查询加速:将钻石客户检查从10ms级优化至0.1ms级
  2. 状态同步:可靠跟踪分布式环境下的订单处理进度
  3. 资源解耦:降低MySQL负载峰值压力达80%

这种缓存+暂存的双重模式,使实时推荐服务能在500ms内完成从事件接收到推荐生成的完整流程

下一章:Docker Compose环境


第九章:Docker Compose环境

详细专栏:Docker 云原生

欢迎回到咖啡销售数据管道核心概念系列的最终章!我们已经深入探讨了各个组件

  • 数据来源(第一章:MySQL数据库(源系统))
  • 数据处理引擎(第二章:Spark作业(数据处理))
  • 分层存储(第三章:MinIO存储(数据湖)和第四章:数据层(青铜、白银、黄金))
  • 工作流调度(第五章:Airflow DAG(批量编排))
  • 数据质量保障(第六章:数据质量检查)
  • 实时事件处理(第七章:Kafka消息系统(实时流))
  • 以及高速缓存(第八章:Redis缓存/存储)

想象组装复杂机械或指挥大型乐团——每个零件或乐手都有特定角色。

如何确保它们协同运作

这正是Docker Compose要解决的核心挑战:将Spark、Airflow、Kafka、数据库等异构服务整合为有机整体。

Docker Compose核心价值

Docker Compose是通过YAML文件定义和管理多容器应用的工具,具备三大核心能力

在这里插入图片描述

蓝图文件解析

项目包含两个核心配置文件:

  • docker-compose.yaml:定义实时服务组件
  • docker-compose-batch.yaml:定义批处理服务组件

服务定义范式

# 摘自 docker-compose-batch.yaml
services:minio:image: minio/minio:latestcontainer_name: minioports:- "9000:9000"  # S3 API端口- "9001:9001"  # 控制台端口environment:MINIO_ROOT_USER: minioadminMINIO_ROOT_PASSWORD: minioadminvolumes:- ./volumes/minio:/data  # 数据持久化路径networks:- myNetwork

关键配置说明:

  • image:指定Docker镜像版本,确保环境一致性
  • ports:端口映射遵循主机端口:容器端口格式
  • volumes:数据卷实现主机与容器的路径映射
  • networks:自定义网络实现服务发现

网络拓扑架构

networks:myNetwork:  # 自定义覆盖网络driver: bridgeattachable: true

网络特性:

  • 服务间通过服务名互访(如spark-master:7077)
  • 隔离外部网络干扰,提升安全性
  • 支持跨compose文件网络共享

数据卷设计

volumes:- ./airflow/dags:/opt/airflow/dags  # DAG文件同步- ./volumes/postgres:/var/lib/postgresql/data  # 元数据持久化

数据管理策略:

  • 批处理数据:MinIO卷映射实现数据湖持久化
  • 元数据存储:PostgreSQL卷保障任务状态不丢失
  • 日志文件:主机目录映射方便问题排查

💡完整服务矩阵

服务类型包含组件通信协议
批处理服务Spark Master/Worker, AirflowHTTP/8080, JDBC/7077
实时服务Kafka集群, Redis, Kafka ConnectTCP/9092, TCP/6379
存储服务MinIO, PostgreSQLS3/9000, JDBC/5432
监控服务Prometheus, Grafana, Kafka UIHTTP/3000, Web/8000

环境管理指令集

全栈启动命令

docker compose -f docker-compose.yaml -f docker-compose-batch.yaml up -d

参数解析:

  • -f:指定多个compose文件实现模块化配置
  • -d后台守护模式运行
  • 启动顺序通过depends_on字段控制

运维监控命令

# 查看容器运行状态
docker compose -f docker-compose.yaml ps# 查看实时日志
docker compose logs -f spark-master# 弹性扩容Spark Worker
docker compose up -d --scale spark-worker=3

系统协同原理

在这里插入图片描述

协同要点:

  1. 批量处理流:Airflow通过SparkSubmitOperator提交作业到Spark集群,实现ETL流水线
  2. 实时处理流Kafka Connect监控MySQL binlog生成CDC事件,触发实时推荐计算
  3. 监控告警流:各组件暴露Metrics端点,Prometheus采集后通过Grafana展示

核心优势

环境一致性保障

  • 开发、测试、生产环境使用相同镜像版本(如bitnami/kafka:3.5.1)
  • 避免"在我机器上能跑"的问题,实现跨平台兼容

资源隔离控制

# 限制容器资源配额
deploy:resources:limits:cpus: '0.50'memory: 1024Mreservations:cpus: '0.25'memory: 512M

资源管理策略:

  • Spark Worker按计算需求分配CPU/MEM
  • Kafka Broker根据吞吐量配置资源上限
  • 关键服务(如PostgreSQL)预留基础资源

快速扩缩容能力

# 扩展Kafka Broker节点
docker compose up -d --scale kafka=3# 缩减Spark Worker节点
docker compose up -d --scale spark-worker=2

动态调整策略:

  • 批量任务高峰期扩展Spark计算节点
  • 大促期间增加Kafka分区和消费者组实例

总结

Docker Compose通过声明式配置将复杂的多服务系统抽象为可版本控制的蓝图文件,实现:

  1. 一键部署:15+组件通过单个命令启动
  2. 服务发现自定义网络实现容器间域名解析
  3. 数据治理卷映射策略保障数据生命周期
  4. 资源管控:CPU/内存配额限制防止资源争抢

该方案使我们的数据管道具备企业级可维护性,支持从开发环境到生产环境的平滑过渡。

通过组合批量处理与实时处理组件,构建出完整的Lambda架构(详见 架构专栏)实现。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/85382.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/85382.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/85382.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jquery 赋值时不触发change事件解决——仙盟创梦IDE

一、传统方法jquey change $(#village_id).trigger(change);$("#village_id").val(99);$("#village_id").change(); 不生效 二、传统方法jquey $(#village_id).trigger(change); 四、传统方法jquey <input type"text" /> <button…

Android | 签名安全

检验和签名 校验开发者在数据传送时采用的一种校正数据的一种方式&#xff0c; 常见的校验有:签名校验(最常见)、dexcrc校验、apk完整性校验、路径文件校验等。 通过对 Apk 进行签名&#xff0c;开发者可以证明对 Apk 的所有权和控制权&#xff0c;可用于安装和更新其应用。…

Android14 耳机按键拍照

在相机拍照预览界面 通过耳机按键实现拍照功能 耳机按键定义 frameworks/base/core/java/android/view/KeyEvent.java public static final int KEYCODE_HEADSETHOOK 79;相机界面 拍照逻辑 DreamCamera2\src\com\android\camera\PhotoModule.java Override public bool…

【AI作画】第2章comfy ui的一般输入节点,文本框的类型和输入形式

目录 CLIP文本编码器 条件输出和文本输出 转换某一变量为输入 展示作品集 在默认的工作流之外&#xff0c;我们如何自己添加节点呢&#xff1f; 一般我们用到的sampler采样器在“鼠标右键——添加节点——采样——K采样器” 我们用的clip文本编码器在“鼠标右键——添加节…

vue3仿高德地图官网路况预测时间选择器

<template><div class"time-axis-container"><div class"time-axis" ref"axisRef"><!-- 刻度线 - 共25个刻度(0-24) --><divv-for"hour in 25":key"hour - 1"class"tick-mark":class&…

ZArchiver:高效解压缩,轻松管理文件

在数字时代&#xff0c;文件的压缩与解压已成为我们日常操作中不可或缺的一部分。无论是接收朋友分享的大文件&#xff0c;还是下载网络资源&#xff0c;压缩包的处理都极为常见。ZArchiver正是一款为安卓用户精心打造的解压缩软件&#xff0c;它以强大的功能、简洁的界面和高效…

1432.改变一个整数能得到的最大差值

贪心思想&#xff0c;为了得到最大差&#xff0c;想办法变成一个最大的数和一个最小的数。 这里有规则&#xff0c;从最高位开始&#xff0c; 变成最大&#xff0c;如果<9&#xff0c;则将该数位代表的数都变成9&#xff0c;如果该数位已经是9了&#xff0c;则将下一个数位…

前端跨域解决方案(4):postMessage

1 postMessage 核心 postMessage 是现代浏览器提供的跨域通信标准 API&#xff0c;允许不同源的窗口&#xff08;如主页面与 iframe、弹出窗口、Web Worker&#xff09;安全交换数据。相比其他跨域方案&#xff0c;它的核心优势在于&#xff1a; 双向通信能力&#xff1a;支持…

大语言模型指令集全解析

在大语言模型的训练与优化流程中&#xff0c;指令集扮演着关键角色&#xff0c;它直接影响模型对任务的理解与执行能力。以下对常见指令集展开详细介绍&#xff0c;涵盖构建方式、规模及适用场景&#xff0c;助力开发者精准选用 为降低指令数据构建成本&#xff0c;学术界和工…

OpenCV CUDA模块设备层-----用于封装CUDA纹理对象+ROI偏移量的一个轻量级指针类TextureOffPtr()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 TextureOffPtr<T, R> 是 OpenCV 的 CUDA 模块&#xff08;opencv_cudev&#xff09;中用于封装 CUDA 纹理对象 ROI 偏移量 的一个轻量级指…

Python 数据分析10

2.3.3其他 除了前面所介绍的常用语数据挖掘建模的库之外&#xff0c;还有许多库也运用于数据挖掘建模&#xff0c;如jieba、SciPy、OpenCV、Pillow等。 1.jieba jieba是一个被广泛使用的Python第三方中文分词库。jieba使用简单&#xff0c;并且支持Python、R、C等多种编程语言的…

css 制作一个可以旋转的水泵效果

如图&#xff0c;项目里面有一个小图片可以旋转&#xff0c;达到看起来像是一个在工作的水泵。我使用css旋转动画实现。 一、HTML结构部分 <div className"ceshixuanzhuan"><img src{lunkuo} className"lunkuo"/><img src{yepian} classN…

数据结构期末程序题型

一、 队列 1、简单模拟队列排列 #include<bits/stdc.h> using namespace std; int main(){int n;cin>>n;queue<int>q;string str;while(true){cin>>str;if(str"#")break;if(str"In"){int t;cin>>t;if(q.size()<n){q.pu…

SpringCloud+Vue汽车、单车充电桩源码实现:从架构设计到核心模块解析

智慧充电管理平台技术实现&#xff1a;从架构设计到核心模块解析 智慧充电管理平台作为新能源汽车生态的核心基础设施&#xff0c;需要实现充电设备管理、订单处理、数据统计分析等复杂功能。本文将从技术架构、核心模块设计、关键技术实现三个维度&#xff0c;深度解析平台的…

Kafka入门及实战应用指南

1、Kafka概述 Apache Kafka是由LinkedIn公司于2010年开发的一款分布式消息系统&#xff0c;旨在解决当时传统消息队列&#xff08;如ActiveMQ、RabbitMQ&#xff09;在高吞吐量和实时性场景下的性能瓶颈。随着LinkedIn内部对实时日志处理、用户行为追踪等需求的激增&#xff0…

智能指针 c++

C 智能指针详解 智能指针是 C11 引入的内存管理工具&#xff0c;位于 <memory> 头文件中&#xff0c;用于自动管理动态分配的内存&#xff0c;防止内存泄漏。主要类型如下&#xff1a; 1. std::unique_ptr (独占所有权) 特点&#xff1a;唯一拥有所指对象&#xff0c;不…

Python应用八股文

大家好!在 Python 学习的道路上&#xff0c;掌握一些基础知识要点至关重要&#xff0c;这些要点常被称为“Python 八股”。以下是对它们的简易总结&#xff0c;帮助你快速回顾和巩固 Python 的核心概念。 一、数据结构 列表&#xff08;List&#xff09;&#xff1a;有序可变序…

【技术深度】领码SPARK破解微服务数据依赖困局:架构设计与实践指南

——深度解析分布式数据冗余与异步消息机制&#xff0c;驱动企业数字化转型加速 ✨ 核心摘要 本文从技术架构与工程实现的角度&#xff0c;系统讲解领码SPARK融合平台如何精准解决微服务架构下数据依赖“卡脖子”问题。通过设计高效的数据冗余模型和完善的异步消息更新机制&am…

关于前端的防抖和节流

给我解释下 前端开发中的防抖和节流 并举个具体的例子 防抖&#xff08;Debounce&#xff09;与节流&#xff08;Throttle&#xff09;详解 在前端开发中&#xff0c;防抖&#xff08;Debounce&#xff09; 和 节流&#xff08;Throttle&#xff09; 是两种优化高频触发事件的…

React-router 多类型历史记录栈

react-router 为了满足开发者更多路由历史存储场景&#xff0c;提供了以下几种模式&#xff1a; 浏览器原生历史记录 浏览器 hash 内存型 服务端记录 以上实现分别对应于一下 API 实现&#xff1a; createBrowserRouter&#xff1a;浏览器提供的历史管理。 createHashRou…