一、UDF 核心原理
Flink 自定义函数(UDF)是扩展 Table API/SQL 能力的核心机制,允许将自定义逻辑嵌入查询。其设计遵循以下原则:
1. 函数类型体系
类型 | 输入输出关系 | 核心用途 |
---|---|---|
标量函数(ScalarFunction) | 0~N 个标量 → 1 个标量 | 字段转换、值计算 |
表值函数(TableFunction) | 0~N 个标量 → 多行多列 | 数据拆分、关联外部数据 |
聚合函数(AggregateFunction) | 多行标量 → 1 个标量 | 自定义聚合(如加权平均) |
表值聚合函数(TableAggregateFunction) | 多行标量 → 多行多列 | 分组TopN、分桶统计等 |
异步表值函数 | 异步查询外部系统 → 多行多列 | 高效关联外部数据库/API |
2. 类型系统
- 标量/表值函数使用新数据类型系统(基于
DataTypes
) - 聚合函数仍使用旧类型系统(基于
TypeInformation
) - 类型推导:默认通过反射获取,复杂场景可通过
@DataTypeHint
或@FunctionHint
注解显式指定
3. 执行逻辑
- 核心是求值方法(如
eval()
、accumulate()
),定义数据处理逻辑 - 生命周期:
open()
初始化 → 求值方法调用 →close()
资源清理 - 确定性:通过
isDeterministic()
声明是否返回确定结果(影响优化策略)
二、快速上手实战
1. 标量函数(ScalarFunction)
作用:对输入标量做转换计算(如字符串处理、格式转换)
实现步骤:
- 继承
ScalarFunction
,实现eval()
方法public class HashFunction extends ScalarFunction {// 输入任意类型,返回哈希值public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();} }
- 注册与调用
// 注册 tableEnv.createTemporarySystemFunction("HashFunc", HashFunction.class); // Table API 调用 table.select(call("HashFunc", $("field"))); // SQL 调用 tableEnv.sqlQuery("SELECT HashFunc(field) FROM t");
2. 表值函数(TableFunction)
作用:将单行输入拆分为多行输出(如字符串按分隔符拆分)
实现步骤:
- 继承
TableFunction<T>
,通过collect()
输出结果@FunctionHint(output = @DataTypeHint("ROW<word STRING, len INT>")) public class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(" ")) {collect(Row.of(s, s.length())); // 输出每行数据}} }
- 注册与调用
tableEnv.createTemporarySystemFunction("SplitFunc", SplitFunction.class); // 关联查询(LATERAL JOIN) tableEnv.sqlQuery("""SELECT t.id, s.word, s.len FROM t, LATERAL TABLE(SplitFunc(t.content)) AS s(word, len) """);
3. 聚合函数(AggregateFunction)
作用:多行数据聚合为单个值(如自定义平均值、求和逻辑)
实现步骤:
- 定义累加器(存储中间结果)
public class WeightedAvgAccum {public long sum = 0; // 加权和public int count = 0; // 权重总和 }
- 继承
AggregateFunction
,实现核心方法public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); }// 累加逻辑public void accumulate(WeightedAvgAccum acc, long value, int weight) {acc.sum += value * weight;acc.count += weight;}// 最终结果计算@Overridepublic Long getValue(WeightedAvgAccum acc) {return acc.count == 0 ? null : acc.sum / acc.count;} }
- 注册与调用
tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class); tableEnv.sqlQuery("""SELECT user, WeightedAvg(score, weight) FROM scores GROUP BY user """);
4. 表值聚合函数(TableAggregateFunction)
作用:多行数据聚合为多行结果(如分组取TopN)
实现步骤:
- 定义累加器(存储中间状态)
public class Top2Accum {public int first; // 第一名public int second; // 第二名 }
- 继承
TableAggregateFunction
,实现核心方法public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {@Overridepublic Top2Accum createAccumulator() {Top2Accum acc = new Top2Accum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;return acc;}// 累加逻辑public void accumulate(Top2Accum acc, int value) {if (value > acc.first) {acc.second = acc.first;acc.first = value;} else if (value > acc.second) {acc.second = value;}}// 输出结果public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {out.collect(Tuple2.of(acc.first, 1));out.collect(Tuple2.of(acc.second, 2));} }
- 注册与调用
tableEnv.createTemporarySystemFunction("Top2", Top2.class); // Table API 调用(SQL暂不支持) table.groupBy($("group")).flatAggregate(call(Top2.class, $("value")).as("val", "rank")).select($("group"), $("val"), $("rank"));
三、关键技巧
-
类型注解:复杂类型用
@DataTypeHint
指定,例如:@DataTypeHint("DECIMAL(12, 3)") // 声明 decimal 精度 public BigDecimal eval(double a) { ... }
-
命名参数:通过
@ArgumentHint
指定参数名,支持 SQL 中按名传参:public String eval(@ArgumentHint(name = "content") String s,@ArgumentHint(name = "begin") int b ) { ... } // SQL 调用:SELECT func(content => 'abc', begin => 1)
-
确定性声明:非确定性函数(如随机数、当前时间)需重写:
@Override public boolean isDeterministic() { return false; }
四、常见问题
- 注册方式:临时注册(
createTemporarySystemFunction
)仅当前会话有效,永久注册需结合 Catalog - 权限控制:UDF 可访问外部资源(如数据库连接),需确保执行环境有对应权限
- 性能优化:聚合函数尽量实现
merge()
方法,支持两阶段聚合优化
通过上述步骤,可快速实现各类自定义逻辑,扩展 Flink 处理能力。核心是理解不同函数的输入输出关系,以及累加器(聚合函数)的设计逻辑。