问题描述

一个常见的开窗逻辑(12H 或者 500条):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import java.time.Duration;public class UIDWindowWithProcessFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假设输入数据流包含uid字段和其他数据DataStream<Event> inputStream = env.addSource(...); inputStream.keyBy(event -> event.uid)  // 按UID分组.process(new CustomProcessFunction()).print();env.execute("UID-based Window Processing");}public static class CustomProcessFunction extends KeyedProcessFunction<String, Event, OutputEvent> {// 状态用于计数private transient ValueState<Integer> countState;// 状态用于记录最后更新时间private transient ValueState<Long> lastTimerState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>("count", Types.INT);countState = getRuntimeContext().getState(countDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);lastTimerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Event event, Context ctx, Collector<OutputEvent> out) throws Exception {// 获取当前计数Integer currentCount = countState.value();if (currentCount == null) {currentCount = 0;}// 更新计数currentCount += 1;countState.update(currentCount);// 获取当前定时器时间戳Long currentTimer = lastTimerState.value();// 如果是第一条记录,注册12小时后的定时器if (currentCount == 1) {long timerTime = ctx.timestamp() + Duration.ofHours(12).toMillis();ctx.timerService().registerProcessingTimeTimer(timerTime);lastTimerState.update(timerTime);}// 如果达到500条,立即触发并重置if (currentCount >= 500) {// 触发处理OutputEvent output = new OutputEvent(ctx.getCurrentKey(), currentCount,System.currentTimeMillis());out.collect(output);// 清除状态countState.clear();// 取消之前的定时器if (currentTimer != null) {ctx.timerService().deleteProcessingTimeTimer(currentTimer);}lastTimerState.clear();}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {// 定时器触发时处理Integer currentCount = countState.value();if (currentCount != null && currentCount > 0) {OutputEvent output = new OutputEvent(ctx.getCurrentKey(),currentCount,timestamp);out.collect(output);// 清除状态countState.clear();lastTimerState.clear();}}}// 定义输入输出事件类public static class Event {public String uid;// 其他字段...}public static class OutputEvent {public String uid;public int count;public long timestamp;public OutputEvent(String uid, int count, long timestamp) {this.uid = uid;this.count = count;this.timestamp = timestamp;}}
}

虽然 通过uid进行shuffle,即 keyBy(event -> event.uid)。

但因为Flink的并行度,也就是subtask数量 远少于 uid数量,导致每个subtask会处理多个用户的数据。而实际上每个subtask只有一个 CustomProcessFunction。那状态计数是否会冲突?

// 获取当前计数 
Integer currentCount = countState.value();

触发的Timer又是否是只属于一个用户?

@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {// 定时器触发时处理Integer currentCount = countState.value();if (currentCount != null && currentCount > 0) {

实际上这两个问题的答案都是肯定的,实现机制在于:

  • getRuntimeContext().getState()怎么实现对于key绑定状态
  • Timer怎么绑定key?

为什么getRuntimeContext().getState()能够获得和key绑定的state?

Subtask会根据是不是keyedProcessFunction 在处理每条数据时,设置currentKey

OneInputStreamTask 通过 StreamTaskNetworkOutput 处理每一条输入数据。StreamTaskNetworkOutput则创建了recordProcessor 。

private StreamTaskNetworkOutput(Input<IN> operator, WatermarkGauge watermarkGauge, Counter numRecordsIn) {this.operator = checkNotNull(operator);this.watermarkGauge = checkNotNull(watermarkGauge);this.numRecordsIn = checkNotNull(numRecordsIn);this.recordProcessor = RecordProcessorUtils.getRecordProcessor(operator);}

RecordProcessorUtils.getRecordProcessor 根据是不是KeyStream会增加setKeyContextElement操作,这个process会设置Key再调用OP的 processElement。

    public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(Input<T> input) {boolean canOmitSetKeyContext;if (input instanceof AbstractStreamOperator) {canOmitSetKeyContext = canOmitSetKeyContext((AbstractStreamOperator<?>) input, 0);} else {canOmitSetKeyContext =input instanceof KeyContextHandler&& !((KeyContextHandler) input).hasKeyContext();}if (canOmitSetKeyContext) {return input::processElement;} else if (input instanceof AsyncKeyOrderedProcessing&& ((AsyncKeyOrderedProcessing) input).isAsyncKeyOrderedProcessingEnabled()) {return ((AsyncKeyOrderedProcessing) input).getRecordProcessor(1);} else {return record -> {input.setKeyContextElement(record);input.processElement(record);};}}

AbstractStreamOperator setKey的实现

    @Override@SuppressWarnings({"unchecked", "rawtypes"})public void setKeyContextElement1(StreamRecord record) throws Exception {setKeyContextElement(record, stateKeySelector1);}private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)throws Exception {if (selector != null) {Object key = selector.getKey(record.getValue());setCurrentKey(key);}}

