一、引言

Flink 作为一款强大的流处理框架,在其中扮演着关键角色。今天,咱们来聊聊 Flink 中一个极为重要的概念 —— Watermark(水位线),它是处理乱序数据和准确计算的关键。接下来我们直入主题,首先来看看今天的第一个问题Watermark是什么。

二、Watermark 是什么

(一)Watermark 定义​

Watermark(水位线)是 Flink 中用于处理事件时间(Event Time)的一种机制 ,它本质上是一种特殊的时间戳。简单来说,Watermark 是插入到数据流中的一个标记,用于标记事件时间的上界。在 Flink 的流处理中,Watermark 可以看作是一个时间戳,它表示所有小于该时间戳的事件都已经到达,之后不会再有该时间戳之前的事件到来。​

比如,我们有一个订单流,每个订单都有一个下单时间(事件时间)。如果当前 Watermark 的值是 10:00,那就意味着所有下单时间在 10:00 之前的订单都已经到达 Flink 系统,不会再有 10:00 之前下单的订单数据了。​

Watermark 必须单调递增,这是一个非常重要的特性。只有单调递增,才能确保任务的事件时间时钟在向前推进,而不是后退。如果 Watermark 不单调递增,就会导致时间混乱,影响窗口计算等操作的准确性。​

(二)Watermark 作用​

Watermark 的主要作用是解决数据乱序问题 ,确保在事件时间语义下,窗口操作能够准确地触发计算。在实际的流处理场景中,由于网络延迟、分布式系统等因素,数据到达 Flink 系统的顺序往往和它们实际产生的时间顺序不一致,也就是出现乱序问题。如果直接按照数据到达的顺序进行窗口计算,结果可能会不准确。​

举个例子,假设有一个统计每 10 分钟内订单金额总和的窗口操作。如果没有 Watermark 机制,当一个 10:00 - 10:10 的窗口时间到了,系统可能会立刻触发计算,而此时可能还有 10:00 - 10:10 之间产生的订单数据因为网络延迟还没到达。这样计算出来的订单金额总和就会偏小,结果不准确。​

引入 Watermark 后,我们可以设置一个合理的延迟时间。比如设置延迟 5 分钟,当系统接收到一个事件时间为 10:05 的订单时,Watermark 的值可能是 10:00(10:05 - 5 分钟)。此时,即使 10:00 - 10:10 的窗口时间到了,系统也不会立刻触发计算,而是会等待,直到 Watermark 的值达到 10:10(10:15 - 5 分钟),才会触发窗口计算。这样就给了可能迟到的订单数据足够的时间到达,保证了计算结果的准确性。​

总的来说,Watermark 就像是一个 “裁判”,它告诉 Flink 系统什么时候可以放心地进行窗口计算,不用担心有迟到的数据会影响结果。通过设置 Watermark,我们可以在一定程度上平衡延迟和结果正确性,让 Flink 在处理乱序数据时依然能够给出准确的分析结果。

三、Watermark 工作原理​

(一)生成机制​

Watermark 的生成方式主要有两种:周期性生成基于事件的生成 。​

周期性生成:Flink 会按照一定的时间间隔周期性地生成 Watermark。在这种方式下,我们需要设置一个最大允许的乱序时间(maxOutOfOrderness)。例如,假设我们设置最大乱序时间为 5 秒,当 Flink 接收到一个事件时间为 10:05 的事件时,它会根据这个事件时间和最大乱序时间来生成 Watermark。此时 Watermark 的值为 10:00(10:05 - 5 秒)。这意味着系统认为所有 10:00 之前的事件都已经到达,不会再有更晚到达的 10:00 之前的事件了。​

