在物联网系统中,时序数据库(TSDB)和关系型数据库(RDBMS)的存储顺序设计需要根据数据特性、业务需求和系统架构综合考虑。以下是典型的设计方案和逻辑顺序:


1. 常见存储顺序方案

方案一:先写时序数据库,后异步同步到关系型数据库

适用场景:高频传感器数据为主,业务数据可容忍短暂延迟。
流程

  1. MQTT Broker 接收设备原始数据(如 devices/A/temperature)。
  2. 数据首先写入时序数据库(如InfluxDB):
    • 存储原始时间序列数据(高吞吐、低延迟)。
  3. 异步处理层(如Kafka/Flink)消费数据,处理后写入关系型数据库:
    • 提取关键状态(如最新温度值)写入MySQL的device_status表。
    • 关联设备元数据(如设备所属用户)。

优点

  • 确保传感器数据的写入性能最大化。
  • 避免高频写入拖累关系型数据库。

示例代码(伪代码):

# MQTT回调处理
def on_mqtt_message(topic, payload):# 1. 原始数据写入InfluxDBinflux.write({"measurement": "sensor_data","tags": {"device_id": topic.split('/')[1]},"fields": {"temperature": payload.temp},"time": payload.timestamp})# 2. 异步推送至Kafka,后续处理kafka.produce("device_updates", key=device_id, value=payload)# Kafka消费者处理业务逻辑
def kafka_consumer():for message in kafka.consume():# 3. 关联设备元数据并写入MySQLdevice = mysql.query("SELECT * FROM devices WHERE id = ?", message.device_id)mysql.execute("UPDATE device_status SET last_temp = ?, updated_at = NOW() WHERE device_id = ?",message.temp, message.device_id)

示例代码(以下是使用Java实现的等效代码,包含MQTT回调处理、InfluxDB写入和通过Kafka异步处理写入MySQL的逻辑):