RuntimeContext创建

AbstractStreamOperator 会创建 runtime

        this.runtimeContext =new StreamingRuntimeContext(environment,environment.getAccumulatorRegistry().getUserMap(),getMetricGroup(),getOperatorID(),getProcessingTimeService(),null,environment.getExternalResourceInfoProvider());

 AbstractUdfStreamOperator 会向udf注入runtime

      
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT>{@Overrideprotected void setup(StreamTask<?, ?> containingTask,StreamConfig config,Output<StreamRecord<OUT>> output) {super.setup(containingTask, config, output);FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());}//FunctionUtils
public static void setFunctionRuntimeContext(Function function, RuntimeContext context) {if (function instanceof RichFunction) {RichFunction richFunction = (RichFunction) function;richFunction.setRuntimeContext(context);}}

StreamingRuntimeContext 获取状态,这就是getRuntimeContext().getState()调用的。

    @Overridepublic <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);stateProperties.initializeSerializerUnlessSet(this::createSerializer);return keyedStateStore.getState(stateProperties);}// 返回成员对象private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {checkNotNull(stateDescriptor, "The state properties must not be null");checkNotNull(keyedStateStore,String.format("Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",stateDescriptor.getName(), stateDescriptor.getType()));return keyedStateStore;}

注意这个 keyedStateStore 在StreamingRuntimeContext 刚new出来时 是null,在AbstractStreamOperator 的以下函数进行初始化

    @Overridepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)throws Exception {final TypeSerializer<?> keySerializer =config.getStateKeySerializer(getUserCodeClassloader());final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());final CloseableRegistry streamTaskCloseableRegistry =Preconditions.checkNotNull(containingTask.getCancelables());final StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getJobConfiguration(),runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState(),isAsyncKeyOrderedProcessingEnabled());stateHandler =new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);timeServiceManager =isAsyncKeyOrderedProcessingEnabled()? context.asyncInternalTimerServiceManager(): context.internalTimerServiceManager();beforeInitializeStateHandler();stateHandler.initializeOperatorState(this);runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));}

StreamOperatorStateHandler 会根据有没有keyedStateBackend 来判断是不是要产生DefaultKeyedStateStore。

    public StreamOperatorStateHandler(StreamOperatorStateContext context,ExecutionConfig executionConfig,CloseableRegistry closeableRegistry) {this.context = context;this.keySerializer = context.keySerializer();this.operatorStateBackend = context.operatorStateBackend();this.keyedStateBackend = context.keyedStateBackend();this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();this.closeableRegistry = closeableRegistry;if (keyedStateBackend != null || asyncKeyedStateBackend != null) {keyedStateStore =new DefaultKeyedStateStore(keyedStateBackend,asyncKeyedStateBackend,new SerializerFactory() {@Overridepublic <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {return typeInformation.createSerializer(executionConfig.getSerializerConfig());}});} else {keyedStateStore = null;}}

getState方法如下,最终调用 keyedStateBackend相关方法。

    @Overridepublic <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {requireNonNull(stateProperties, "The state properties must not be null");try {stateProperties.initializeSerializerUnlessSet(serializerFactory);return getPartitionedState(stateProperties);} catch (Exception e) {throw new RuntimeException("Error while getting state", e);}}protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor)throws Exception {checkState(keyedStateBackend != null&& supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1,"Current operator does not integrate the async processing logic, "+ "thus only supports state v1 APIs. Please use StateDescriptor under "+ "'org.apache.flink.runtime.state'.");return keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);}