在 Flink 中,我们可以通过BoundedOutOfOrdernessWatermarkStrategy来实现周期性生成 Watermark,示例代码如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.OutputTag;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 从Kafka读取数据DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成WatermarkSingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {// 从元素中提取事件时间戳return extractTimestamp(element);}));// 后续处理逻辑withWatermark.print();env.execute("Watermark Example");}private static long extractTimestamp(String element) {// 实际应用中根据数据格式提取事件时间戳// 这里只是示例,假设数据格式为 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}

基于事件的生成:基于事件的生成方式是当特定的事件发生时生成 Watermark ,这种方式适用于某些特殊的业务场景,比如在电商订单流中,当接收到一个特殊的 “订单批次结束” 事件时,生成 Watermark。在 Flink 中,我们可以通过实现AssignerWithPunctuatedWatermarks接口来实现基于事件的 Watermark 生成。这种方式需要我们在处理每个事件时,判断是否满足生成 Watermark 的条件。如果满足,则生成一个新的 Watermark。 基于事件的生成方式示例代码如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;public class PunctuatedWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 从Kafka读取数据DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成WatermarkSingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {@Overridepublic long extractTimestamp(String element, long previousElementTimestamp) {// 从元素中提取事件时间戳return extractTimestamp(element);}@Overridepublic Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {// 判断是否满足生成Watermark的条件,这里假设当元素为 "END" 时生成Watermarkif ("END".equals(lastElement)) {return new Watermark(extractedTimestamp);}return null;}});// 后续处理逻辑withWatermark.print();env.execute("Punctuated Watermark Example");}private static long extractTimestamp(String element) {// 实际应用中根据数据格式提取事件时间戳// 这里只是示例,假设数据格式为 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}

(二)传播机制​

Watermark 在 Flink 算子间的传播过程是确保窗口计算准确性的关键 。当 Watermark 在数据流中生成后,它会随着数据一起在算子间传递。在单并行度的情况下,Watermark 的传播相对简单,直接从上游算子传递到下游算子。比如,在一个简单的数据流中,数据源生成 Watermark 后,经过 map 算子,再到 window 算子,Watermark 会依次传递下去。​

在多并行度的情况下,Watermark 的传播会涉及到对齐(alignment)的概念 。假设一个数据源有多个并行子任务,每个子任务都会生成自己的 Watermark。当这些带有不同 Watermark 的数据流汇聚到下游算子时,下游算子会选择所有输入流中最小的 Watermark 作为自己的 Watermark。这是因为只有当所有输入流中小于等于该 Watermark 时间戳的事件都到达后,才能确保窗口计算的准确性。比如,有三个并行输入流,它们的 Watermark 分别为 10:00、10:05 和 10:10,那么下游算子会选择 10:00 作为自己的 Watermark。这样做的目的是为了保证所有可能参与窗口计算的数据都已经到达,避免因为某个输入流的数据延迟而导致窗口计算结果不准确。​

(三)触发机制​

Watermark 达到一定条件时会触发窗口计算 。具体来说,当 Watermark 的值大于或等于窗口的结束时间,并且该窗口内有数据时,就会触发窗口计算。例如,我们定义一个 10:00 - 10:10 的窗口,设置最大乱序时间为 5 分钟。当 Watermark 的值达到 10:10(假设事件时间为 10:15 的数据到达,生成的 Watermark 为 10:10,10:15 - 5 分钟),并且 10:00 - 10:10 这个窗口内有数据时,就会触发窗口计算,计算该窗口内的数据结果并输出。​

再举个例子,假设我们有一个统计每 5 分钟内用户访问次数的窗口操作。如果没有数据迟到,当时间到达 10:05 时,窗口内的数据统计完成,Watermark 达到 10:05,此时窗口计算被触发,输出 10:00 - 10:05 这个时间段内的用户访问次数。但如果有数据迟到,比如 10:03 产生的数据在 10:08 才到达,由于我们设置了一定的乱序时间,Watermark 不会立刻达到 10:05,而是会等待迟到的数据。当 Watermark 最终达到 10:05 时,窗口计算才会触发,这样就能保证迟到的数据也能被正确统计到 10:00 - 10:05 的窗口内 。

四、Watermark 生成策略​

(一)固定延迟策略​

固定延迟策略是一种较为简单直观的 Watermark 生成策略 。它的原理是在接收到的最大事件时间的基础上,减去一个固定的延迟时间,以此来生成 Watermark。例如,我们设置固定延迟时间为 10 秒,当接收到的事件时间为 10:00:15 时,生成的 Watermark 时间戳就是 10:00:05(10:00:15 - 10 秒)。这表示系统认为所有 10:00:05 之前的事件都已经到达,不会再有更晚到达的 10:00:05 之前的事件了。​

在 Flink 中,使用固定延迟策略生成 Watermark 的代码示例如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;public class FixedDelayWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 从Kafka读取数据DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成Watermark,设置固定延迟10秒SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((element, recordTimestamp) -> {// 从元素中提取事件时间戳return extractTimestamp(element);}));// 后续处理逻辑withWatermark.print();env.execute("Fixed Delay Watermark Example");}private static long extractTimestamp(String element) {// 实际应用中根据数据格式提取事件时间戳// 这里只是示例,假设数据格式为 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}

(二)滑动策略​

滑动策略是指 Watermark 按照一定的时间间隔(滑动步长)和窗口大小进行生成 。与固定延迟策略不同,滑动策略下的 Watermark 不是简单地基于最大事件时间减去固定延迟,而是在滑动窗口的基础上生成。例如,我们设置滑动窗口大小为 5 分钟,滑动步长为 1 分钟。当第一个窗口(0:00 - 0:05)内的事件处理完成后,生成第一个 Watermark,其时间戳为 0:05。接着,当第二个窗口(0:01 - 0:06)内的事件处理完成后,生成第二个 Watermark,其时间戳为 0:06,以此类推。​

以电商订单统计为例,如果我们想统计每 5 分钟内的订单金额总和,并且希望每分钟更新一次统计结果,就可以使用滑动策略。假设我们设置滑动窗口大小为 5 分钟,滑动步长为 1 分钟。当接收到第一个订单时,判断其事件时间,将其分配到对应的窗口(例如 0:00 - 0:05)。随着订单的不断接收,当 0:00 - 0:05 这个窗口内的订单处理完成后,生成一个 Watermark,其时间戳为 0:05,表示 0:05 之前的订单都已经到达。此时,开始计算 0:00 - 0:05 这个窗口内的订单金额总和并输出结果。接着,当 0:01 - 0:06 这个窗口内的订单处理完成后,生成下一个 Watermark,时间戳为 0:06,计算并输出 0:01 - 0:06 这个窗口内的订单金额总和。这样,我们就能每分钟得到一个最近 5 分钟内的订单金额统计结果。​

(三)自定义策略​

在一些复杂的业务场景中,固定延迟策略和滑动策略可能无法满足需求,这时就需要自定义 Watermark 生成策略 。自定义策略允许用户根据具体的业务逻辑和数据特点,灵活地生成 Watermark。例如,在物联网设备数据处理场景中,不同设备的数据产生频率和延迟情况可能各不相同。有些设备可能每隔几秒就发送一次数据,而有些设备可能由于网络问题,数据延迟较高。在这种情况下,我们可以根据设备 ID 来分别设置不同的 Watermark 生成策略。对于数据产生频率高、延迟低的设备,可以设置较小的固定延迟;对于数据延迟较高的设备,可以根据历史数据的延迟情况,动态地调整 Watermark 的生成逻辑。​

在 Flink 中,实现自定义 Watermark 生成策略需要实现WatermarkStrategy接口 。通过重写接口中的方法,我们可以定义自己的 Watermark 生成逻辑。例如,下面是一个简单的自定义 Watermark 生成策略的示例代码:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;public class CustomWatermarkStrategy implements WatermarkStrategy<String> {@Overridepublic WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomWatermarkGenerator();}private static class CustomWatermarkGenerator implements WatermarkGenerator<String> {private long maxTimestamp;private final long delay = 10000; // 延迟10秒public CustomWatermarkGenerator() {this.maxTimestamp = Long.MIN_VALUE;}@Overridepublic void onEvent(String event, long eventTimestamp, WatermarkOutput output) {maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - delay));}}
}

 在上述代码中,我们定义了一个CustomWatermarkStrategy类,实现了WatermarkStrategy接口。在createWatermarkGenerator方法中,返回一个CustomWatermarkGenerator实例。CustomWatermarkGenerator类实现了WatermarkGenerator接口,通过onEvent方法记录接收到的最大事件时间,在onPeriodicEmit方法中,根据最大事件时间减去固定延迟生成 Watermark。这样,我们就实现了一个简单的自定义 Watermark 生成策略,可以根据具体业务需求进行灵活调整 。

五、实际生产使用

定义数据源:这里我们以从 Kafka 读取数据为例,假设 Kafka 中存储的是订单数据,每条数据格式为订单ID,下单时间戳,订单金额。示例代码如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import java.util.Properties;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// Kafka配置Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");// 从Kafka读取数据DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));// 后续代码...}
}

设置 Watermark 生成器:我们采用固定延迟策略生成 Watermark,设置最大乱序时间为 5 秒。示例代码如下:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));// 设置Watermark生成器,固定延迟5秒SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {// 从元素中提取事件时间戳,假设数据格式为 "订单ID,下单时间戳,订单金额"String[] parts = element.split(",");return Long.parseLong(parts[1]);}));// 后续代码...}
}

窗口操作与计算:对带有 Watermark 的数据进行窗口操作,统计每 10 分钟内的订单总金额。示例代码如下:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {String[] parts = element.split(",");return Long.parseLong(parts[1]);}));// 按订单ID分组,进行窗口操作,统计每10分钟内的订单总金额DataStream<String> result = withWatermark.keyBy(value -> value.split(",")[0]).timeWindow(Time.minutes(10)).apply(new WindowFunction<String, String, String, TimeWindow>() {@Overridepublic void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {double totalAmount = 0;for (String element : input) {String[] parts = element.split(",");double amount = Double.parseDouble(parts[2]);totalAmount += amount;}out.collect("订单ID: " + key + ", 窗口开始时间: " + window.getStart() + ", 窗口结束时间: " + window.getEnd() + ", 订单总金额: " + totalAmount);}});result.print();env.execute("Watermark Example");}
}

