随着AI大模型的大量普及,对于传统代码模式产生了不小的影响,特别是对于大数据领域,传统的规则引擎驱动的数据治理已经无法满足数据增长带来的治理需求。因此主动型治理手段逐渐成为主流,因此本文介绍一个基于deepSeek的流式数据自动化规则清洗案例来直观展现这一差异。
在这里插入图片描述

一、案例背景

某物联网平台需处理来自 5000 + 传感器的实时数据流(温度、湿度、设备状态等),日均数据量超 10TB,存在数据跳变、格式混乱、缺失率超 15% 等问题,传统人工规则维护成本高且响应滞后。本案例采用 deepSeek 实现自动化规则清洗,将数据合格率从 68% 提升至 99.2%。

二、准备工作(复制粘贴即可)

第一步:安装必要工具
打开电脑的命令提示符(Windows)或终端(Mac/Linux),逐行复制粘贴以下命令:

# 安装Python(已安装可跳过)
# Windows用户:https://www.python.org/ftp/python/3.9.7/python-3.9.7-amd64.exe 下载后双击安装,记得勾选"Add Python to PATH"
# Mac用户
brew install python@3.9# 安装核心工具
pip install deepseek-sdk kafka-python==2.0.2 pandas==1.5.3 pyspark==3.4.0
pip install pytest docker-compose

第二步:创建工作文件夹

# 创建并进入工作目录
mkdir deepseek_cleaning
cd deepseek_cleaning

核心代码(直接复制保存)
1. 数据接入代码(保存为 sensor_consumer.py)

from kafka import KafkaConsumer, KafkaProducer
import json
# 替换成别的大模型供应商SDK,DeepSeek本身不支持,本文此类描述仅表达对API调用示例
from deepseek.sdk import DeepSeekClient# 简单配置(新手无需修改)
KAFKA_SERVER = 'localhost:9092'
INPUT_TOPIC = 'sensor_data_topic'
OUTPUT_TOPIC = 'cleaned_data_topic'class SimpleDataProcessor:def __init__(self):# 初始化连接(复制后只需改API_KEY)self.api_key = "你的deepseek_api_key"  # 这里替换为你的API密钥self.client = DeepSeekClient(api_key=self.api_key)# 初始化生产者和消费者self.consumer = KafkaConsumer(INPUT_TOPIC,bootstrap_servers=KAFKA_SERVER,auto_offset_reset='earliest',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER,value_serializer=lambda x: json.dumps(x).encode('utf-8'))def process(self):print("开始处理数据(按Ctrl+C停止)...")for msg in self.consumer:raw_data = msg.valuetry:# 基础清洗cleaned = self.basic_clean(raw_data)# 用deepseek验证格式validated = self.client.validate_schema(cleaned, schema_name="sensor_v2")# 发送到下一站self.producer.send(OUTPUT_TOPIC, validated)print(f"处理成功: {validated}")except Exception as e:print(f"处理失败: {str(e)}")def basic_clean(self, data):# 简单清洗逻辑(自动补全缺失字段)cleaned = {"device_id": data.get("device_id", "unknown"),"timestamp": data.get("timestamp", self.get_current_time()),"temperature": self.fix_temperature(data.get("temperature")),"humidity": data.get("humidity", 0)}return cleaneddef fix_temperature(self, temp):# 修复温度值(防止负数)if temp is None:return 25.0return max(0.0, float(temp))def get_current_time(self):# 获取当前时间戳import timereturn int(time.time())if __name__ == "__main__":processor = SimpleDataProcessor()processor.process()

2. 规则引擎代码(保存为 rule_engine.py)

import pandas as pd
# 替换成别的大模型供应商SDK,DeepSeek本身不支持,本文此类描述仅表达对API调用示例
from deepseek.sdk.rule_engine import RuleEngineclass EasyRuleEngine:def __init__(self, api_key):self.rule_engine = RuleEngine(client=DeepSeekClient(api_key=api_key),model_name="rule-generator-v3")# 初始化简单规则self.init_basic_rules()def init_basic_rules(self):# 创建初始规则(无需历史数据)sample_data = pd.DataFrame({"temperature": [20, 25, 30, 1000],  # 包含一个异常值"humidity": [50, 60, 70, 200]})self.rule_engine.train(data=sample_data,label_column=None,  # 自动识别异常max_rules=5)def clean_data(self, data):# 转换为DataFramedf = pd.DataFrame([data])# 应用规则rules = self.rule_engine.get_active_rules()for rule in rules:df = self.apply_rule(df, rule)# 转换回字典return df.to_dict('records')[0]def apply_rule(self, df, rule):# 应用单个规则if rule["type"] == "range_check":min_val = rule["params"]["min"]max_val = rule["params"]["max"]df[rule["field"]] = df[rule["field"]].clip(min_val, max_val)return df