那context中的keyedStateBackend是怎么注入的?AbstractStreamOperator初始化产生了StreamOperatorStateContext。

StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getJobConfiguration(),runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState(),isAsyncKeyOrderedProcessingEnabled());

这里创建StreamOperatorStateContext实际使用 StreamTaskStateInitializerImpl ,该对象包含了操作符执行所需的各种状态后端和时间服务管理器。

主要初始化内容

1. 状态后端初始化

  • Keyed State Backend(键控状态后端)

    • 根据 keySerializer 是否存在决定是否创建键控状态后端
    • 支持同步和异步两种键控状态后端
    • 通过 StateBackend.createKeyedStateBackend() 或 StateBackend.createAsyncKeyedStateBackend() 创建
  • Operator State Backend(操作符状态后端)

    • 创建 DefaultOperatorStateBackend 来管理操作符状态
    • 处理操作符级别的状态恢复

2. 原始状态输入流初始化

  • Raw Keyed State Inputs(原始键控状态输入)

    • 为自定义键控状态提供输入流
    • 处理从检查点或保存点恢复的原始键控状态数据
  • Raw Operator State Inputs(原始操作符状态输入)

    • 为自定义操作符状态提供输入流
    • 处理从检查点或保存点恢复的原始操作符状态数据

3. 时间服务管理器初始化

  • Internal Timer Service Manager(内部定时器服务管理器)
    • 创建和管理内部定时器服务
    • 支持同步和异步状态后端的定时器管理
    • 当 keyedStatedBackend != null 创建 timeServiceManager

初始化依据

1. 任务环境信息

  • 通过 Environment 获取任务的基本信息,包括:
    • 任务信息(TaskInfo)
    • 任务状态管理器(TaskStateManager
    • 作业ID和任务索引等

2. 操作符标识

  • 根据 OperatorID 从 TaskStateManager 中获取特定操作符的优先级状态信息(PrioritizedOperatorSubtaskState)
  • 这包含了从检查点或保存点恢复的状态数据

3. 状态恢复信息

  • 从 PrioritizedOperatorSubtaskState 获取各种状态:
    • 管理的键控状态(getPrioritizedManagedKeyedState())
    • 管理的操作符状态(getPrioritizedManagedOperatorState())
    • 原始键控状态(getPrioritizedRawKeyedState())
    • 原始操作符状态(getPrioritizedRawOperatorState())

4. 配置参数

  • managedMemoryFraction:管理内存的分配比例
  • isUsingCustomRawKeyedState:是否使用自定义原始键控状态
  • isAsyncState:是否使用异步状态后端

Timer怎么和key绑定?

Timer 详细分析见:

揭秘Fliuk Timer机制:是否多线程触发?

调用链:

  • 用户在 KeyedProcessFunction 中调用 ctx.timerService().registerProcessingTimeTimer(...)

  • KeyedProcessOperator 将 context 注入 KeyedProcessFunctionKeyedProcessFunction 调用 ctx.timerService()实际转发 KeyedProcessOperator 注入的 SimpleTimerService

  • SimpleTimerService 将调用转发给 internalTimerService.registerProcessingTimeTimer(...)

  • InternalTimerService (内部使用一个支持删除的索引堆,懒判断到期后)StreamTaskProcessingTimeService 注册一个回调。

KeyedProcessOperator 的 open方法 创建时间服务和Context。

    public void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);InternalTimerService<VoidNamespace> internalTimerService =getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);TimerService timerService = new SimpleTimerService(internalTimerService);context = new ContextImpl(userFunction, timerService);onTimerContext = new OnTimerContextImpl(userFunction, timerService);}

调用了 AbstractStreamOperator的方法 获取时间服务

    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {if (timeServiceManager == null) {throw new RuntimeException("The timer service has not been initialized.");}@SuppressWarnings("unchecked")InternalTimeServiceManager<K> keyedTimeServiceHandler =(InternalTimeServiceManager<K>) timeServiceManager;TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();checkState(keySerializer != null, "Timers can only be used on keyed operators.");return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);}