代码解析​

  1. 创建流处理环境:StreamExecutionEnvironment.getExecutionEnvironment()获取当前运行环境的执行环境对象,env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)设置流处理的时间特性为事件时间,这是使用 Watermark 的前提。​
  2. 定义数据源:通过FlinkKafkaConsumer从 Kafka 中读取数据,props中配置了 Kafka 的连接地址和消费者组 ID,SimpleStringSchema用于将 Kafka 中的数据反序列化为字符串。​
  3. 设置 Watermark 生成器:WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))表示采用固定延迟策略生成 Watermark,最大乱序时间为 5 秒。withTimestampAssigner方法用于从输入数据中提取事件时间戳,这里假设数据格式为订单ID,下单时间戳,订单金额,通过Long.parseLong(parts[1])提取下单时间戳。​
  4. 窗口操作与计算:keyBy(value -> value.split(",")[0])按订单 ID 进行分组,timeWindow(Time.minutes(10))定义了一个 10 分钟的滚动窗口,apply方法中实现了具体的计算逻辑,遍历窗口内的所有订单数据,累加订单金额,最后输出统计结果。

 当我们运行上述代码后,控制台会输出每 10 分钟内每个订单 ID 的订单总金额。例如:

订单ID: 1001, 窗口开始时间: 1633420800000, 窗口结束时间: 1633421400000, 订单总金额: 1000.5