3. 主程序(保存为 main.py)

from sensor_consumer import SimpleDataProcessor
from rule_engine import EasyRuleEngine
import timeif __name__ == "__main__":# 替换为你的API密钥API_KEY = "你的deepseek_api_key"# 初始化规则引擎rule_engine = EasyRuleEngine(API_KEY)# 初始化数据处理器processor = SimpleDataProcessor()processor.api_key = API_KEY  # 设置API密钥# 启动处理print("系统启动成功!正在等待数据...")try:while True:processor.process()time.sleep(1)except KeyboardInterrupt:print("系统已停止")

4. Docker 配置文件(保存为 docker-compose.yml)

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:7.4.0  # 固定版本environment:ZOOKEEPER_CLIENT_PORT: 2181ports:- "2181:2181"kafka:image: confluentinc/cp-kafka:7.4.0  # 固定版本depends_on:- zookeeperports:- "9092:9092"environment:# 强制使用 ZooKeeper 模式的关键配置KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092# 禁用 KRaft 模式KAFKA_CFG_PROCESS_ROLES: ""# 简化配置KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  1. 测试数据生成器(保存为 test_data_sender.py)
from kafka import KafkaProducer
import json
import time
import randomproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# 生成测试数据
while True:data = {"device_id": f"dev_{random.randint(1, 10)}","timestamp": int(time.time()),"temperature": random.uniform(10, 35) if random.random() > 0.2 else random.uniform(100, 200),  # 20%异常值"humidity": random.uniform(30, 80)}producer.send('sensor_data_topic', data)print(f"发送测试数据: {data}")time.sleep(2)  # 每2秒发一条

三、部署步骤(全程复制粘贴)

第一步:启动基础服务

# 在工作目录下执行
docker-compose up -d

看到 “Creating deepseek_cleaning-zookeeper-1 … done” 表示成功
第二步:创建 Kafka 主题

# 等待10秒让服务启动
sleep 10# 进入Kafka容器
docker exec -it deepseek_cleaning-kafka-1 bash# 在容器内执行(复制这两行)
kafka-topics --create --topic sensor_data_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
kafka-topics --create --topic cleaned_data_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1# 退出容器(输入)
exit

第三步:运行系统
打开 3 个命令提示符 / 终端窗口,分别执行:
窗口 1:启动主程序

cd deepseek_cleaning
python main.py

窗口 2:发送测试数据

cd deepseek_cleaning
python test_data_sender.py

窗口 3:查看清洗结果

cd deepseek_cleaning
# 安装查看工具
pip install kafka-console-consumer
# 查看清洗后的数据
kafka-console-consumer --bootstrap-server localhost:9092 --topic cleaned_data_topic --from-beginning

验证结果
在窗口 3 中,你会看到类似这样的输出(异常温度被修正):

{"device_id": "dev_3", "timestamp": 1620000000, "temperature": 35.0, "humidity": 65.2}

而窗口 2 发送的原始数据可能包含 100 以上的温度值,说明清洗成功。

四、常见问题解决

启动失败:检查是否替换了代码中的 “你的 deepseek_api_key”(需要去 deepseek 官网申请免费密钥)
Kafka 连接错误:确保 docker-compose 启动成功,可执行docker-compose ps查看状态
缺少模块:重新运行第一步的 pip 安装命令
端口占用:关闭其他占用 9092 或 2181 端口的程序,或重启电脑

五、停止服务

# 停止程序:在每个窗口按Ctrl+C
# 停止Docker服务
docker-compose down

六、总结

(一)核心区别​

规则生成模式​
传统方案:需数据工程师编写 SQL / 代码定义规则(如WHERE temperature < 100)​
本方案:deepseek 通过历史数据自动生成规则,示例规则输出:​

{"type": "range_check","field": "temperature","params": {"min": 200, "max": 400},"confidence": 0.98}

(二)处理链路​

  1. 传统方案:固定处理流程(过滤→转换→存储),修改需重启服务​
  2. 本方案:动态规则链,支持实时插入新规则(如临时增加暴雨天气的湿度阈值调整)​

(三)关键优化​

  1. 时效性提升:规则迭代周期从周级缩短至分钟级,应对设备固件升级等突发场景​
  2. 资源利用率:通过 deepseek 的规则优先级调度,计算资源消耗降低 40%​
  3. 可维护性:自动生成规则文档,减少 80% 的人工维护成本
  4. 容错能力:支持规则回滚机制,当新规则导致数据异常时可一键恢复至稳定

希望本文可以对你后续工作带来帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92325.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92325.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/92325.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【论文分析】【Agent】SEW: Self-Evolving Agentic Workflows for Automated Code Generatio

1.论文信息标题&#xff1a;SEW: Self-Evolving Agentic Workflows for Automated Code Generatio&#xff1a;用于自动代码生成的自我进化的代理工作流程收录的会议/期刊&#xff1a;作者信息&#xff1a;arxiv&#xff1a;&#x1f517;github网站&#xff1a;&#x1f517;g…

MCP 协议:AI 时代的 “万能转接头”,从 “手动粘贴” 到 “万能接口”:MCP 协议如何重构 AI 工具调用规则?

注&#xff1a;此文章内容均节选自充电了么创始人&#xff0c;CEO兼CTO陈敬雷老师的新书《GPT多模态大模型与AI Agent智能体》&#xff08;跟我一起学人工智能&#xff09;【陈敬雷编著】【清华大学出版社】 清华《GPT多模态大模型与AI Agent智能体》书籍配套视频课程【陈敬雷…

VUE本地构建生产环境版本用于局域网访问

&#x1f680;构建生产环境版本用于局域网访问&#xff08;适用于 Vue 项目&#xff09; 在开发 Vue 项目的过程中&#xff0c;很多人使用 yarn serve 启动开发服务器进行调试。但开发模式存在以下问题&#xff1a; 访问速度慢&#xff0c;特别是局域网访问&#xff1b;热更新频…

【密码学】5. 公钥密码

这里写自定义目录标题公钥密码密码学中的常用数学知识群、环、域素数和互素数模运算模指数运算费尔马定理、欧拉定理、卡米歇尔定理素性检验欧几里得算法中国剩余定理&#xff08;CRT&#xff09;离散对数二次剩余循环群循环群的选取双线性映射计算复杂性公钥密码体制的基本概念…

VINS-Fusion+UWB辅助算法高精度实现

VINS-FusionUWB辅助算法高精度实现 摘要 本文详细介绍了基于VINS-Fusion框架结合UWB辅助的高精度定位算法实现。通过将视觉惯性里程计(VIO)与超宽带(UWB)测距技术融合&#xff0c;显著提高了复杂环境下的定位精度和鲁棒性。本文首先分析了VINS-Fusion和UWB各自的技术特点&#…

新手向:Python实现简易计算器

你是否一直想学习编程但不知从何入手&#xff1f;这篇详细的教程将带领完全零基础的读者&#xff0c;循序渐进地掌握如何用Python实现一个简易计算器。我们将从最基本的编程概念讲起&#xff0c;确保每一位初学者都能跟上进度。准备工作在开始之前&#xff0c;你需要&#xff1…

区块链赋能供应链金融:解决信任与效率问题

摘要: 随着全球经济一体化和数字化进程的加速,供应链金融在实体经济发展中的作用愈发关键。然而,传统供应链金融面临着信任机制薄弱和效率低下等诸多挑战。区块链技术凭借其去中心化、不可篡改、可追溯等特性,为供应链金融带来了创新的解决方案,能够有效解决信任与效率问题…

无人机 × 巡检 × AI识别:一套可复制的超低延迟低空视频感知系统搭建实践

✳️ 引言&#xff1a;低空感知&#xff0c;正重构数字世界的“底层感官接口” 随着低空经济进入规模化部署阶段&#xff0c;感知系统不再是“任务辅助”&#xff0c;而是演变为支撑智能化运行的基础设施核心模块。从电力巡检的高空细节识别&#xff0c;到城市安防的区域态势掌…

STM32U5 外部中断不响应问题分析

关键字&#xff1a; EXTI 1. 问题背景 客户的终端客户反馈产品会有偶发性的功能异常。问题比较难以复现。 经过调查&#xff0c;在 BOOT 程序跳转到 APP1 程序中时相对比较容易复现问题。查看客户代码&#xff0c;发现客户在 BOOT 程序中会对 EXTI 进行初始化&#xff0c;跳…

17.Linux :selinux

Linux &#xff1a; selinux DAC vs MAC 对比模型控制方式决策依据安全强度DAC自主访问控制文件所有者的权限设置低MAC强制访问控制系统级安全策略极高SELinux的核心原理是基于 强制访问控制&#xff08;MAC&#xff09; 模型&#xff0c;通过为系统资源打上安全标签并制定精细…

如何在不停机的情况下,将MySQL单库的数据迁移到分库分表的架构上?

在业务高速发展的过程中&#xff0c;单库单表的MySQL架构往往会成为系统性能的瓶颈。将单库迁移到分库分表架构是一种常见的扩展方案&#xff0c;但如何在保证业务连续性的前提下完成这一迁移是一个挑战。以下是不停机迁移的几种主要方案&#xff1a; 一、基于双写的迁移方案 1…

Unix/Linux 系统编程中用于管理信号处理行为的核心概念或模型

在 Unix/Linux 系统编程中&#xff0c;管理信号处理行为涉及以下核心概念和模型&#xff0c;它们共同构成了信号处理的框架&#xff1a;1. 信号&#xff08;Signal&#xff09;模型 软件中断&#xff1a;信号是异步事件通知机制&#xff0c;类比硬件中断预定义类型&#xff1a;…

webrtc弱网-OveruseFrameDetector源码分析与算法原理

一、核心功能CPU负载检测&#xff1a;监控视频帧的捕获、编码、发送全流程耗时&#xff0c;实时计算CPU使用率自适应决策&#xff1a;基于CPU使用率阈值触发视频质量调整&#xff08;降级/升级&#xff09;多策略支持&#xff1a;提供新旧两套CPU负载估计算法&#xff0c;支持实…

Spring Cloud系列—Eureka服务注册/发现

上篇文章&#xff1a; Spring Cloud系列—简介https://blog.csdn.net/sniper_fandc/article/details/149936339?fromshareblogdetail&sharetypeblogdetail&sharerId149936339&sharereferPC&sharesourcesniper_fandc&sharefromfrom_link 在上篇文章中&…

QUdpSocket 详解:从协议基础、通信模式、数据传输特点、应用场景、调用方式到实战应用全面解析

前言 在网络通信的世界里&#xff0c;UDP 协议以其独特的 “快准狠” 特性占据着一席之地。作为 Qt 框架中 UDP 协议的封装者&#xff0c;QUdpSocket 为开发者提供了便捷高效的网络编程接口。​ 一、UDP 协议基础&#xff1a;QUdpSocket 的 历史 要理解 QUdpSocket&#xff0c;…

vue中reactive()和ref()的用法

在 Vue 3 的 Composition API 里&#xff0c;reactive() 和 ref() 都是用来把「普通数据」变成「响应式数据」的函数。 一句话区别&#xff1a; reactive() 只能包裹对象/数组&#xff1b;ref() 可以包裹任何类型&#xff0c;但在 模板 里读取时&#xff0c;不需要 .value。 下…

【公考基础】----备考规划篇

公考 公考&#xff1a;国家公务员考试 即&#xff1a;国考和省考 或 参公考试 包括但不限于&#xff1a;国考、省考、事业单位招考、教师招聘考试、军队文职招考等&#xff0c;一切进入国家党政军事业单位的考试。 考公整体流程 备考前&#xff1a;准备备考资料&#xf…

STM32江科大学习笔记,全功能按键非阻塞式实现,按键点击,双击,长按

目录 一、前言 二、关于实现非阻塞的办法 2.1 中断类型的选择 2.2 定时器中断 二、程序流程图 2.1 状态S0空闲状态 2.2 状态S1按键判断长按还是其他的事件 2.3 状态S2按键判断双击或者单击 2.4 状态S3按键已双击状态 2.5 状态S4长按状态 三、编写代码 3.1 按键初始…

动态代理常用的两种方式?

口语化回答好的&#xff0c;面试官&#xff0c;动态常见的两种&#xff0c;一种是 jdk 动态代理&#xff0c;一种是 cglib 动态代理&#xff0c;两者的最主要区别是 jdk 动态代理主要是依赖于接口创建代理对象&#xff0c;cglib 是通过生成子类的方式&#xff0c;不需要接口&am…

StarRocks vs ClickHouse:2025 年 OLAP 引擎终极对比指南

StarRocks 与 ClickHouse&#xff1a;高性能 OLAP 引擎的两种选择在当今数据驱动的商业环境中&#xff0c;选择合适的分析型数据库对于企业数据战略至关重要。StarRocks 和 ClickHouse 作为两款领先的 OLAP&#xff08;在线分析处理&#xff09;引擎&#xff0c;各自拥有独特的…