flink本身是专注有状态的无限流处理,有限流处理【batch批次】是无限流处理的一中特殊情况!

应用场景

  • 实时ETL
集成流计算现有的诸多数据通道和SQL灵活的加工能力,对流式数据进行实时清洗、归并和结构化
处理;同时,对离线数仓进行有效的补充和优化,并为数据实时传输提供可计算通道。
  • 实时报表
实时化采集、加工流式数据存储;实时监控和展现业务、客户各类指标,让数据化运营实时化。
如通过分析订单处理系统中的数据获知销售增长率;
通过分析分析运输延迟原因或预测销售量调整库存;
  • 监控预警
对系统和用户行为进行实时监测和分析,以便及时发现危险行为,如计算机网络入侵、诈骗预警等
  • 在线系统
实时计算各类数据指标,并利用实时结果及时调整在线系统的相关策略,在各类内容投放、智能推
送领域有大量的应用,如在客户浏览商品的同时推荐相关商品等

flink主要角色

JobManager

协调分布式执行,它们用来调度task,协调检查点(CheckPoint),协调失败时恢复等

TaskManager

也称之为worker,主要职责是接收jobmanager协调的task,部署和启动任务,接收上游的数据并处理。同时向resourcemanager反注册自己的资源信息。

ResourceManager

管理集群资源,如Yarn

Dispatcher

作用:提供一个REST接口来让我们提交需要执行的应用。一旦一个应用提交执行,Dispatcher会启动一个JobManager,并将应用转交给他。
Dispatcher还会启动一个webUI来提供有关作业执行信息
注意:某些应用的提交执行的方式,有可能用不到Dispatcher
Task
一个完整的处理阶段(如map阶段),由多个相同功能的SubTask组成‌
SubTask
Task的并行实例,是实际执行的最小单元。例如设置并行度为3的map操作会产生3个SubTask‌
slot:
TaskManager中的资源格子,每个Slot有独立的内存分配(如4GB)‌

一个Slot可以运行多个SubTask(线程级共享)

不同Job的Slot内存隔离,但共享网络等资源‌

slot sharing: 

默认行为‌:同一Job的上下游SubTask可共享Slot(如map和filter挤在一个Slot)‌

优势‌:减少数据传输延迟(同Slot内直接内存交换)‌提高资源利用率(轻量级操作不独占Slot)

程序架构主要部分

source

本地集合:fromCollection(seq); fromElements();

文件:readTextFile(path);

socket:socketTextStream();

自定义:StreamExecutionEnvironment.addSource(sourceFunction),flink本身提供了许多源,也可以implements SourceFunction方法是为非并行源,或者为并行源 implements ParallelSourceFunction接口,或者extends RichParallelSourceFunction。

RichParallelSourceFunction与ParallelSourceFunction是Flink中用于实现自定义数据源的两种关键接口,主要区别如下:

  1. 功能扩展性

    • RichParallelSourceFunction继承了RichFunction,提供了open()close()方法,支持访问运行时上下文(如并行度、任务ID等),便于资源管理(如数据库连接)7。
    • ParallelSourceFunction仅标记接口,无额外方法,需自行实现资源管理逻辑2。
  2. 并行度支持

    • 两者均支持并行执行(通过实现ParallelSourceFunction标记接口)26。
    • 但RichParallelSourceFunction通过运行时上下文可动态分配数据分片(如MySQL分页查询)7。
  3. 状态管理

    • RichParallelSourceFunction可结合检查点机制实现状态一致性,适合有状态数据源(如偏移量记录)5。
    • ParallelSourceFunction需自行处理状态持久化4。
  4. 典型应用场景

    • RichParallelSourceFunction适用于需要复杂初始化或状态管理的场景(如连接外部系统)
    • ParallelSourceFunction适合简单无状态数据源(如内存集合)3。
  5. 实现复杂度

    • RichParallelSourceFunction需实现更多生命周期方法,但开发更规范6。
    • ParallelSourceFunction更轻量,但灵活性较低2。