import org.eclipse.paho.client.mqttv3.*;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.sql.*;
import java.time.Instant;
import java.util.Properties;public class MqttDataProcessor {// InfluxDB 配置private final InfluxDBClient influxDBClient;// Kafka 生产者private final KafkaProducer<String, DeviceData> kafkaProducer;// MySQL 连接private final Connection mysqlConnection;public MqttDataProcessor(InfluxDBClient influxDBClient, KafkaProducer<String, DeviceData> kafkaProducer,Connection mysqlConnection) {this.influxDBClient = influxDBClient;this.kafkaProducer = kafkaProducer;this.mysqlConnection = mysqlConnection;}// MQTT回调处理public IMqttMessageListener createMqttListener() {return (topic, message) -> {try {// 解析payloadDeviceData data = parsePayload(topic, message.getPayload());// 1. 原始数据写入InfluxDBwriteToInfluxDB(data);// 2. 异步推送至KafkasendToKafka(data);} catch (Exception e) {e.printStackTrace();}};}private DeviceData parsePayload(String topic, byte[] payload) {// 这里应该是你的实际payload解析逻辑String deviceId = topic.split("/")[1];// 示例: 假设payload是JSON格式 {"temp": 25.5, "timestamp": 123456789}String json = new String(payload);// 实际项目中可以使用Gson/Jackson等库double temp = Double.parseDouble(json.split("\"temp\":")[1].split(",")[0]);long timestamp = Long.parseLong(json.split("\"timestamp\":")[1].split("}")[0]);return new DeviceData(deviceId, temp, Instant.ofEpochSecond(timestamp));}private void writeToInfluxDB(DeviceData data) {try (WriteApi writeApi = influxDBClient.getWriteApi()) {Point point = Point.measurement("sensor_data").addTag("device_id", data.getDeviceId()).addField("temperature", data.getTemp()).time(data.getTimestamp(), WritePrecision.S);writeApi.writePoint(point);}}private void sendToKafka(DeviceData data) {ProducerRecord<String, DeviceData> record = new ProducerRecord<>("device_updates", data.getDeviceId(), data);kafkaProducer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();}});}// Kafka消费者处理业务逻辑public void startKafkaConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-data-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.your.package.DeviceDataDeserializer"); // 需要自定义try (KafkaConsumer<String, DeviceData> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(List.of("device_updates"));while (true) {ConsumerRecords<String, DeviceData> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, DeviceData> record : records) {// 3. 关联设备元数据并写入MySQLupdateMySQL(record.value());}}}}private void updateMySQL(DeviceData data) {String query = "SELECT * FROM devices WHERE id = ?";String update = "UPDATE device_status SET last_temp = ?, updated_at = NOW() WHERE device_id = ?";try (PreparedStatement selectStmt = mysqlConnection.prepareStatement(query);PreparedStatement updateStmt = mysqlConnection.prepareStatement(update)) {// 查询设备元数据selectStmt.setString(1, data.getDeviceId());ResultSet rs = selectStmt.executeQuery();if (rs.next()) {// 更新设备状态updateStmt.setDouble(1, data.getTemp());updateStmt.setString(2, data.getDeviceId());updateStmt.executeUpdate();}} catch (SQLException e) {e.printStackTrace();}}// 设备数据DTOpublic static class DeviceData {private String deviceId;private double temp;private Instant timestamp;// 构造器、getter和setterpublic DeviceData(String deviceId, double temp, Instant timestamp) {this.deviceId = deviceId;this.temp = temp;this.timestamp = timestamp;}// 省略getter和setter...}
}// 使用示例
public class Main {public static void main(String[] args) throws Exception {// 初始化InfluxDB客户端InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", "token".toCharArray(),"org", "bucket");// 初始化Kafka生产者Properties kafkaProps = new Properties();kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.your.package.DeviceDataSerializer"); // 需要自定义KafkaProducer<String, DeviceData> kafkaProducer = new KafkaProducer<>(kafkaProps);// 初始化MySQL连接Connection mysqlConn = DriverManager.getConnection("jdbc:mysql://localhost:3306/iot_db", "user", "password");// 创建处理器MqttDataProcessor processor = new MqttDataProcessor(influxDBClient, kafkaProducer, mysqlConn);// 启动Kafka消费者线程new Thread(processor::startKafkaConsumer).start();// 配置MQTT客户端MqttClient mqttClient = new MqttClient("tcp://broker.example.com:1883", "java-client");mqttClient.connect();// 订阅主题并设置回调mqttClient.subscribe("devices/+/data", 0, processor.createMqttListener());}
}

注意事项:
依赖库:需要添加以下依赖:

MQTT: org.eclipse.paho.client.mqttv3

InfluxDB: com.influxdb.influxdb-client-java

Kafka: org.apache.kafka.kafka-clients

MySQL: mysql.mysql-connector-java

序列化:需要为Kafka实现DeviceData的序列化器和反序列化器。

错误处理:实际项目中需要更完善的错误处理和重试机制。

资源管理:确保正确关闭所有连接和资源。

线程安全:如果高并发场景,需要考虑线程安全问题。


方案二:双写(时序库+关系库)

适用场景:数据一致性要求高,且写入压力可控。
流程

  1. MQTT消息同时写入时序数据库和关系型数据库(需事务或最终一致性保证)。
  2. 关系型数据库仅存储关键状态快照(如设备最新状态),而非全部原始数据。

优点

  • 数据实时一致,适合关键业务状态(如设备告警阈值)。

挑战

  • 需处理写入冲突(如使用分布式事务或补偿机制)。

方案三:关系型数据库为主,定期归档到时序库

适用场景:历史数据分析需求明确,但实时查询以业务数据为主。
流程

  1. 数据先写入MySQL的device_realtime表。
  2. 定时任务将过期数据批量迁移至InfluxDB,MySQL中仅保留近期数据。

优点

  • 简化实时业务查询(所有数据在MySQL中)。
  • 降低MySQL存储压力。

2. 存储顺序设计原则

(1)根据数据特性分层
数据层级存储目标数据库选择示例
原始时序数据高频写入、长期存储时序数据库每秒温度读数
状态快照最新状态查询关系型数据库设备当前温度、在线状态
业务元数据关联查询、事务操作关系型数据库设备所属用户、地理位置
(2)写入路径优化
  • 高频数据路径:MQTT → 时序数据库 → (可选)异步聚合后写入关系库。
  • 低频元数据路径:业务系统直接CRUD操作关系型数据库。
(3)一致性保证
  • 最终一致性:通过消息队列(如Kafka)解耦,确保数据最终同步。
  • 强一致性:使用分布式事务(如XA协议),但性能较低。

3. 典型物联网架构示例

在这里插入图片描述

关键点

  1. 实时性要求高的数据(如传感器读数)直连时序数据库。
  2. 需要业务关联的数据(如“设备所属用户”)通过流处理关联后写入MySQL。
  3. 历史数据分析直接从时序数据库查询。

4. 选择建议

