一、UDF 核心原理

Flink 自定义函数(UDF)是扩展 Table API/SQL 能力的核心机制,允许将自定义逻辑嵌入查询。其设计遵循以下原则:

1. 函数类型体系

类型输入输出关系核心用途
标量函数(ScalarFunction)0~N 个标量 → 1 个标量字段转换、值计算
表值函数(TableFunction)0~N 个标量 → 多行多列数据拆分、关联外部数据
聚合函数(AggregateFunction)多行标量 → 1 个标量自定义聚合(如加权平均)
表值聚合函数(TableAggregateFunction)多行标量 → 多行多列分组TopN、分桶统计等
异步表值函数异步查询外部系统 → 多行多列高效关联外部数据库/API

2. 类型系统

  • 标量/表值函数使用新数据类型系统(基于DataTypes
  • 聚合函数仍使用旧类型系统(基于TypeInformation
  • 类型推导:默认通过反射获取,复杂场景可通过@DataTypeHint@FunctionHint注解显式指定

3. 执行逻辑

  • 核心是求值方法(如eval()accumulate()),定义数据处理逻辑
  • 生命周期:open()初始化 → 求值方法调用 → close()资源清理
  • 确定性:通过isDeterministic()声明是否返回确定结果(影响优化策略)

二、快速上手实战

1. 标量函数(ScalarFunction)

作用:对输入标量做转换计算(如字符串处理、格式转换)

实现步骤

  1. 继承ScalarFunction,实现eval()方法
    public class HashFunction extends ScalarFunction {// 输入任意类型,返回哈希值public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}
    }
    
  2. 注册与调用
    // 注册
    tableEnv.createTemporarySystemFunction("HashFunc", HashFunction.class);
    // Table API 调用
    table.select(call("HashFunc", $("field")));
    // SQL 调用
    tableEnv.sqlQuery("SELECT HashFunc(field) FROM t");
    

2. 表值函数(TableFunction)

作用:将单行输入拆分为多行输出(如字符串按分隔符拆分)

实现步骤

  1. 继承TableFunction<T>,通过collect()输出结果
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, len INT>"))
    public class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(" ")) {collect(Row.of(s, s.length())); // 输出每行数据}}
    }
    
  2. 注册与调用
    tableEnv.createTemporarySystemFunction("SplitFunc", SplitFunction.class);
    // 关联查询(LATERAL JOIN)
    tableEnv.sqlQuery("""SELECT t.id, s.word, s.len FROM t, LATERAL TABLE(SplitFunc(t.content)) AS s(word, len)
    """);
    

3. 聚合函数(AggregateFunction)

作用:多行数据聚合为单个值(如自定义平均值、求和逻辑)

实现步骤

  1. 定义累加器(存储中间结果)
    public class WeightedAvgAccum {public long sum = 0;   // 加权和public int count = 0; // 权重总和
    }
    
  2. 继承AggregateFunction,实现核心方法
    public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); }// 累加逻辑public void accumulate(WeightedAvgAccum acc, long value, int weight) {acc.sum += value * weight;acc.count += weight;}// 最终结果计算@Overridepublic Long getValue(WeightedAvgAccum acc) {return acc.count == 0 ? null : acc.sum / acc.count;}
    }
    
  3. 注册与调用
    tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
    tableEnv.sqlQuery("""SELECT user, WeightedAvg(score, weight) FROM scores GROUP BY user
    """);
    

4. 表值聚合函数(TableAggregateFunction)

作用:多行数据聚合为多行结果(如分组取TopN)

实现步骤

  1. 定义累加器(存储中间状态)
    public class Top2Accum {public int first;  // 第一名public int second; // 第二名
    }
    
  2. 继承TableAggregateFunction,实现核心方法
    public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {@Overridepublic Top2Accum createAccumulator() {Top2Accum acc = new Top2Accum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;return acc;}// 累加逻辑public void accumulate(Top2Accum acc, int value) {if (value > acc.first) {acc.second = acc.first;acc.first = value;} else if (value > acc.second) {acc.second = value;}}// 输出结果public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {out.collect(Tuple2.of(acc.first, 1));out.collect(Tuple2.of(acc.second, 2));}
    }
    
  3. 注册与调用
    tableEnv.createTemporarySystemFunction("Top2", Top2.class);
    // Table API 调用(SQL暂不支持)
    table.groupBy($("group")).flatAggregate(call(Top2.class, $("value")).as("val", "rank")).select($("group"), $("val"), $("rank"));
    

