概述

本文介绍如何基于Flink源码进行二次开发,实现一个动态规则引擎系统。通过自定义算子和算子协调器,实现数据流的动态规则计算和协调管理。以此更好理解前面介绍的源码相关文章

项目需求

核心功能

实现一个动态规则引擎,具备以下特性:

  • 数据源产生两类数据:数据本身运算表达式
  • 按照运算表达式对数据进行运算并输出结果
  • 运算表达式可以动态更新
  • 支持多并行度的运算任务

架构设计

在这里插入图片描述

具体例子说明

场景:实时温度监控系统

假设我们有一个实时温度监控系统,需要对传感器数据进行动态计算:

数据源输入示例:
时间线:
T1: {"type": "rule", "expression": "temperature * 1.8 + 32"}  // 摄氏度转华氏度
T2: {"type": "data", "sensorId": "001", "temperature": 25.0}
T3: {"type": "data", "sensorId": "002", "temperature": 30.0}
T4: {"type": "data", "sensorId": "003", "temperature": 20.0}
T5: {"type": "rule", "expression": "temperature + 273.15"}   // 摄氏度转开尔文
T6: {"type": "data", "sensorId": "004", "temperature": 35.0}
T7: {"type": "data", "sensorId": "005", "temperature": 28.0}
期望的处理结果:
T2数据: 25.0 * 1.8 + 32 = 77.0°F    (使用第一个规则)
T3数据: 30.0 * 1.8 + 32 = 86.0°F    (使用第一个规则)
T4数据: 20.0 * 1.8 + 32 = 68.0°F    (使用第一个规则)
--- 规则切换点 ---
T6数据: 35.0 + 273.15 = 308.15K      (使用第二个规则)
T7数据: 28.0 + 273.15 = 301.15K      (使用第二个规则)
关键挑战:
  1. 数据一致性:T4的数据必须用第一个规则计算完成后,T6的数据才能开始用第二个规则计算
  2. 并行处理:如果有多个Calc Operator并行处理,需要确保它们都完成了旧规则的计算
  3. 无数据丢失:规则切换过程中不能丢失任何数据

处理流程详解:

当T5时刻新规则到达时:
1. Expression Operator收到新规则↓
2. 通知Coordinator更新规则: "temperature + 273.15"↓
3. 向所有Calc Operator广播: "请完成当前批次计算"↓
4. 阻塞数据流: T6、T7数据暂时不向下游发送↓
5. 等待所有Calc Operator汇报: "我已完成T4及之前的数据计算"↓
6. Coordinator确认所有Task完成后,通知Expression Operator: "可以继续"↓
7. 恢复数据流: T6、T7数据开始使用新规则处理

多并行度场景:

假设有3个Calc Operator并行处理:Calc-1: 正在处理T2数据 (25.0°C)
Calc-2: 正在处理T3数据 (30.0°C)
Calc-3: 正在处理T4数据 (20.0°C)当T5新规则到达时:
- 所有Calc都必须完成当前计算并汇报
- 只有收到3个完成汇报后,才能开始处理T6、T7数据

为什么需要Operator Coordinator?

问题:Flink的Task之间只能传递数据,无法传递控制信号
解决:通过Job Master中的Coordinator实现:
- Expression Operator → Coordinator: "新规则来了"
- Coordinator → 所有Calc Operator: "完成当前批次"
- 所有Calc Operator → Coordinator: "我完成了"
- Coordinator → Expression Operator: "可以继续了"

时序图示例:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/95978.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/95978.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/95978.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

「 CentOS7 安装部署k8s」

一、Linux系统部署K8s还是非常便利的,只需要掌握Linux常用命令,便可以迅速部署,一起来学习一下吧1、运行以下命令更新系统并安装必要工具:yum update -y yum install -y yum-utils device-mapper-persistent-data lvm22、安装Dock…

Disbursement on Quarantine Policy(概率、逆元计算期望)

题目描述There is a train with n rows, and there are m seats per row. All seats are occupied. For some passengers, we know they are being infected with COVID-19 or not. However, for other passengers, we are not sure about their status, and we assume each of…

AI 在金融领域的落地案例

目录 引言 一、信贷风控:基于 LoRA 的 Qwen-7B 模型微调(适配城商行审批场景) 场景背景 核心代码 1. 环境依赖安装 2. 金融数据集加载与预处理(城商行信贷数据) 3. LoRA 微调 Qwen-7B 模型 4. 模型推理&#xf…

平衡二叉树的调整

平衡二叉树的定义平衡二叉树(balanced binary tree),又称AVL树(Adelson-Velskii and Landis)。 一棵平衡二叉树或者是空树,或者是具有下列性质的二叉排序树:① 左子树与右子树的高度之差的绝对值小于等于1;…

深入解析:如何设计灵活且可维护的自定义消息机制

深入解析:如何设计灵活且可维护的自定义消息机制 引言 在现代软件开发中,组件间的通信机制至关重要。无论是前端框架中的组件交互,还是后端服务间的消息传递,一个良好的消息机制能显著提升代码的可维护性和扩展性。本文将深入探讨…