订单ID: 1002, 窗口开始时间: 1633420800000, 窗口结束时间: 1633421400000, 订单总金额: 2000.3

从结果可以看出,Watermark 机制有效地处理了乱序数据,确保了窗口计算的准确性。即使订单数据到达的顺序混乱,通过 Watermark 的延迟策略,Flink 也能准确地统计出每个窗口内的订单总金额 。 

六、常见问题​

(一)Watermark 延迟设置​

在设置 Watermark 延迟时间时,一定要充分考虑业务场景的需求和数据的实际延迟情况 。如果延迟设置过小,可能无法给迟到的数据足够的时间到达,导致窗口计算结果不准确,丢失重要数据。比如在电商促销活动期间,订单数据量会大幅增加,网络延迟也可能更严重。如果 Watermark 延迟设置只有 1 分钟,很可能一些在促销活动刚开始几分钟内产生的订单数据因为网络拥堵,在 1 分钟后才到达,这些迟到的订单数据就无法被正确统计到相应的窗口内,影响销售数据的统计准确性。​

相反,如果延迟设置过大,虽然能确保所有数据都能被处理,但会导致窗口计算的延迟增加,影响实时性 。假设一个实时监控系统,需要及时发现设备的异常情况并发出警报。如果 Watermark 延迟设置为 30 分钟,即使设备在几分钟前就出现了异常,但由于 Watermark 延迟时间过长,要等 30 分钟后才会触发窗口计算,这就导致警报发出延迟,可能会给生产带来严重的损失。​