Triggerable 接口有两个方法:onEventTime(InternalTimer<K, N> timer) 和 onProcessingTime(InternalTimer<K, N> timer)。当 InternalTimerService 检测到有定时器到期时,就会调用实现了这个接口的对象的相应方法。

这个方法根据  InternalTimeServiceManagerImpl 获取 TimerService

    public <N> InternalTimerService<N> getInternalTimerService(String name,TypeSerializer<K> keySerializer,TypeSerializer<N> namespaceSerializer,Triggerable<K, N> triggerable) {checkNotNull(keySerializer, "Timers can only be used on keyed operators.");// the following casting is to overcome type restrictions.TimerSerializer<K, N> timerSerializer =new TimerSerializer<>(keySerializer, namespaceSerializer);InternalTimerServiceImpl<K, N> timerService =registerOrGetTimerService(name, timerSerializer);timerService.startTimerService(timerSerializer.getKeySerializer(),timerSerializer.getNamespaceSerializer(),triggerable);return timerService;}

register中保证每个名字只有一个 TimerService

<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {InternalTimerServiceImpl<K, N> timerService =(InternalTimerServiceImpl<K, N>) timerServices.get(name);if (timerService == null) {if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) {timerService =new InternalTimerServiceAsyncImpl<>(taskIOMetricGroup,localKeyGroupRange,keyContext,processingTimeService,createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),cancellationContext);} else {timerService =new InternalTimerServiceImpl<>(taskIOMetricGroup,localKeyGroupRange,keyContext,processingTimeService,createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),cancellationContext);}timerServices.put(name, timerService);}return timerService;}

startTimerService方法是 InternalTimerServiceImpl 的初始化入口。它负责设置必要的序列化器、触发目标(通常是算子自身),并且在从故障恢复时处理已保存的定时器。

与处理时间定时器的联系点:

// ... existing code ...this.triggerTarget = Preconditions.checkNotNull(triggerTarget);// re-register the restored timers (if any)// 关键点:检查处理时间定时器队列 (processingTimeTimersQueue) 的头部是否有定时器final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();if (headTimer != null) {// 如果存在(通常意味着是从快照恢复的),// 则调用 processingTimeService.registerTimer 来重新注册这个最早到期的处理时间定时器。// this::onProcessingTime 是回调方法,当定时器触发时,会调用 InternalTimerServiceImpl 的 onProcessingTime 方法。nextTimer =processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime);}this.isInitialized = true;} else {
// ... existing code ...
  • 恢复处理时间定时器:
    • 在 if (restoredTimersSnapshot != null) 的逻辑块之后(或者如果 restoredTimersSnapshot 为 null),代码会检查 processingTimeTimersQueue。这个队列存储了当前算子实例负责的所有处理时间定时器。
    • 如果 processingTimeTimersQueue.peek() 返回一个非 null 的 headTimer,这通常意味着在任务启动时,状态后端已经恢复了之前保存的定时器到这个队列中。
    • 此时,InternalTimerServiceImpl 需要告诉底层的 ProcessingTimeService (由 StreamTask 提供,通常是基于 JVM 的 ScheduledExecutorService):“嘿,我这里最早有一个处理时间定时器需要在 headTimer.getTimestamp() 这个时间点触发,到时请调用我的 onProcessingTime 方法。”
    • processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime) 就是在执行这个注册操作。this::onProcessingTime 是一个方法引用,指向 InternalTimerServiceImpl 自己的 onProcessingTime 方法。当 ProcessingTimeService 确定时间到达后,会通过 Mailbox 机制回调这个方法。
    • nextTimer 字段保存了 ProcessingTimeService 返回的 ScheduledFuture<?>,允许后续取消或管理这个已注册的系统级定时器。

所以,startTimerService 在初始化阶段确保了从状态恢复的处理时间定时器能够被正确地重新调度。

registerProcessingTimeTimer 方法是用户(通过 KeyedProcessFunction -> SimpleTimerService)实际注册一个新的处理时间定时器时调用的核心逻辑。