PostgreSQL——用户管理

PostgreSQL用户管理一、组角色管理1.1、创建组角色1.2、查看和修改组角色1.3、删除组角色二、角色的各种权限2.1、LOGIN(登录)2.2、SUPERUSER(超级用户)3.3、CREATEDB(创建数据库)3.4、CREATEROLE&#xff…

东软8位MCU使用问题总结

简介用的单片机为ES7P7021,采用8位RISC内核,2KB的FLASH,128bit的RAM。编译器使用东软提供的iDesigner,开发过程中编译器和单片机有一些地方使用时需要注意下。1.RAMclear()函数注意问题/****************************************…

深度学习在订单簿分析与短期价格预测中的应用探索

一、订单簿数据特性及预处理 1.1 订单簿数据结构解析 在金融交易领域,订单簿是市场微观结构的集中体现,它记录了不同价格水平的买卖订单信息。一个典型的订单簿由多个层级组成,每个层级包含特定价格上的买单和卖单数量。例如,在某…

Hashmap源码

目录 HashMap底层原理 JDK1.8及以后底层结构为:数组链表红黑树 默认参数 扩容机制 数组 链表 红黑树 HashMap为什么用红黑树不用B树 HashMap什么时候扩容 HashMap的长度为什么是 2的 N 次方 HashMap底层原理 JDK1.8及以后底层结构为:数组链表红…

【JAVA 字符串常量池、new String的存储机制、==与equals的区别,以及字符串重新赋值时的指向变化】

系列文章目录 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录系列文章目录代码原理解错误逻辑理解理解与修正&#xff1a…

博客项目 Spring + Redis + Mysql

基础模块1. 邮箱发送功能最初设计的接口 (雏形)public interface EmailService {/*** 发送验证码邮件** param email 目标邮箱* return 发送的code* throws RuntimeException 如果发送邮件失败,将抛出异常*/String sendVerificationCode(Stri…

前端处理导出PDF。Vue导出pdf

前言:该篇主要是解决一些简单的页面内容导出为PDF1.安装依赖使用到两个依赖,项目目录下运行这两个//页面转换成图片 npm install --save html2canvas //图片转换成pdf npm install jspdf --save 2.创建通用工具类exportPdf.js文件可以保存在工具类目录下…

【GM3568JHF】FPGA+ARM异构开发板烧录指南

1. Windows烧录说明 SDK 提供 Windows 烧写工具(工具版本需要 V3.31或以上),工具位于工程根目录: tools/ ├── windows/RKDevTool 如下图,编译生成相应的固件后,设备烧写需要进入 MASKROM 或 LOADER 烧写模式,准备…

C++ 多进程编程深度解析【C++进阶每日一学】

文章目录一、引言二、核心概念:进程 (Process)功能与作用三、C 多进程的实现方式四、核心函数详解1. fork() - 创建子进程函数原型功能说明返回值完整使用格式2. wait() 和 waitpid() - 等待子进程结束函数原型参数与返回值详解3. exec 系列函数 - 执行新程序函数族…

一周学会Matplotlib3 Python 数据可视化-绘制面积图(Area)

锋哥原创的Matplotlib3 Python数据可视化视频教程: 2026版 Matplotlib3 Python 数据可视化 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 课程介绍 本课程讲解利用python进行数据可视化 科研绘图-Matplotlib,学习Matplotlib图形参数基本设置&…

北京JAVA基础面试30天打卡11

1.索引创建注意事项 适合的场景 1.频繁使用where语句查询的字段 2.关联字段需要建立索 3.如果不创建索引,那么在连接的过程中,每个值都会进行一次全表扫描 4.分组和排序字段可以建立索引因为索引天生就是有序的,在分组和排序时优势不言而喻 5…

vscode无法检测到typescript环境解决办法

有一个vitereacttypescript项目,在工作电脑上一切正常。但是,在我家里的电脑运行,始终无法检测到typescript环境。即使出现错误的ts语法,也不会有报错提示,效果如下:我故意将一个string类型,传入…

【MCP开发】Nodejs+Typescript+pnpm+Studio搭建Mcp服务

MCP服务支持两种协议,Studio和SSE/HTTP,目前官方提供的SDK有各种语言。 开发方式有以下几种: 编程语言MCP命令协议发布方式PythonuvxSTUDIOpypiPython远程调用SSE服务器部署NodejspnpmSTUDIOpnpmNodejs远程调用SSE服务器部署… 一、初始化项…

vscode使用keil5出现变量跳转不了和搜索全局不了

vscode使用keil5出现变量跳转不了,或者未包含文件,或者未全局检索; 参考如下文章后还会出现; 为什么vscode搜索栏只搜索已经打开的文件_vscode全局搜索只能搜当前文件-CSDN博客 在机缘巧合之下发现如下解决方式: 下载…

命名空间——网络(net)

命名空间——网络(net) 一、网络命名空间:每个都是独立的“网络房间” 想象你的电脑是一栋大楼,每个网络命名空间就是大楼里的一个“独立房间”: 每个房间里有自己的“网线接口”(网卡)、“门牌…