三、关键技巧

  1. 类型注解:复杂类型用@DataTypeHint指定,例如:

    @DataTypeHint("DECIMAL(12, 3)") // 声明 decimal 精度
    public BigDecimal eval(double a) { ... }
    
  2. 命名参数:通过@ArgumentHint指定参数名,支持 SQL 中按名传参:

    public String eval(@ArgumentHint(name = "content") String s,@ArgumentHint(name = "begin") int b
    ) { ... }
    // SQL 调用:SELECT func(content => 'abc', begin => 1)
    
  3. 确定性声明:非确定性函数(如随机数、当前时间)需重写:

    @Override
    public boolean isDeterministic() { return false; }
    

四、常见问题

  • 注册方式:临时注册(createTemporarySystemFunction)仅当前会话有效,永久注册需结合 Catalog
  • 权限控制:UDF 可访问外部资源(如数据库连接),需确保执行环境有对应权限
  • 性能优化:聚合函数尽量实现merge()方法,支持两阶段聚合优化

通过上述步骤,可快速实现各类自定义逻辑,扩展 Flink 处理能力。核心是理解不同函数的输入输出关系,以及累加器(聚合函数)的设计逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/88554.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/88554.shtml
英文地址,请注明出处:http://en.pswp.cn/web/88554.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【AI学习】大模型微调实践

参加了书生・浦语&#xff08;InternLM&#xff09;端侧小模型论文分类微调练习打榜赛 具体的实践教程在&#xff1a; https://aicarrier.feishu.cn/wiki/D7kZw9Nx4iMyDnkpL0Gc5giNn5g 折腾了十多天&#xff0c;各种尝试&#xff0c;AB榜单终于进入了前十都&#xff0c;累死 …

ElementUI:高效优雅的Vue.js组件库

Hi&#xff0c;我是布兰妮甜 &#xff01;在当今快节奏的前端开发领域&#xff0c;选择一个功能强大、设计优雅且易于使用的UI组件库至关重要。ElementUI作为基于Vue.js的知名组件库&#xff0c;凭借其丰富的组件体系、一致的设计语言和出色的开发体验&#xff0c;已成为众多企…

Java Stream流介绍及使用指南

背景在Java 8之前&#xff0c;处理集合数据&#xff08;如List, Set, Map&#xff09;通常意味着编写冗长的、以操作为中心的代码&#xff1a;创建迭代器、使用for或while循环遍历元素、在循环体内进行条件判断和操作、收集结果。这种方式虽然有效&#xff0c;但不够简洁、可读…

JDK 1.7 vs JDK 1.8

JDK版本比较 Java平台的两次重大飞跃&#xff1a;JDK 7的稳定优化与JDK 8的革命性创新引言&#xff1a;Java的进化之路Java作为企业级开发的支柱语言&#xff0c;其版本更新直接影响着全球数百万开发者。JDK 1.7&#xff08;2011年发布&#xff09;和JDK 1.8&#xff08;2014年…

张量与维度

3x4x5的张量&#xff1a; x torch.tensor([[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15], [16, 17, 18, 19, 20]], [[21, 22, 23, 24, 25], …

智慧菜场系统(源码+文档+讲解+演示)

引言 在数字化浪潮的推动下&#xff0c;传统菜市场也在寻求创新与变革。智慧菜场系统作为一种新型的菜市场管理工具&#xff0c;通过数字化手段优化菜市场的全流程&#xff0c;提高运营效率&#xff0c;增强消费者体验&#xff0c;提升市场管理质量。本文将详细介绍智慧菜场系统…

【GESP】C++一级真题 luogu-B4355 [GESP202506 一级] 值日

GESP C一级&#xff0c;2025年6月真题&#xff0c;基础运算和循环语句&#xff0c;难度★☆☆☆☆。 题目题解详见&#xff1a;【GESP】C一级真题 luogu-B4355 [GESP202506 一级] 值日 | OneCoder 【GESP】C一级真题 luogu-B4355 [GESP202506 一级] 值日 | OneCoderGESP C一级…

【Linux应用】Ubuntu20.04 aarch64开发板一键安装ROS2(清华源)

【Linux应用】Ubuntu20.04 aarch64开发板一键安装ROS2&#xff08;清华源&#xff09; 文章目录相关资料更改UTF8执行更新一键安装ROS2验证配置环境变量附录&#xff1a;开发板快速上手&#xff1a;镜像烧录、串口shell、外设挂载、WiFi配置、SSH连接、文件交互&#xff08;RAD…

【HDLBits习题 2】Circuit - Sequential Logic(4)More Circuits

1. Rule90&#xff08;Rule 90&#xff09;方法1&#xff1a;module top_module (output reg [511:0] q,input clk,input load,input [511:0] data ); integer i;always (posedge clk) beginif (load 1b1) beginq < data;end else beginfor (i0; i<$bits(q);…