(二)数据乱序处理​

当数据乱序严重时,原本设置的 Watermark 策略可能无法满足需求 。比如在一些特殊的物联网场景中,由于传感器分布范围广,网络状况复杂,数据乱序情况非常严重。此时,可以考虑增加 Watermark 的延迟时间,以确保更多迟到的数据能够被正确处理。例如,原本设置的延迟时间为 5 秒,在数据乱序严重的情况下,可以将延迟时间增加到 10 秒甚至更长。​

还可以结合其他技术手段,如使用侧输出流(Side Output)来处理迟到的数据 。侧输出流可以将迟到的数据单独收集起来,进行后续的特殊处理。在电商订单处理中,对于超过一定延迟时间(如 10 分钟)的订单数据,可以将其输出到侧输出流,然后对这些数据进行单独分析,看是否是由于某些特殊原因导致的延迟,如支付系统故障、物流信息更新不及时等。​

(三)Watermark 与窗口的配合​

Watermark 与窗口的大小和类型密切相关 ,直接影响窗口计算的正确性。如果窗口大小设置不合理,即使 Watermark 生成正常,也可能导致计算结果不准确。例如,在统计网站用户活跃情况时,如果将窗口设置得过大,如以 1 天为一个窗口,那么在这一天内,可能会有大量的数据涌入,而且数据乱序情况也可能更复杂。即使 Watermark 能够处理一定程度的乱序数据,但由于窗口时间跨度太长,仍然可能出现部分数据统计不准确的情况。相反,如果窗口设置得过小,如以 1 分钟为一个窗口,虽然能提高实时性,但可能会导致频繁的窗口计算,增加系统开销,而且对于一些需要统计较长时间段内数据的业务场景,可能无法得到准确的结果。​

不同类型的窗口(如滚动窗口、滑动窗口、会话窗口)与 Watermark 的配合方式也有所不同 。滚动窗口是固定大小且不重叠的窗口,Watermark 达到窗口结束时间时触发计算。滑动窗口有固定的窗口大小和滑动步长,Watermark 需要根据滑动步长和窗口大小来合理触发计算。会话窗口则是根据事件之间的时间间隔来定义窗口,Watermark 在处理会话窗口时,需要考虑会话的开始和结束条件,以确保窗口计算的准确性。在实际应用中,需要根据业务需求选择合适的窗口类型,并合理配置 Watermark,以实现准确高效的流处理 。

七、总结

Watermark 作为 Flink 流处理中处理事件时间和乱序数据的关键机制,其重要性不言而喻。它本质是一种特殊的时间戳,用于标记事件时间的上界,确保在乱序数据的情况下,窗口操作能够准确地触发计算。通过周期性生成或基于事件的生成方式,Watermark 在数据流中不断推进,在单并行度和多并行度的情况下,以不同的传播机制在算子间传递,最终在满足触发条件时,准确地触发窗口计算。​