  • 优先时序数据库:若90%以上的查询是基于时间范围的聚合(如“过去24小时温度趋势”)。
  • 优先关系型数据库:若需频繁JOIN查询(如“查询设备A的所有者手机号”)。
  • 混合使用:大多数生产环境会同时使用两者,通过写入顺序设计平衡性能与功能需求。

通过合理设计存储顺序,可以同时满足物联网场景的高性能写入复杂业务查询需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/89244.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/89244.shtml
英文地址,请注明出处:http://en.pswp.cn/web/89244.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

django安装、跨域、缓存、令牌、路由、中间件等配置

注意&#xff1a;如果是使用 PyCharm 编程工具就不用创建虚拟化&#xff0c;直接打开 PyCharm 选择新建的目录直接调过下面的步骤11. 项目初始化如果不是用 PyCharm 编辑器就需要手动创建虚拟环境在项目目录cmd&#xff0c;自定义名称的虚拟环境# 激活虚拟环境 python -m venv …

时间的弧线,逻辑的航道——标准单元延迟(cell delay)的根与源

时序弧 在这篇文章中&#xff0c;我们将讨论影响标准单元延迟的因素。在开始讨论之前&#xff0c;我们需要先了解一下什么是时序弧 (Timing Arcs)&#xff1a; 时序弧 (Timing Arcs)&#xff1a; 时序弧代表了信号从一个输入流向一个输出的方向。它存在于组合逻辑和时序逻辑中&…

《透视定轴:CSS 3D魔方中视觉层级的秩序法则》

当CSS的代码编织出一个能自由旋转的3D魔方&#xff0c;六个色彩各异的面在空间中翻转、重叠时&#xff0c;最考验技术的并非旋转动画的流畅度&#xff0c;而是每个面在任意角度下都能保持符合现实逻辑的前后关系。为何有时某个面会突兀地“穿透”另一个面&#xff1f;为何旋转到…

RTL编程中常用的几种语言对比

以下是RTL&#xff08;寄存器传输级&#xff09;编程中常用的几种硬件描述语言&#xff08;HDL&#xff09;及其核心差异的对比分析。RTL编程主要用于数字电路设计&#xff0c;通过描述寄存器间的数据传输和逻辑操作实现硬件功能。以下内容综合了行业主流语言的技术特性与应用场…

前端面试题(HTML、CSS、JavaScript)

目录 一、HTML src与href区别 对html语义化理解 语义化标签有哪些&#xff1f; script中的defer与async区别 行内元素与块级元素有哪些&#xff1f; canvas与svg区别 SEO优化 html5新特性 二、CSS 盒模型 选择器优先级 伪元素与伪类 隐藏元素几种方式 水平/垂直…

Linux-线程控制

线程等待pthread_join()pthread_join 是 Linux 系统中用于线程同步的重要函数&#xff0c;主要作用是等待指定线程结束并回收其资源。基本功能- 阻塞当前调用线程&#xff0c;直到目标线程执行结束。 - 回收目标线程的资源&#xff0c;避免产生“僵尸线程”。 - 可选地获取目标…

RAG优化秘籍:基于Tablestore的知识库答疑系统架构设计

目录一、技术架构设计二、双流程图解析横向架构对比纵向核心流程三、企业级代码实现Python检索核心TypeScript前端接入YAML部署配置四、性能对比验证五、生产级部署方案六、技术前瞻分析附录&#xff1a;完整技术图谱一、技术架构设计 原创架构图 #mermaid-svg-3Ktoc4oH4xlbD6…

i.mx8 RTC问题

项目场景&#xff1a;需要增加外置RTC&#xff0c;保证时间的精准。问题描述&#xff1a;基本情况&#xff0c;外置i2c接口的RTC&#xff0c;注册、读写都正常&#xff0c;但是偶发性重启后&#xff0c;系统时间是2022&#xff0c;rtc时间是1970&#xff0c;都像是恢复了默认时…

数据集相关类代码回顾理解 | utils.make_grid\list comprehension\np.transpose

目录 utils.make_grid list comprehension np.transpose utils.make_grid x_gridutils.make_grid(x_grid, nrow4, padding2) make_grid 函数来自torchvision的utils模块&#xff0c;用于图像数据可视化&#xff0c;将一批图像排列成一个网格。 x_grid&#xff1a;四维图像…

C#中Static关键字解析

本文仅作为参考大佬们文章的总结。 Static关键字是C#语言中一个基础而强大的特性&#xff0c;它能够改变类成员的行为方式和生命周期。本文系统性总结static关键字的各类用法、核心特性、适用场景以及需要注意的问题&#xff0c;以帮助掌握这一重要概念。 一、Static关键字概…

通用综合文字识别联动 MES 系统:OCR 是数据流通的核心

制造业的 MES 系统需实时整合生产数据以调控流程&#xff0c;但车间的工单、物料标签、质检报告等多为纸质或图片形式&#xff0c;传统人工录入不仅滞后&#xff0c;还易出错&#xff0c;导致 MES 系统数据断层。通用综合文字识别借助 OCR 技术&#xff0c;成为连接这些信息与 …

【Linux 学习指南】网络编程基础:从 IP、端口到 Socket 与 TCP/UDP 协议详解

文章目录&#x1f4dd;理解源IP地址和目的IP地址&#x1f320; 认识端口号&#x1f309;端口号范围划分&#x1f309;理解"端口号"和"进程ID"&#x1f309;理解源端口号和目的端口号&#x1f309;理解socket&#x1f320;传输层的典型代表&#x1f309;认识…

React+Next.js+Tailwind CSS 电商 SEO 优化

一、项目背景与技术选型​1. 原始痛点​项目最初基于纯 React 开发&#xff08;SPA 架构&#xff09;&#xff0c;存在三个致命问题&#xff1a;​搜索引擎爬虫无法有效抓取动态渲染的商品详情、分类页内容&#xff1b;​单页面应用 难以实现页面级的 meta 定制&#xff0c;关键…

Process Lasso:提升电脑性能的得力助手

在日常使用电脑的过程中&#xff0c;我们常常会遇到这样的问题&#xff1a;电脑运行缓慢、程序响应迟缓、多任务处理时卡顿不断。这些问题不仅影响工作效率&#xff0c;还让人感到非常烦躁。其实&#xff0c;这些问题很多时候是因为电脑的进程管理不够优化。而Process Lasso正是…

AI驱动的大前端内容创作与个性化推送:资讯类应用实战指南

在信息爆炸的时代&#xff0c;资讯类应用面临两大核心挑战&#xff1a;一是如何高效生产海量优质内容&#xff0c;二是如何让用户从海量信息中快速获取感兴趣的内容。AI技术的介入正在重构资讯类应用的开发模式&#xff0c;从内容生产到用户触达形成全链路智能化。本文将从开发…

2025/7/16——java学习总结

Java IO 流全体系总结&#xff1a;从基础到实战的完整突破&#xff08;重写&#xff09;一、基础核心&#xff1a;字节流与字符流的底层逻辑&#xff08;一&#xff09;字节流&#xff1a;二进制数据的读写基础操作字节输入流&#xff1a;掌握 FileInputStream 单字节读取细节&…

书籍自然数数组的排序(8)0715

题目给定一个长度为N的整型数组arr&#xff0c;其中有N个互不相等的自然数1~N&#xff0c;请实现arr的排序&#xff0c;但是不要把下标0~N-1位置上的数通过直接赋值的方式替换成1~N。解答 arr在调整之后应该事下标从0到N-1的位置上依次放着1~N&#xff0c;即arr[index] index …

【08】MFC入门到精通——MFC模态对话框 和 非模态对话框 解析 及 实例演示

文章目录八、模态对话框 和 非模态对话框 创建及显示8.1 对话框是怎样弹出的8.2 模态对话框的创建及显示8.3 非模态对话框的创建及显示8.4 完整代码下载八、模态对话框 和 非模态对话框 创建及显示 Windows对话框分为两类&#xff1a;模态对话框 和 非模态对话框。 模态对话框…

github上传大文件(多种解决方案)

之前一直用vscode的上传项目方法&#xff0c;这个方便之处在于不用打开git终端输入各种命令&#xff0c;不过麻烦的是我一直无法拉取github上的远程仓库提交&#xff0c;每次只能更新已有的仓库并且上传的文件还不能太大&#xff0c;应该是不能超过100MB&#xff0c;而且直接在…

生活污水深度除磷的方法

生活污水中磷含量过多的危害大家都知道总磷是水质检测的重要指标之一&#xff0c;在污水处理中生活污水往往都会出现总磷超标的现象。生活污水磷超标的危害是多方面的主要包括水体富营养化、危害水生生物、影响人类健康&#xff0c;以及可能引发蓝藻水华等问题。除磷方法污水的…