注意这里向Timer队列添加的时候,Timer 包含 keyContext.getCurrentKey()

// ... existing code ...@Overridepublic void registerProcessingTimeTimer(N namespace, long time) {// 获取当前处理时间定时器队列中最早的定时器 (如果存在)InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();// 将新的定时器添加到处理时间定时器队列中// TimerHeapInternalTimer 包含了时间戳、key 和 namespace// keyContext.getCurrentKey() 获取当前正在处理的元素的 keyif (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {// 如果添加成功 (通常意味着队列状态改变了,比如新定时器成了新的头部,或者队列之前是空的)// 获取之前队列头部的触发时间,如果队列之前为空,则认为是 Long.MAX_VALUElong nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;// 检查新注册的定时器是否比当前已调度的系统级定时器更早if (time < nextTriggerTime) {// 如果新定时器更早,说明需要重新调度if (nextTimer != null) {// 取消之前已注册的系统级定时器 (nextTimer)// false 表示不中断正在执行的任务 (如果回调已经在执行中)nextTimer.cancel(false);}// 使用 processingTimeService 注册新的、更早的定时器// 当这个新的时间点到达时,会回调 this::onProcessingTimenextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);}}}
// ... existing code ...
  • 添加定时器到内部队列:
    • 首先,new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace) 创建了一个新的处理时间定时器对象。
    • processingTimeTimersQueue.add(...) 将这个新定时器添加到内部的优先队列中。这个队列会根据时间戳对定时器进行排序。
  • 与 ProcessingTimeService 交互以优化调度:
    • InternalTimerServiceImpl 只会向底层的 ProcessingTimeService 注册一个系统级的定时器,即其内部队列中最早到期的那个处理时间定时器。这样做是为了避免向系统注册过多的定时器回调,提高效率。
    • InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek(); 获取在添加新定时器之前队列中最早的定时器。
    • long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE; 获取之前需要触发的时间。
    • if (time < nextTriggerTime): 这个判断至关重要。它检查新注册的定时器 time 是否比当前已在 ProcessingTimeService 中注册的下一个触发时间 nextTriggerTime 更早。
      • 如果新定时器确实更早,那么之前向 ProcessingTimeService 注册的那个 nextTimer 就作废了(因为它不再是最早的了)。
      • nextTimer.cancel(false); 取消旧的系统级定时器。
      • nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime); 然后向 ProcessingTimeService 注册这个新的、更早的定时器。
    • 如果新注册的定时器并不比当前已调度的 nextTimer 更早,那么就不需要做任何操作,因为当前的 nextTimer 仍然是有效的,它会在其预定时间触发,届时 onProcessingTime 方法会处理所有到期的定时器(包括这个新加入但不是最早的定时器)。

Timer触发的时候怎么绑定key

KeyedProcessOperator 的 onProcessingTime 函数 调用触发 udf 的 onTimer

       @Overridepublic void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {collector.eraseTimestamp();invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);}private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, VoidNamespace> timer)throws Exception {onTimerContext.timeDomain = timeDomain;onTimerContext.timer = timer;userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);onTimerContext.timeDomain = null;onTimerContext.timer = null;}

而这个函数通过 InternalTimerServiceImpl 调用,这里通过timer.getKey()设置了key