在实际应用中,我们可以根据业务场景选择合适的 Watermark 生成策略,如固定延迟策略、滑动策略或自定义策略 。在代码实现方面,通过搭建 Flink 开发环境,按照引入依赖、创建流处理环境、定义数据源、设置 Watermark 生成器、进行窗口操作与计算的步骤,能够实现基于 Watermark 的流处理任务。同时,我们也要注意 Watermark 延迟设置、数据乱序处理以及 Watermark 与窗口的配合等问题,以确保流处理的准确性和高效性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/89181.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/89181.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/89181.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Rust Web 全栈开发(五):使用 sqlx 连接 MySQL 数据库

Rust Web 全栈开发&#xff08;五&#xff09;&#xff1a;使用 sqlx 连接 MySQL 数据库Rust Web 全栈开发&#xff08;五&#xff09;&#xff1a;使用 sqlx 连接 MySQL 数据库项目创建数据库准备连接请求功能实现Rust Web 全栈开发&#xff08;五&#xff09;&#xff1a;使用…

【zynq7020】PS的“Hello World”

目录 基本过程 新建Vivado工程 ZYNQ IP核设置 使用SDK进行软件开发 基于Vivado2017 Vivado工程建立 SDK调试 固化程序 注&#xff1a;Vivado 2019.1 及之前&#xff1a;默认使用 SDK Vivado 2019.2-2020.1&#xff1a;逐步过渡&#xff0c;支持 SDK 与 Vitis 并存 Vi…

希尔排序和选择排序及计数排序的简单介绍

希尔排序法又称缩小增量法。希尔排序法的基本思想是&#xff1a;先选定一个整数gap&#xff0c;把待排序文件中所有数据分成几个组&#xff0c;所有距离为gap的数据分在同一组内&#xff0c;并对每一组内的数据进行排序。然后gap减减&#xff0c;重复上述分组和排序的工作。当到…

Solid Edge多项目并行,浮动许可如何高效调度?

在制造企业的数字化设计体系中&#xff0c;Solid Edge 作为主流 CAD 工具&#xff0c;因其灵活的建模能力、同步技术和强大的装配设计功能&#xff0c;广泛应用于机械设备、零部件制造等行业的研发场景。随着企业设计任务复杂化&#xff0c;多项目并行成为常态&#xff0c;Soli…

Flink cdc 使用总结

Flink 与 Flink CDC 版本兼容对照表Flink 版本支持的 Flink CDC 版本关键说明Flink 1.11.xFlink CDC 1.2.x早期版本&#xff0c;需注意 Flink 1.11.0 的 Bug&#xff08;如 Upsert 写入问题&#xff09;&#xff0c;建议使用 1.11.1 及以上。Flink 1.12.xFlink CDC 2.0.x&#…

企业培训笔记:axios 发送 ajax 请求

文章目录axios 简介一&#xff0c;Vue工程中安装axios二&#xff0c;编写app.vue三&#xff0c;编写HomeView.vue四&#xff0c;Idea打开后台项目五&#xff0c;创建HelloController六&#xff0c;配置web访问端口七&#xff0c;运行项目&#xff0c;查看效果&#xff08;一&am…

Maven下载与配置对Java项目的理解

目录 一、背景 二、JAVA项目与Maven的关系 2.1标准java项目 2.2 maven 2.2.1 下载maven 1、下载 2、配置环境 2.2.2 setting.xml 1、配置settings.xml 2、IDEA配置maven 一、背景 在java项目中&#xff0c;新手小白很有可能看不懂整体的目录结构&#xff0c;以及每个…

Mars3d的走廊只能在一个平面的无法折叠的解决方案

问题场景&#xff1a;1. Mars3d的CorridorEntity只能在一个平面修改高度值&#xff0c;无法根据坐标点位制作有高度值的走廊效果&#xff0c;想要做大蜀山盘山走廊的效果实现不了。解决方案&#xff1a;1.使用原生cesium实现对应的走廊的截面形状、走廊的坐标点&#xff0c;包括…

