20. TaskExecutor与ResourceManager心跳
- 现在,需要回过头看
ResourceManager
是如何产生心跳管理服务的。 - cluster.initializeServices 方法的 heartbeatServices = createHeartbeatServices(configuration);产生一个 HeartbeatServicesImpl
- jobmanager的 resourceManager启动的时候,会启动 startHeartbeatServices();
startHeartbeatServices
方法
- 该方法产生2个对象。1个taskManagerHeartbeatManager和1个jobManagerHeartbeatManager。这次主要看前面1个。
private void startHeartbeatServices() {taskManagerHeartbeatManager =heartbeatServices.createHeartbeatManagerSender(resourceId,new TaskManagerHeartbeatListener(),getMainThreadExecutor(),log);jobManagerHeartbeatManager =heartbeatServices.createHeartbeatManagerSender(resourceId,new JobManagerHeartbeatListener(),getMainThreadExecutor(),log);}
- taskManagerHeartbeatManager:负责监控所有已注册的 TaskExecutor,是 ResourceManager 与 TaskExecutor 间心跳通信的核心组件。
- jobManagerHeartbeatManager:与 JobManager 保持心跳(此处暂不关注)。
HeartbeatManagerSenderImpl
HeartbeatManagerSenderImpl
是 ResourceManager 端的心跳发送管理器,具备以下特点:- 继承 HeartbeatManagerImpl,是具体的心跳机制实现;
- 实现 Runnable 接口,自身是一个周期性执行的任务。
- 核心点:
- 周期性遍历
getHeartbeatTargets()
(即所有注册的 TaskExecutor); - 逐个向它们发送心跳请求(
requestHeartbeat
方法)。
- 周期性遍历
HeartbeatManagerSenderImpl(long heartbeatPeriod,long heartbeatTimeout,int failedRpcRequestsUntilUnreachable,ResourceID ownResourceID,HeartbeatListener<I, O> heartbeatListener,ScheduledExecutor mainThreadExecutor,Logger log,HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {super(heartbeatTimeout,failedRpcRequestsUntilUnreachable,ownResourceID,heartbeatListener,mainThreadExecutor,log,heartbeatMonitorFactory);this.heartbeatPeriod = heartbeatPeriod;//这里表明周期调度mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);}@Overridepublic void run() {if (!stopped) {log.debug("Trigger heartbeat request.");for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {//这里就是循环将前面taskExecutor注册心跳取出来,进行心跳requestHeartbeat(heartbeatMonitor);}//这里单线程周期调度getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);}}
requestHeartbeat 方法
//heartbeatMonitor封装了一个taskexecutor网关。说白了就是heartbeatTarget就是调用 taskexecutor方法进行交互。
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();heartbeatTarget//TaskExecutorHeartbeatSender 就是前面讲的rpc调用.requestHeartbeat(getOwnResourceID(), payload).whenCompleteAsync(handleHeartbeatRpc(heartbeatMonitor.getHeartbeatTargetId()),getMainThreadExecutor());}
-
heartbeatTarget:
-
实际上是封装的
TaskExecutorHeartbeatSender
; -
通过 RPC 接口向对应 TaskExecutor 发出心跳请求;
-
本质上是对 TaskExecutor 方法的远程调用。
-
-
TaskExecutorHeartbeatSender 实质上是一个封装了 TaskExecutor RPC 网关的对象,负责通过 RPC 调用向 TaskExecutor 发送心跳请求。推荐配合 Debug 调试理解整个心跳交互过程,有助于深入掌握 ResourceManager 与 TaskExecutor 之间的通信机制。
小结
-
ResourceManager 在启动阶段就为所有 TaskExecutor 准备好了心跳监控;
-
依靠单线程周期调度器,实现对所有 TaskExecutor 的心跳请求发送;
-
心跳本质是对 TaskExecutor RPC 方法的远程调用;
-
如果某个 TaskExecutor 心跳超时或失败,会触发资源回收与故障恢复机制。