public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {void onProcessingTime(long time) throws Exception {// null out the timer in case the Triggerable calls registerProcessingTimeTimer()// inside the callback.nextTimer = null;InternalTimer<K, N> timer;while ((timer = processingTimeTimersQueue.peek()) != null&& timer.getTimestamp() <= time&& !cancellationContext.isCancelled()) {keyContext.setCurrentKey(timer.getKey());processingTimeTimersQueue.poll();triggerTarget.onProcessingTime(timer);taskIOMetricGroup.getNumFiredTimers().inc();}if (timer != null && nextTimer == null) {nextTimer =processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/98681.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/98681.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/98681.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】模版初阶---函数模版、类模版

&#x1f31f;个人主页&#xff1a;第七序章 &#x1f308;专栏系列&#xff1a;C&#xff0b;&#xff0b; 目录 ❄️前言&#xff1a; &#x1f308;1.泛型编程&#xff1a; &#x1f308;2.函数模板 &#x1f36d;2.1函数模板概念 &#x1f36d;2.2函数模板格式 &am…

查找算法(Java)

目录 一.定义 二.分类 三.线性查找 原理&#xff1a; 思路分析 代码实现 例题实践 1.两数之和 方法一&#xff1a;暴力穷举法 思路分析 代码实现 方法二&#xff1a;创建哈希表 思路分析 代码实现 2.移动零 思路分析 代码实现 四.二分查找 原理&#xff1a; …

计算机网络--四层模型,IP地址和MAC地址

四层模型&#xff1a;分别是应用层&#xff0c;传输层&#xff0c;网络层和链路层。应用层&#xff1a;提供了应用程序之间相互通信的接口&#xff0c;允许用户访问网络服务。这一层定义了应用程序如何与底层网络进行交互。例如HTTP协议。传输层&#xff1a;它处理数据的分段、…

解析、创建Excel文件的开源库OpenXLSX介绍

OpenXLSX是一个C库&#xff0c;用于读取、写入、创建和修改.xlsx格式的Microsoft Excel文件&#xff0c;源码地址&#xff1a;https://github.com/troldal/OpenXLSX &#xff0c;License为BSD-3-Clause&#xff0c;可在Windows、Linux、MaCOS平台上使用。最新发布版本为v0.3.2&…

【C++】C++11 篇二

【C】C11 篇二前言移动构造函数移动赋值运算符重载类成员变量初始化 &#xff08;缺省值出自C11强制生成默认函数的关键字default:禁止生成默认函数的关键字delete:继承和多态中的final与override关键字&#xff08;出自C11可变参数模板递归函数方式展开参数包逗号表达式展开参…

构建Python环境的几种工具

本文主要介绍如何构建Python环境来处理不同的工作。 1.常用的构建Python环境的工具 ①venv(内置模块):Python 3.3 内置标准库模块&#xff0c;无需额外安装。 ②virtualenv:venv的前身&#xff0c;功能更强大且支持旧版Python。 ③conda:来自 Anaconda 或 Miniconda。不仅能…

c#项目编译时外部依赖文件的同步问题

很多场景因为资源文件太多或太大无法放到资源里面或者是依赖的dll文件&#xff0c;需要编译时同步到bin\debug或bin\release下的&#xff0c;这里面要修改工程文件代码实现。 比如&#xff0c;我把这个项目依赖的dll和附加文件放到ref_dll文件夹里面&#xff0c;希望编译的时候…

数学建模常用算法-模拟退火算法

一、模拟退火算法模拟退火的灵感来源于物理中的 “退火过程”—— 将金属加热到高温后&#xff0c;缓慢冷却&#xff0c;金属原子会在热能作用下自由运动&#xff0c;逐渐形成能量最低的稳定结构。算法将这一过程抽象为数学模型&#xff1a;“温度 T”&#xff1a;对应物理中的…

架构很简单:业务架构图

缘起业务架构是一个复杂的体系&#xff0c;如何更简单的表达&#xff0c;并能使用起来呢&#xff1f;所谓&#xff1a;大道至简。基于此&#xff0c;这篇文章就开始了。业务是一切架构的开始&#xff0c;如果没有业务&#xff0c;架构又有什么作用呢&#xff1f;所以做架构首先…

【前端埋点】纯前端实现 A/B Test

“纯前端实现 A/B Test”&#xff0c;意思就是 没有后端分流、也不依赖流量网关&#xff0c;那么只能靠前端逻辑来做“流量切分”。 &#x1f3af; 目标 80% 的用户 → A 页面20% 的用户 → B 页面且要保证 同一个用户每次访问结果一致&#xff08;否则用户刷新页面时 A/B 会跳…

Day22_【机器学习—集成学习(3)—Boosting—Adaboost算法】

Adaptive Boosting(自适应提升)是基于 Boosting思想实现的一种集成学习算法&#xff0c;核心思想是通过逐步提高那些被前一步分类错误的样本的权重来训练一个强分类器。一、Adaboost算法直线相当于一个弱学习器&#xff0c;正确的数据权重减小&#xff0c;错误的数据权重增加二…

C#语言入门详解(18)传值、输出、引用、数组、具名、可选参数、扩展方法

C#语言入门详解&#xff08;18&#xff09;传值、输出、引用、数组、具名、可选参数、扩展方法一、传值参数1. 值类型2. 引用类型&#xff0c;并且创建对象3. 引用类型&#xff0c;不创建对象二、引用参数1. 值类型2. 引用类型&#xff0c;创建新对象3. 引用类型&#xff0c;不…

模型部署:(二)安卓端部署Yolov5-v6.0目标检测项目全流程记录

模型部署&#xff1a;&#xff08;二&#xff09;安卓端部署Yolov5-v6.0目标检测项目全流程记录1、下载NCNN项目2、下载基于安卓的yolov5项目3、修改相应的配置文件4、将项目打包成apk包5、部署自己训练的yolov5模型6、相关资料下载&#xff1a;1、下载NCNN项目 # 链接地址 ht…

图形基础算法:如何将点与带曲线边的多边形位置关系算法做稳定

简介判断点与多边形位置关系算法是几何算法中最基础的算法之一&#xff0c;包括布尔运算在内的非常非常多的算法都会用到它。它的稳定是算法库稳定的关键。下面我们从一个边都是直线的多边形开始了解射线法的原理。然后看看引入曲线后会带来哪些问题&#xff0c;以及在实际应用…

react的filber架构

好问题 &#x1f44d; 你问的是 React Fiber 架构。这是 React 16 引入的核心机制&#xff0c;用来解决 React 在大规模更新时的性能问题。下面我给你从 背景 → Fiber 是什么 → 原理 → 优点 → 流程 来系统讲。一、为什么需要 Fiber&#xff1f;在 React 15 及以前&#xff…

Lucky STUN穿透结合群晖NAS实现docker下transmission监听端口动态更新

参考文章 LCUKY系列教程 一 「LUCKY STUN穿透」使用 cURL 自动修改 Transmission 的监听端口 二 「LUCKY STUN穿透」使用 Webhook 自动修改 qbittorrent 的监听端口 三 LUCKY STUN穿透在Windows上使用UPnP工具为BT客户端自动添加内外端口号不同的映射规则 四「LUCKY STUN穿透」…

如何在Ubuntu畅玩鸣潮等游戏

本教程只包括Steam上的游戏。# 更新软件源 sudo apt update # 安装Steam sudo apt install steam首先&#xff0c;在Ubuntu的snap商店安装Steam&#xff0c;启动&#xff0c;登陆&#xff0c;下载游戏。到这里的操作都比较简单&#xff0c;对于没有反作弊的游戏&#xff0c;往往…

机器学习09——聚类(聚类性能度量、K均值聚类、层次聚类)

上一章&#xff1a;机器学习08——集成学习 下一章&#xff1a;机器学习10——降维与度量学习 机器学习实战项目&#xff1a;【从 0 到 1 落地】机器学习实操项目目录&#xff1a;覆盖入门到进阶&#xff0c;大学生就业 / 竞赛必备 文章目录一、聚类任务&#xff08;无监督学习…

解决 Docker 构建中 Python 依赖冲突的完整指南

问题背景 在基于 registry.cn-shenzhen.aliyuncs.com/all_dev/dev:invoice-base 镜像构建 Docker 容器时,我们遇到了一个常见的 Python 依赖管理问题: ERROR: ResolutionImpossible: for help visit https://pip.pypa.io/en/latest/topics/dependency-resolution/#dealing-…

光子计算芯片实战:Lightmatter Passage互连架构性能评测

点击 “AladdinEdu&#xff0c;同学们用得起的【H卡】算力平台”&#xff0c;H卡级别算力&#xff0c;80G大显存&#xff0c;按量计费&#xff0c;灵活弹性&#xff0c;顶级配置&#xff0c;学生更享专属优惠。 摘要 随着人工智能计算需求呈指数级增长&#xff0c;传统电子计算…