LeetCode 每日一题 2025/7/7-2025/7/13

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录7/7 1353. 最多可以参加的会议数目7/8 1751. 最多可以参加的会议数目 II7/9 3439. 重新安排会议得到最多空余时间 I7/10 3440. 重新安排会议得到最多空余时间 II7/11 3169. …

Bash常见条件语句和循环语句

以下是 Bash 中常用的条件语句和循环语句分类及语法说明&#xff0c;附带典型用例&#xff1a;一、条件语句 1. if 语句 作用&#xff1a;根据条件执行不同代码块 语法&#xff1a; if [ 条件 ]; then# 条件为真时执行 elif [ 其他条件 ]; then# 其他条件为真时执行 else# 所有…

uni-app 选择国家区号

uni-app选择国家区号组件 hy-countryPicker 我们在做登录注册功能的时候&#xff0c;可能会遇到选择区号来使用不同国家手机号来登录或者注册的功能。这里我就介绍下我这个uni-app中使用的选择区号的组件&#xff0c;包含不同国家国旗图标。 效果图 别的不说&#xff0c;先来…

客户端主机宕机,服务端如何处理 TCP 连接?详解

文章目录一、客户端主机宕机后迅速重启1、服务端有数据发送2、服务端开启「保活」机制3、服务端既没有数据发送&#xff0c;也没有开启「保活」机制二、客户端主机宕机后一直没有重启1、服务端有数据发送2、服务端开启「保活」机制3、服务端既没有数据发送&#xff0c;也没有开…

《大数据技术原理与应用》实验报告五 熟悉 Hive 的基本操作

目 录 一、实验目的 二、实验环境 三、数据集 四、实验内容与完成情况 4.1 创建一个内部表 stocks&#xff0c;字段分隔符为英文逗号&#xff0c;表结构下所示。 4.2 创建一个外部分区表 dividends&#xff08;分区字段为 exchange 和symbol&#xff09;&#xff0c;字段…

【橘子分布式】Thrift RPC(编程篇)

一、简介 之前我们研究了一下thrift的一些知识&#xff0c;我们知道他是一个rpc框架&#xff0c;他作为rpc自然是提供了客户端到服务端的访问以及两端数据传输的消息序列化&#xff0c;消息的协议解析和传输&#xff0c;所以我们今天就来了解一下他是如何实现这些功能&#xff…

清理C盘--办法

c盘经常爆红1、命令行2、属性3、临时文件

Java-71 深入浅出 RPC Dubbo 上手 父工程配置编写 附详细POM与代码

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; AI炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有…

创客匠人:创始人 IP 打造的内核,藏在有效的精神成长里

当创始人 IP 成为企业增长的重要引擎&#xff0c;许多人急于寻找 “爆款公式”&#xff0c;却忽略了一个更本质的问题&#xff1a;IP 的生命力&#xff0c;终究源于创始人的精神成长。创客匠人在深耕知识付费赛道的过程中&#xff0c;见证了无数案例&#xff1a;那些能持续实现…

GPT和MBR分区

GPT&#xff08;GUID分区表&#xff09;和MBR&#xff08;主引导记录&#xff09;是两种不同的磁盘分区表格式&#xff0c;用于定义硬盘上分区的布局、位置及启动信息&#xff0c;二者在设计、功能和适用场景上有显著差异。以下从多个维度详细对比&#xff1a; 一、核心定义与起…

c#进阶之数据结构(字符串篇)----String

1、String介绍首先我们得明白&#xff0c;string和String代表的实际上是同一个类型&#xff0c;string是C#中的关键字&#xff0c;代表String类型&#xff0c;因此我们直接来学习String类型。从官方的底层实现代码可以看出&#xff0c;当前String类型实际上就是一个Char类型的聚…

快速排序递归和非递归方法的简单介绍

基本思想为&#xff1a;任取待排序元素序列中 的某元素作为基准值&#xff0c;按照该排序码将待排序集合分割成两子序列&#xff0c;左子序列中所有元素均小于基准值&#xff0c;右 子序列中所有元素均大于基准值&#xff0c;然后最左右子序列重复该过程&#xff0c;直到所有元…