基于mysqlfrm工具解析mysql数据结构文件frm表结构和数据库版本信息

这里使用Linux系统上操作。win上搞了下 python报错。所以在这里记录一下推荐大家使用linux系统操作。 安装mysql utilswget https://downloads.mysql.com/archives/get/p/30/file/mysql-utilities-1.6.5.tar.gztar -xf mysql-utilities-1.6.5.tar.gzcd mysql-utilities-1.6.5py…

【C++ 深入解析 C++ 模板中的「依赖类型」】

深入解析 C 模板中的「依赖类型」 依赖类型是 C 模板编程中的核心概念&#xff0c;特指那些依赖于模板参数的类型。迭代器是依赖类型的常见例子&#xff0c;但远不止于此。让我们全面解析这个重要概念&#xff1a; 依赖类型的本质定义 依赖类型是&#xff1a; 在模板中定义直接…

Telnet远程连接实验(Cisco)

Telnet远程连接实验&#xff08;Cisco&#xff09; 拓扑图一并实现DHCP服务、HTTP服务、FTP服务。 二层交换机配置&#xff1a; 交换机Switch0配置&#xff1a; vlan 10vlan 20int f0/1switchport mode accessswitchport access vlan 10int f0/2switchport mode accessswitchpo…

C++:非类型模板参数,模板特化以及模板的分离编译

目录 一、前言 二、非类型模板参数 三、模板的特化 3.1 类模板特化 3.11 全特化 3.12 偏特化 3.2 函数模板特化 3.3 注意 四、模板的分离编译 一、前言 前面的文章梳理了模板初阶的一些用法&#xff0c;在后面梳理了STL的一些容器的用法后&#xff0c;下面将用到含有S…

【Qt 学习之路】Qt Android开发环境搭建:Ubuntu的Vmware虚拟机中的踩坑实录

文章目录1、简介2、虚拟机内USB设备识别难题2.1、正确连接手机2.2、打开USB相关配置2.3、打开虚拟机中的手机设备3、Gradle下载速度缓慢之困3.1、下载 Gradle 镜像3.2、安放镜像位置3.3、修改项目中的gradle路径1、简介 许久未曾使用Qt进行Android开发&#xff0c;今日在Ubunt…

MySQL中使用group_concat遇到的问题及解决

在使用group_concat的过程中遇到个问题&#xff0c;这里记录一下&#xff1a;在MySQL中有个配置参数group_concat_max_len&#xff0c;它会限制使用group_concat返回的最大字符串长度&#xff0c;默认是1024。 查询group_concat_max_len大小&#xff1a; show variables like…

高性能小型爬虫语言与代码示例

高性能小型爬虫现在有哪几种新兴语言可以选择。我看到了很多关于爬虫框架的信息&#xff0c;特别是使用Go语言和Node.js的框架。Go语言方面有Kaola1和Katana2这两个框架。Kaola被描述为高性能的Go语言爬虫框架&#xff0c;轻量级且强大&#xff0c;提供灵活配置选项。 Node.js…

【PTA数据结构 | C语言版】在顺序表 list 中查找元素 x

本专栏持续输出数据结构题目集&#xff0c;欢迎订阅。 文章目录题目代码题目 请编写程序&#xff0c;将 n 个整数存入顺序表&#xff0c;对任一给定整数 x&#xff0c;查找其在顺序表中的位置。 输入格式&#xff1a; 输入首先在第一行给出正整数 n&#xff08;≤10^4 &#…

claude code-- 基于Claude 4 模型的智能编程工具,重塑你的编程体验

文章目录0.前言1.安装nodejs2.使用指南3.快速上手4.总结0.前言 最近的这个claudecode非常的火&#xff0c;因为可能是这个cursoe定价的一些原因吧&#xff0c;我是听其他的这个大佬说的&#xff0c;因为这个cursor其实我就是最开始的使用用过一下&#xff0c;现在基本上不使用…

HTTP API 身份认证

互联网系统通常需要根据用户身份决定是否有资源的访问权限&#xff0c;这就需要对用户进行身份认证&#xff08;Authentication&#xff09;&#xff0c;验证用户所声称的身份。验证手段通常是验证只有用户知道或拥有的东西&#xff0c;比如密码、手机号、指纹等。 基于浏览器…

Python毕业设计232—基于python+Django+vue的图书管理系统(源代码+数据库)

毕设所有选题&#xff1a; https://blog.csdn.net/2303_76227485/article/details/131104075 基于pythonDjangovue的图书管理系统(源代码数据库)232 一、系统介绍 本项目前后端分离&#xff0c;分为用户、管理员两种角色 1、用户&#xff1a; 注册、登录、新闻资讯、图书信…