总结:优先选择RichParallelSourceFunction以获得更完善的开发支持,仅在无状态且无需资源管理时考虑ParallelSourceFunction

  • 需要访问运行时上下文(如获取并行任务ID)25
  • 需管理资源(如数据库连接、文件句柄)310
  • 需结合检查点机制实现状态一致性(如Kafka偏移量记录)58
  • 典型场景:MySQL分页查询、Kafka消费者、分布式日志采集

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.configuration.Configuration
import java.sql.{Connection, DriverManager, ResultSet}class MySQLRichParallelSource extends RichParallelSourceFunction[String] {private var connection: Connection = _private var isRunning = trueoverride def open(parameters: Configuration): Unit = {// 初始化数据库连接connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")}override def run(ctx: SourceFunction.SourceContext[String]): Unit = {val stmt = connection.createStatement()val rs = stmt.executeQuery("SELECT * FROM sensor_data")while (isRunning && rs.next()) {ctx.collect(rs.getString("value")) // 发射数据}}override def cancel(): Unit = isRunning = falseoverride def close(): Unit = {if (connection != null) connection.close()}
}
  • 简单无状态数据生成(如内存集合、随机数流)914
  • 无需资源初始化/清理的并行任务115
  • 典型场景:模拟传感器数据、内存集合并行读取

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import scala.util.Randomclass RandomParallelSource extends ParallelSourceFunction[Int] {private var isRunning = trueoverride def run(ctx: SourceFunction.SourceContext[Int]): Unit = {val rand = new Random()while (isRunning) {ctx.collect(rand.nextInt(100)) // 生成0-100随机数Thread.sleep(500)}}override def cancel(): Unit = isRunning = false
}

transformation

  1. Map
    功能:对数据流中的每个元素进行一对一转换14

val dataStream: DataStream[Int] = env.fromElements(1, 2, 3) val mappedStream = dataStream.map(_ * 2) // 输出2,4,6

  1. FlatMap
    功能:将每个元素转换为0个、1个或多个输出元素17

val words = env.fromElements("hello world", "flink streaming") val splitWords = words.flatMap(_.split(" ")) // 输出各单词

  1. Filter
    功能:根据条件过滤数据元素24

val numbers = env.fromElements(1, 2, 3, 4) val evens = numbers.filter(_ % 2 == 0) // 输出2,4

  1. KeyBy
    功能:按指定key对数据进行分区25

case class Sensor(id: String, temp: Double) val sensors = env.fromElements(Sensor("s1", 35.2), Sensor("s2", 28.5)) val keyedStream = sensors.keyBy(_.id)

  1. Reduce
    功能:对分组数据流进行聚合操作59

keyedStream.reduce((s1, s2) => Sensor(s1.id, s1.temp + s2.temp)) // 相同id的温度累加

  1. Aggregations
    功能:内置聚合函数(sum/min/max等)25

keyedStream.sum("temp") // 按key求温度总和

  1. Window
    功能:在数据流上定义时间或计数窗口58

sensors.keyBy(_.id) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum("temp")

  1. Union
    功能:合并多个同类型数据流48

val stream1 = env.fromElements(1, 2) val stream2 = env.fromElements(3, 4) val merged = stream1.union(stream2) // 输出1,2,3,4

  1. Join
    功能:基于key连接两个数据流89

val streamA = env.fromElements(("a", 1), ("b", 2)) val streamB = env.fromElements(("a", 3), ("b", 4)) streamA.join(streamB) .where(_._1).equalTo(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply((a,b) => (a._1, a._2 + b._2)) // 输出("a",4),("b",6)

  1. CoFlatMap
    功能:连接两个流并共享状态8

val controlStream = env.fromElements("HIGH", "LOW") val dataStream = env.fromElements(1.2, 3.4, 5.6) controlStream.connect(dataStream) .flatMap( (ctrl: String, out: Collector[Double]) => {}, (value: Double, out: Collector[Double]) => { if (currentThreshold == "HIGH") out.collect(value) } )

  1. ProcessFunction
    功能:提供对时间和状态的底层访问

class TempAlertFunction extends ProcessFunction[Sensor, String] { override def processElement( sensor: Sensor, ctx: ProcessFunction[Sensor, String]#Context, out: Collector[String]): Unit = { if (sensor.temp > 100) { out.collect(s"Alert! ${sensor.id} temp=${sensor.temp}") } } }

  1. Iterate
    功能:创建迭代流处理循环

val numbers = env.fromElements(1, 2, 3, 4) val iterated = numbers.iterate( iteration => { val minusOne = iteration.map(_ - 1) val stillGreaterThanZero = minusOne.filter(_ > 0) val lessThanZero = minusOne.filter(_ <= 0) (stillGreaterThanZero, lessThanZero) } )

  1. Window Apply
    功能:对窗口数据应用自定义函数

sensors.keyBy(_.id) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply { (key, window, vals, out: Collector[String]) => out.collect(s"$key: ${vals.map(_.temp).sum}") }

  1. Side Output
    功能:将数据分流到侧输出流

val outputTag = OutputTag[String]("side-output") val mainStream = numbers.process(new ProcessFunction[Int, Int] { override def processElement( value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = { if (value % 2 != 0) { ctx.output(outputTag, s"Odd: $value") } out.collect(value) } }) val sideStream = mainStream.getSideOutput(outputTag)

  1. Broadcast
    功能:将流广播到所有并行任务8

val ruleStream = env.fromElements("rule1", "rule2").broadcast val dataStream = env.fromElements(1, 2, 3) dataStream.connect(ruleStream) .process(new BroadcastProcessFunction[Int, String, String] { override def processElement( value: Int, ctx: BroadcastProcessFunction[Int, String, String]#ReadOnlyContext, out: Collector[String]): Unit = { val rules = ctx.getBroadcastState(...) // 使用广播规则处理数据 } })

完整项目实现示例:


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collectorobject FlinkOperatorsDemo {case class SensorReading(id: String, timestamp: Long, temperature: Double)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 数据源val sensorData = env.fromElements(SensorReading("sensor1", 1L, 35.6),SensorReading("sensor2", 2L, 28.3),SensorReading("sensor1", 3L, 37.2))// 2. 算子演示val processed = sensorData.filter(_.temperature > 30)  // Filter.map(r => (r.id, r.temperature))  // Map.keyBy(_._1)  // KeyBy.timeWindow(Time.seconds(5))  // Window.reduce((r1, r2) => (r1._1, r1._2 + r2._2))  // Reduceprocessed.print()env.execute("Flink Operators Demo")}
}

代码说明:

  1. 包含Flink核心算子链式调用演示
  2. 使用case class定义数据类型
  3. 展示从数据源到窗口聚合的完整流程

其他重要算子补充说明:

  • Fold‌:已弃用,推荐使用Reduce
  • WindowAll‌:非分组全局窗口
  • Project‌:选择部分字段(仅DataSet API)
  • Cross‌:两个流的笛卡尔积
  • CoGroup‌:分组连接两个数据集

sink

文件: writeAsText(path)

HDFS: 


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import java.time.ZoneIdobject FlinkToHdfs {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 创建测试数据源val dataStream = env.fromElements("2025-08-20 15:00:00,user1,click","2025-08-20 15:01:00,user2,purchase","2025-08-20 15:02:00,user3,view")// 2. 配置HDFS输出路径val hdfsPath = "hdfs://namenode:9000/flink/output"// 3. 创建StreamingFileSinkval sink = StreamingFileSink.forRowFormat(new Path(hdfsPath),new SimpleStringEncoder[String]("UTF-8")).withBucketAssigner(new DateTimeBucketAssigner[String]("yyyy-MM-dd--HH",ZoneId.of("UTC"))).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(60 * 60 * 1000) // 1小时滚动一次.withInactivityInterval(15 * 60 * 1000) // 15分钟不活动则滚动.withMaxPartSize(128 * 1024 * 1024) // 128MB.build()).build()// 4. 添加sink到数据流dataStream.addSink(sink)env.execute("Flink to HDFS Example")}
}

kafka: FlinkKafkaProducer

redis:RedisSink

hbase: 

自定义:SinkToMySql extends RichSinkFunction

Chain

Operator Chain

数据传输

Flink在处理任务间的数据传输过程中,采用了缓冲区机制。

Yarn部署机制

(1)启动一个YARN yarn-session.sh -h        
/export/servers/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
# -n 表示申请2个容器,这里指的就是多少个taskmanager
# -s 表示每个TaskManagerslots数量
# -tm 表示每个TaskManager的内存大小
# -d 表示以后台程序方式运行
如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d--detached。在这种情况下,Flink YARN客户端只会将Flink提交给集群,然后关闭它自己
(2)直接在YARN上提交运行Flink作业(Run a Flink job on YARN)

bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /export/servers/flink/examples/batch/WordCount.jar

# -m jobmanager的地址

# -yn 表示TaskManager的个数
停止yarn-cluster
yarn application -kill application_1527077715040_0003
rm -rf /tmp/.yarn-properties-root

窗口

通俗讲,Window是用来对一个无限的流设置一个有限的集合,从而在有界的数据集上进行操作的一种机制。流上的集合由Window来划定范围,比如计算过去10分钟或者最后50个元素的和
分为:
时间窗口(TimeWindow)
数量窗口(CountWindow)

滚动窗口

时间驱动:keyedStream.timeWindow(Time.seconds(10))
数量驱动:keyedStream.countWindow(3)

滑动窗口

时间驱动:keyedStream.timeWindow(Time.seconds(10), Time.seconds(5))
数量驱动:keyedStream.countWindow(3, 2)

会话窗口

keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

全局窗口

EventTime[事件时间]

事件发生的时间,例如:点击网站上的某个链接的时间,每一条日志都会记录自己的生成时间。如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带EventTime。如果要使用事件时间,要设置:env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置使用事件时间

IngestionTime[摄入时间]

数据进入Flink的时间,如某个Flink节点的source operator接收到数据的时间,例如:某个source消费到kafka中的数据。如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,sourcesystemTime为准

ProcessingTime[处理时间]

某个Flink节点执行某个operation的时间,例如:timeWindow处理数据时的系统时间,默认的时间属性就是Processing Time如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow,以operator的systemTime为准

水印(水位线)

水位线是为了控制这个事件能不能被处理,如果在水位线允许的事件范围之外,肯定是不会被当前窗口处理的。emitWatermark(当前最大时间 - 乱序允许时间)这个方法200ms周期执行,就是告诉框架只能提交这个时间及之后的时间的事件过来。 当水位线超过窗口结束时间时,对应的窗口才会被触发计算。这种机制确保了:即使数据乱序到达,只要在允许的时间范围内,仍然能被正确处理;超过容忍时间的迟到数据会被丢弃。

窗口的创建是根据事件时间来的,是独立的任务来创建,框架会根据事件时间,将当前事件归到那个窗口。水位线是触发窗口计算,跟窗口创建没关系。

例如:窗口[10:00:00 - 10:00:10),当窗口接收到第一条10:00:10事件,就会创建下一个[10:00:10- 10:00:20),但是这个时候因为水位线,上一个窗口还没被触发计算,只有当水位线时间 >= 10:00:10时,才会被触发计算。

水位线生成器: 事件处理方法,给下游发送水位时间方法(200ms执行一次)

事件时间提取器:提取事件时间

new WatermarkStrategy[OrderDetail] {override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[OrderDetail] = {new WatermarkGenerator[OrderDetail] {//每来一条数据,都会调用onEventvar maxTimestamp = Long.MinValuevar maxOutOfOrderness = 500L;override def onEvent(event: OrderDetail, eventTimestamp: Long, output: WatermarkOutput): Unit = {maxTimestamp = Math.max(maxTimestamp, format.parse(event.orderCreateTime).getTime)}override def onPeriodicEmit(output: WatermarkOutput): Unit = {output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness))}}}//1.老版本  2. lambda}.withTimestampAssigner(new SerializableTimestampAssigner[OrderDetail] {override def extractTimestamp(element: OrderDetail, recordTimestamp: Long): Long = {format.parse(element.orderCreateTime).getTime}}
  1. 整体流程‌:

    • 数据流中每个事件会先经过extractTimestamp提取时间戳
    • 然后触发onEvent方法处理
    • 最后周期性调用onPeriodicEmit生成水位线
  2. 方法调用时机详解‌:

    (1) extractTimestamp方法:

    • 调用时机‌:每条数据到达时立即调用
    • 作用‌:就像快递员拆包裹看发货日期,从数据中提取事件时间戳
    • 示例‌:MyEvent("A", 1630000000000)会提取出1630000000000

    (2) onEvent方法:

    • 调用时机‌:紧接在extractTimestamp之后
    • 作用‌:像记录最高水位的标尺,比较并保存当前最大时间戳
    • 示例‌:当收到时间戳1000的事件,会更新currentMaxTimestamp = 1000

    (3) onPeriodicEmit方法:

    • 调用时机‌:默认每200ms自动调用一次(类似心跳机制)
    • 作用‌:发出水位线 = 当前最大时间戳 - 允许乱序时间
    • 示例‌:如果currentMaxTimestamp=5000maxOutOfOrderness=3000,则发出水位线2000
  3. 参数说明‌:

    • maxOutOfOrderness:相当于"宽容度",设置允许迟到数据的最长时间(如设为3000表示允许3秒内的迟到数据)
    • currentMaxTimestamp:动态变化的变量,始终记录已见数据的最大时间戳
  4. 工作类比‌:
    想象老师在批改时间乱序提交的作业:

    • extractTimestamp:查看每份作业的提交日期
    • onEvent:记录目前看到的最晚提交日期
    • onPeriodicEmit:每隔一段时间宣布:"现在开始只接受比(最晚日期-3天)更早的作业"
迟到数据处理方法:允许延迟,侧输出流,自定义触发器,全局窗口+延迟合并

迟到数据的处理机制

1. ‌允许延迟(Allowed Lateness)

这是最常用的方式,‌扩展窗口等待时间‌:

javaCopy Code

.window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(5)) // 允许5秒延迟

  • 运作方式‌:
    • 水位线到达 窗口结束时间 时触发‌初次计算
    • 在 [窗口结束时间, 窗口结束时间+允许延迟] 区间内:
      • 新到达的属于该窗口的数据‌重新触发窗口计算
      • 窗口状态持续更新
  • 示例‌:

    mermaidCopy Code

    timeline title 10秒窗口 + 5秒允许延迟 section 事件时间 00:13 : 水位线到达00:10 → 触发初次计算 00:14 : 迟到数据(事件时间00:09)到达 → 重新触发计算 00:15 : 迟到数据(事件时间00:08)到达 → 再次触发计算 00:16 : 水位线到达00:11 → 窗口正式关闭

2. ‌侧输出流(Side Output)

捕获‌超过允许延迟的迟到数据‌:

javaCopy Code

OutputTag<Event> lateTag = new OutputTag<>("late-data"); DataStream<Event> main = stream .keyBy(...) .window(...) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateTag) // 捕获超时迟到数据 .process(...); // 获取迟到数据流 DataStream<Event> lateStream = main.getSideOutput(lateTag);

  • 应用场景‌:
    • 关键业务数据补全(如金融交易)
    • 延迟监控和告警
    • 数据质量分析
3. ‌触发器(Trigger)自定义

精细控制计算触发逻辑:

javaCopy Code

.trigger(new ContinuousEventTimeTrigger(20) { @Override public TriggerResult onElement(...) { // 自定义迟到数据处理逻辑 if (isLateData(element)) { updateWindowState(element); // 手动更新窗口状态 return TriggerResult.FIRE; // 立即触发计算 } return super.onElement(...); } })

  • 适用场景‌:
    • 特殊业务规则(如证券交易中的尾单处理)
    • 动态延迟策略(基于数据量调整)
4. ‌全局窗口(GlobalWindow)+ 延迟合并

处理‌无界迟到数据‌:

javaCopy Code

.window(GlobalWindows.create()) .trigger(PurgingTrigger.of( new LateDataFireTrigger() // 自定义处理所有迟到数据 ))


迟到数据处理流程

mermaidCopy Code

flowchart TD A[数据到达] --> B{事件时间是否在<br/>当前水位线之前?} B -->|是| C[迟到数据] B -->|否| D[正常处理] C --> E{是否在允许延迟内?} E -->|是| F[更新窗口状态并重新触发计算] E -->|否| G{是否配置侧输出?} G -->|是| H[输出到侧输出流] G -->|否| I[直接丢弃]


实际案例:电商订单统计

场景‌:统计每10分钟的订单总额,容忍3秒延迟,超时订单单独记录

javaCopy Code

// 定义迟到标签 OutputTag<Order> lateOrders = new OutputTag<>("late-orders"); SingleOutputStreamOperator<Double> result = orders .keyBy(Order::getCategory) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateOrders) .aggregate(new OrderSumAggregator()); // 处理主结果 result.print(); // 处理超时订单 result.getSideOutput(lateOrders) .map(order -> "LATE_ORDER: " + order) .addSink(new LateOrderSink());

数据处理效果‌:

数据事件时间处理水位线处理方式
12:15:0712:15:10进入正常窗口 [12:10,12:20)
12:19:5812:20:05允许延迟期内 → 更新窗口结果
12:20:0212:20:08允许延迟期内 → 再次更新结果
12:20:0512:20:13超过延迟期 → 输出到侧输出流

最佳实践建议

  1. 允许延迟设置‌:

    • 一般设为网络延迟峰值的 ‌2-3倍‌(如Kafka延迟监控的P99值)
    • 不宜过大(避免状态膨胀)
  2. 监控配置‌:

    javaCopy Code

    // 监控迟到数据比例 lateDataStream .map(_ => 1L).windowAll(TumblingProcessingTimeWindows.of(Time.hours(1))) .sum(0) .addSink(new LateDataMetricSink());

  3. 状态清理优化‌:

    javaCopy Code

    .withLateFiredPurgingTrigger() // 及时清理已完成窗口状态

  4. 动态延迟策略‌(高级):

    javaCopy Code

    .allowedLateness(ctx => { if (ctx.element().isPriority()) return Time.minutes(5); return Time.seconds(30); })

💡 ‌核心原则‌:水位线决定‌何时触发计算‌,允许延迟决定‌等待迟到数据多久‌,侧输出决定‌超时数据的归宿‌。三者协同实现完整的迟到数据处理机制

状态

Flink中的状态(State)是指算子任务在运行过程中需要记住的信息,这些信息可以用来实现复杂的有状态计算。状态是Flink实现精确一次(exactly-once)语义和故障恢复的基础。

keyed state 键控状态 只能用于keyedStream上

ValueState,MapState,ReducingState,AggregatingState


import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._object KeyedStateExamples {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 示例数据流val input = env.fromElements(("user1", 10),("user2", 5),("user1", 20),("user3", 8),("user2", 15))// 1. ValueState示例val valueStateResult = input.keyBy(_._1).map(new ValueStateExample).print("ValueState结果")// 2. ListState示例val listStateResult = input.keyBy(_._1).map(new ListStateExample).print("ListState结果")// 3. MapState示例val mapStateResult = input.keyBy(_._1).map(new MapStateExample).print("MapState结果")// 4. ReducingState示例val reducingStateResult = input.keyBy(_._1).map(new ReducingStateExample).print("ReducingState结果")// 5. AggregatingState示例val aggregatingStateResult = input.keyBy(_._1).map(new AggregatingStateExample).print("AggregatingState结果")env.execute("Keyed State Examples")}
}// 1. ValueState实现
class ValueStateExample extends RichMapFunction[(String, Int), (String, Int)] {private var state: ValueState[Int] = _override def open(parameters: Configuration): Unit = {val stateDesc = new ValueStateDescriptor[Int]("valueState", TypeInformation.of(classOf[Int]))state = getRuntimeContext.getState(stateDesc)}override def map(value: (String, Int)): (String, Int) = {val current = Option(state.value()).getOrElse(0)val newValue = current + value._2state.update(newValue)(value._1, newValue)}
}// 2. ListState实现
class ListStateExample extends RichMapFunction[(String, Int), (String, List[Int])] {private var state: ListState[Int] = _override def open(parameters: Configuration): Unit = {val stateDesc = new ListStateDescriptor[Int]("listState", TypeInformation.of(classOf[Int]))state = getRuntimeContext.getListState(stateDesc)}override def map(value: (String, Int)): (String, List[Int]) = {state.add(value._2)(value._1, state.get().asScala.toList)}
}// 3. MapState实现
class MapStateExample extends RichMapFunction[(String, Int), (String, Map[String, Int])] {private var state: MapState[String, Int] = _override def open(parameters: Configuration): Unit = {val stateDesc = new MapStateDescriptor[String, Int]("mapState", TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int]))state = getRuntimeContext.getMapState(stateDesc)}override def map(value: (String, Int)): (String, Map[String, Int]) = {state.put(value._1, value._2)(value._1, state.entries().asScala.map(e => e.getKey -> e.getValue).toMap)}
}// 4. ReducingState实现
class ReducingStateExample extends RichMapFunction[(String, Int), (String, Int)] {private var state: ReducingState[Int] = _override def open(parameters: Configuration): Unit = {val stateDesc = new ReducingStateDescriptor[Int]("reducingState",(a: Int, b: Int) => a + b,  // ReduceFunctionTypeInformation.of(classOf[Int]))state = getRuntimeContext.getReducingState(stateDesc)}override def map(value: (String, Int)): (String, Int) = {state.add(value._2)(value._1, state.get())}
}// 5. AggregatingState实现
class AggregatingStateExample extends RichMapFunction[(String, Int), (String, Double)] {private var state: AggregatingState[Int, Double] = _override def open(parameters: Configuration): Unit = {val stateDesc = new AggregatingStateDescriptor[Int, (Int, Int), Double]("aggregatingState",new AverageAggregate,TypeInformation.of(classOf[(Int, Int)]),TypeInformation.of(classOf[Double]))state = getRuntimeContext.getAggregatingState(stateDesc)}override def map(value: (String, Int)): (String, Double) = {state.add(value._2)(value._1, state.get())}
}// 用于AggregatingState的聚合函数
class AverageAggregate extends AggregateFunction[Int, (Int, Int), Double] {override def createAccumulator(): (Int, Int) = (0, 0)override def add(value: Int, accumulator: (Int, Int)): (Int, Int) = {(accumulator._1 + value, accumulator._2 + 1)}override def getResult(accumulator: (Int, Int)): Double = {accumulator._1.toDouble / accumulator._2}override def merge(a: (Int, Int), b: (Int, Int)): (Int, Int) = {(a._1 + b._1, a._2 + b._2)}
}

算子状态


import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunctionclass BufferingSource(threshold: Int) extends SourceFunction[String] with CheckpointedFunction {private var isRunning = trueprivate var bufferedElements = List[String]()private var checkpointedState: ListState[String] = _override def run(ctx: SourceFunction.SourceContext[String]): Unit = {while (isRunning) {bufferedElements ::= s"element-${System.currentTimeMillis()}"if (bufferedElements.size >= threshold) {bufferedElements.reverse.foreach(ctx.collect)bufferedElements = Nil}Thread.sleep(100)}}override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.update(bufferedElements.asJava)}override def initializeState(context: FunctionInitializationContext): Unit = {checkpointedState = context.getOperatorStateStore.getListState(new ListStateDescriptor[String]("buffered-elements", classOf[String]))if (context.isRestored) {bufferedElements = checkpointedState.get().asScala.toList}}
}

class UnionListSource extends SourceFunction[Int] with CheckpointedFunction {private var checkpointedState: ListState[Int] = _private var toEmit = (1 to 100).toListoverride def run(ctx: SourceFunction.SourceContext[Int]): Unit = {toEmit.foreach { num =>ctx.collect(num)Thread.sleep(10)}}override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.update(toEmit.asJava)}override def initializeState(context: FunctionInitializationContext): Unit = {checkpointedState = context.getOperatorStateStore.getUnionListState(new ListStateDescriptor[Int]("union-state", classOf[Int]))if (context.isRestored) {toEmit = checkpointedState.get().asScala.toList}}
}

import org.apache.flink.api.common.state.{MapStateDescriptor, BroadcastState}
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collectorclass DynamicFilterFunction extends BroadcastProcessFunction[String, String, String] {private final val ruleDescriptor = new MapStateDescriptor[String, String]("rules", classOf[String], classOf[String])override def processElement(value: String,ctx: ReadOnlyContext,out: Collector[String]): Unit = {val rule = ctx.getBroadcastState(ruleDescriptor).get("filter")if (rule == null || value.contains(rule)) {out.collect(value)}}override def processBroadcastElement(rule: String,ctx: Context,out: Collector[String]): Unit = {ctx.getBroadcastState(ruleDescriptor).put("filter", rule)}
}

广播状态

状态存储

状态生存时间

CEP

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/919671.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/919671.shtml
英文地址,请注明出处:http://en.pswp.cn/news/919671.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git 2.15.0 64位安装步骤Windows详细教程从下载到验证(附安装包下载)

一、下载后双击运行 安装包下载&#xff1a;https://pan.quark.cn/s/7200b32a1ecf&#xff0c;找到下载好的文件&#xff1a;​Git-2.15.0-64-bit.exe​双击这个文件&#xff0c;就会弹出安装向导窗口&#xff0c;点 ​​“Next”&#xff08;下一步&#xff09;​​ 二、选择…

在职老D渗透日记day23:sqli-labs靶场通关(第29关-31关)http参数过滤

5.29.第29关 http参数过滤 闭合5.29.1.手动注入&#xff08;1&#xff09;判断注入类型、注入点闭合&#xff08;2&#xff09;有回显&#xff0c;优先用联合查询注入&#xff0c;判读字段数?id1&id2 order by 3 -- ?id1&id2 order by 4 --&#xff08;3&#xff09;…

Spring Boot整合Amazon SNS实战:邮件订阅通知系统开发

Spring Boot整合Amazon SNS实战引言配置服务总结新用户可获得高达 200 美元的服务抵扣金 亚马逊云科技新用户可以免费使用亚马逊云科技免费套餐&#xff08;Amazon Free Tier&#xff09;。注册即可获得 100 美元的服务抵扣金&#xff0c;在探索关键亚马逊云科技服务时可以再额…

LeetCode_动态规划1

动态规划1.动态规划总结1.1 01背1.1.1 二维数组1.1.2 一维数组1.2 完全背包2.斐波那契数(力扣509)3.爬楼梯(力扣70)4.使用最小花费爬楼梯(力扣746)5.不同路径(力扣62)6.不同路径 II(力扣63)7.整数拆分(力扣343)8.不同的二叉搜索树(力扣96)9.分割等和子集(力扣416)10.最后一块石…

【STM32】HAL库中的实现(九):SPI(串行外设接口)

SPI 接口通信原理 SPI&#xff08;Serial Peripheral Interface&#xff09;是全双工主从通信协议&#xff0c;特点是&#xff1a; 信号线功能SCK串行时钟MOSI主设备输出&#xff0c;从设备输入MISO主设备输入&#xff0c;从设备输出CS&#xff08;NSS&#xff09;片选信号&am…

Git常用操作大全(附git操作命令)

Git常用操作大全 一、基础配置 1.1 设置用户名和邮箱 git config --global user.name "你的名字" git config --global user.email "你的邮箱"1.2 查看配置 git config --list二、仓库管理 2.1 初始化本地仓库 git init2.2 克隆远程仓库 git clone <仓库…

详解flink table api基础(三)

文章目录1.使用flink的原因&#xff1a;2. Flink支持两种模式&#xff1a;3. flink table api工作原理&#xff1a;4. Flink table api 使用5. select语句&flink table api&#xff1a;6. 使用flink table api 创建table7. 使用flink table api 写流式数据输出到表或sink8.…

Vue2+Vue3前端开发_Day5

参考课程: 【黑马程序员 Vue2Vue3基础入门到实战项目】 [https://www.bilibili.com/video/BV1HV4y1a7n4] ZZHow(ZZHow1024) 自定义指令 基本语法&#xff08;全局 & 局部注册&#xff09; 介绍&#xff1a;自己定义的指令&#xff0c;可以封装一些 DOM 操作&#xff0c…

机器学习--决策树2

目录 第一代裁判&#xff1a;ID3 与信息增益的 “偏爱” 第二代裁判&#xff1a;C4.5 用 “增益率” 找平衡 第三代裁判&#xff1a;CART 的 “基尼指数” 新思路 遇到连续值&#xff1f;先 “砍几刀” 再说 给决策树 “减肥”&#xff1a;剪枝的学问 动手试试&#xff1…

yggjs_react使用教程 v0.1.1

yggjs_react是一个用于快速创建React项目的工具&#xff0c;它集成了Vite、TypeScript、Zustand和React Router等现代前端技术栈&#xff0c;帮助开发者快速搭建高质量的React应用。 快速入门 快速入门部分将指导您如何安装yggjs_react工具、创建新项目并启动开发服务器。 安…

vulhub可用的docker源

这一块不太容易找&#xff0c;我试了好几个源&#xff0c;下面是20250820测试可用源 编辑方法sudo mkdir -p /etc/docker sudo vim /etc/docker/daemon.json 配置内容 [1] {"registry-mirrors" : ["https://docker.registry.cyou", "https://docker-…

基于YOLOv8-SEAttention与LLMs融合的农作物害虫智能诊断与防控决策系统

1. 引言 1.1 研究背景与意义 农作物虫害是制约农业产量与质量的重要因素。据FAO报告&#xff0c;全球每年因病虫害造成的粮食损失高达 20%–40%。传统人工巡查与经验诊断具有时效性差、成本高与专业人才不足等缺陷。近年来&#xff0c;计算机视觉特别是目标检测技术在农业检测…

从零开始构建GraphRAG红楼梦知识图谱问答项目(三)

文章结尾有CSDN官方提供的学长的联系方式&#xff01;&#xff01; 欢迎关注B站从零开始构建一个基于GraphRAG的红楼梦项目 第三集01 搭建后端服务 创建一个python文件server.py 完整源码放到文章最后了。 1.1 graphrag 相关导入 # GraphRAG 相关导入 from graphrag.query.cont…

S32K328(Arm Cortex-M7)适配CmBacktrace错误追踪

CmBacktrace 相当于重写了hard_fault函数&#xff0c;在hard_fault函数里面去分析SCB寄存器的信息和堆栈信息&#xff0c;然后把这些信息打印出来(或者写到flash)&#xff1b;通过使用串口输出产生hard_fault的堆栈信息&#xff0c;然后利用addr2line工具反推出具体的代码执行函…

AI研究引擎的简单技术实现步骤

产品愿景与核心功能 1.1 产品使命 “洞见 Weaver”是一个全栈AI Web应用,旨在将用户的复杂研究问题,通过AI驱动的动态思维导图和结构化报告,转化为一次沉浸式的、可追溯的视觉探索之旅。我们的使命是,将AI复杂的推理过程透明化,将人类的探索直觉与AI的分析能力无缝结合,…

open webui源码分析5-Tools

本文从最简单的时间工具入手&#xff0c;分析Tools相关的代码。一、安装工具git clone https://github.com/open-webui/openapi-servers cd openapi-servers# 进入时间工具目录 cd servers/timepip install -r requirements.txt# 启动服务 uvicorn main:app --host 0.0.0.0 --r…

windows下通过vscode远程调试linux c/cpp程序配置

windows下通过vscode远程调试linux c/cpp程序配置vscode插件配置linux依赖工具安装launch.json配置vscode插件配置 CodeLLDB插件需要提前下载&#xff1a; linux依赖工具安装 sudo apt update sudo apt install cmake clangdlaunch.json配置 {"version": "0…

IDEA报JDK版本问题

解决思路&#xff1a;1.找到配置jdk的IDEA配置位置settings和project structure2.先配置setting3.再修改项目结构

VirtualBox 安装 Ubuntu Server 系统及 Ubuntu 初始配置

文章目录简介VirtualBoxUbuntu Server 简介Ubuntu Server 下载安装 Ubuntu Server首选项配置导入系统镜像配置系统用户配置内存 CPU 虚拟硬盘开始安装 Ubuntu安装完成登录系统配置网络Ubuntu 系统配置安装常用工具安装 SSH设置 root 密码配置 IP 地址&#xff08;推荐自动分配I…

Milvus 可观测性最佳实践

Milvus 介绍 Milvus 是一个开源的向量数据库&#xff0c;专为处理大规模、高维度向量数据而设计&#xff0c;广泛应用于人工智能、推荐系统、图像检索、自然语言处理等场景。它支持亿级向量的高效存储与快速检索&#xff0c;内置多种相似度搜索算法&#xff08;如 HNSW、IVF、…