一、启动模式

1、standalone

在这里插入图片描述

  1. 资源申请:Driver向Master申请Executor资源
  2. Executor启动:Master调度Worker启动Executor
  3. 注册通信:Executor直接向Driver注册

2、YARN

在这里插入图片描述

  1. Driver向YARN ResourceManager(RM)申请AM容器

  2. RM分配NodeManager(NM)启动AM(yarn-client 仅资源代理,不运行用户代码)

  3. AM向RM注册

  4. AM根据申请Executor容器

5.RM分配多个NM

6.每个NM启动ExecutorBackend进程

**7.**注册通信:Executor向AM内的Driver注册

二、Executor端任务执行的核心组件

  1. Driver 端组件
    • CoarseGrainedSchedulerBackend:负责与Executor通信
    • TaskSchedulerImpl:任务调度核心逻辑
    • DAGScheduler:DAG调度与Stage管理
    • BlockManagerMaster:块管理器协调器
    • MapOutputTrackerMaster:Shuffle输出跟踪器
  2. Executor 端组件
    • CoarseGrainedExecutorBackend:Executor的通信端点
    • Executor:任务执行引擎
    • TaskRunner:任务执行线程封装
    • BlockManager:本地数据块管理
    • ShuffleManager:Shuffle读写控制
    • ExecutorSource:指标监控

三、Executor 端任务执行核心流程

1、任务接收与初始化

  • CoarseGrainedExecutorBackend 接收任务
case LaunchTask(data) =>if (executor == null) {exitExecutor(1, "Received LaunchTask command but executor was null")} else {val taskDesc = TaskDescription.decode(data.value)logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}")executor.launchTask(this, taskDesc)}
  • Executor 任务启动
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {val taskId = taskDescription.taskIdval tr = createTaskRunner(context, taskDescription)runningTasks.put(taskId, tr)val killMark = killMarks.get(taskId)if (killMark != null) {tr.kill(killMark._1, killMark._2)killMarks.remove(taskId)}threadPool.execute(tr)if (decommissioned) {log.error(s"Launching a task while in decommissioned state.")}}

2、任务执行

  • TaskRunner.run
// 1. 类加载与依赖管理
updateDependencies(taskDescription.artifacts.files,taskDescription.artifacts.jars,taskDescription.artifacts.archives,isolatedSession)
// 2. 反序列化任务
task = ser.deserialize[Task[Any]](taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
// 3. 内存管理
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
task.setTaskMemoryManager(taskMemoryManager)// 4. 任务执行
val value = Utils.tryWithSafeFinally {val res = task.run(taskAttemptId = taskId,attemptNumber = taskDescription.attemptNumber,metricsSystem = env.metricsSystem,cpus = taskDescription.cpus,resources = resources,plugins = plugins)threwException = falseres} {// block 释放val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)// memory 释放val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()if (freedMemory > 0 && !threwException) {val errMsg = log"Managed memory leak detected; size = " +log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {throw SparkException.internalError(errMsg.message, category = "EXECUTOR")} else {logWarning(errMsg)}}if (releasedLocks.nonEmpty && !threwException) {val errMsg =log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})"if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {throw SparkException.internalError(errMsg.message, category = "EXECUTOR")} else {logInfo(errMsg)}}}// 5. 状态上报
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  • Task run
// hadoop.caller.context.enabled = true
// 添加 HDFS 审计日志 , 用于问题排查 。 
// e.g. 小文件剧增 定位spark 任务
new CallerContext("TASK",SparkEnv.get.conf.get(APP_CALLER_CONTEXT),appId,appAttemptId,jobId,Option(stageId),Option(stageAttemptId),Option(taskAttemptId),Option(attemptNumber)).setCurrentContext()// 任务启动
context.runTaskWithListeners(this)

3、shuffle处理

  • ShuffleMapTask

为下游 Stage 准备 Shuffle 数据(Map 端输出),生成 MapStatus(包含数据位置和大小信息)。

val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// 从广播 序列化 rdd 、 dep
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0Lval rdd = rddAndDep._1
val dep = rddAndDep._2
// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {partitionId
} else {context.taskAttemptId()
}
dep.shuffleWriterProcessor.write(rdd.iterator(partition, context),dep,mapId,partitionId,context)
  • ResultTask
override def runTask(context: TaskContext): U = {// Deserialize the RDD and the func using the broadcast variables.val threadMXBean = ManagementFactory.getThreadMXBeanval deserializeStartTimeNs = System.nanoTime()val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0Lval ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime} else 0Lfunc(context, rdd.iterator(partition, context))
}

四、核心通信机制

消息类型方向内容
RegisterExecutorExecutor→DriverexecutorId, hostPort
RegisteredExecutorDriver→Executor注册成功确认
LaunchTaskDriver→Executor序列化的TaskDescription
StatusUpdateExecutor→DrivertaskId, state, result data
KillTaskDriver→Executor终止指定任务
StopExecutorDriver→Executor关闭Executor指令
HeartbeatExecutor→Driver心跳+指标数据

五、Executor 线程模型

Executor JVM Process
├── CoarseGrainedExecutorBackend (netty)
├── ThreadPool (CacheThreadPool)
│   ├── TaskRunner 1
│   ├── TaskRunner 2
│   └── ... 
├── BlockManager
│   ├── MemoryStore (on-heap/off-heap)
│   └── DiskStore
└── ShuffleManager├── SortShuffleWriter└── UnsafeShuffleWriter

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/93764.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/93764.shtml
英文地址,请注明出处:http://en.pswp.cn/web/93764.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

rabbitmq发送的延迟消息时间过长就立即消费了

RabbitMQ延迟消息在设置过长时间后被立即消费的问题,通常与以下原因有关: TTL限制问题 RabbitMQ对消息TTL(Time To Live)有32位整数限制(0-4294967295毫秒),约49.7天。超过该值的延迟时间会导致消息立即被消费解决方案:确保设置的…

kafka的pull的依据

1. 每次 pull() 是否必须在提交上一批消息的 offset 之后?绝对不需要! 提交 offset 和调用 poll() (拉取消息) 是两个完全独立的行为。消费者可以连续调用 poll() 多次,期间完全不提交任何 offset。 这是 Kafka 消费者的正常工作模式。提交 o…

学习嵌入式的第二十一天——数据结构——链表

单向链表特点:存储的内存空间不连续 。为了弥补顺序存储存劣势。优势 插入,删除 O(1) 动态存储 ,在程序运行期间决定大小。劣势: 不能随机访问 O(N) 节点-> 数据域指针域 顺序表(数组) 只有数据域链表的操作代码&#xff1…

Rust Web 全栈开发(十三):发布

Rust Web 全栈开发(十三):发布Rust Web 全栈开发(十三):发布发布 teacher_service发布 svr测试 teacher_service 和 svr发布 wasm-client测试 wasm-clientRust Web 全栈开发(十三)&a…

Zephyr 中的 bt_le_per_adv_set_data 函数的介绍和应用方法

目录 概述 1 函数接口介绍 1.1 函数原型 1.2 功能详解 2 使用方法 2.1 创建流程 2.1.1 创建扩展广播实例 2.1.2 设置周期性广播数据 2.1.3 配置周期性广播参数 2.1.4 启动广播 2.2 主流程函数 2.3 关键配置 (prj.conf) 3 高级用法 3.1 大数据分片传输 3.2 动态数…

Ansible 角色管理指南

Ansible 角色管理指南 实验环境设置 以下命令用于准备实验环境,创建一个工作目录并配置基本的Ansible设置: # 创建web工作目录并进入 [azurewhiskycontroller ~]$ mkdir web && cd web# 创建Ansible配置文件 [azurewhiskycontroller web]$ cat &…

【补充】数据库中有关系统编码和校验规则的简述

一、字符集和校验规则1.创建数据库案例数据库创建方法:使用CREATE DATABASE语句创建数据库字符集指定方式:通过CHARACTER SETutf8指定数据库编码格式默认配置说明:未指定字符集时默认使用utf8和utf8_general_ci配置文件位置&…

计算机网络 HTTP1.1、HTTP2、HTTP3 的核心对比及性能分析

以下是 HTTP/1.1、HTTP/2、HTTP/3 的核心对比及性能分析,重点关注 HTTP/3 的性能优势:📊 HTTP 协议演进对比表特性HTTP/1.1 (1997)HTTP/2 (2015)HTTP/3 (2022)传输层协议TCPTCPQUIC (基于 UDP)连接建立TCP 三次握手 TLS 握手 (高延迟)同 HTT…

【计算机视觉与深度学习实战】07基于Hough变换的答题卡识别技术:原理、实现与生物识别拓展(有完整代码)

1. 引言 在人工智能和计算机视觉快速发展的今天,自动化图像识别技术已经渗透到社会生活的各个角落。从工业质检到医学影像分析,从自动驾驶到教育评估,计算机视觉技术正在重塑我们与数字世界的交互方式。在这众多应用中,答题卡识别技术作为教育信息化的重要组成部分,承载着…

《WASM驱动本地PDF与Excel预览组件的深度实践》

WASM为何能成为本地文件解析的核心载体,首先需要跳出“前端只能处理轻量任务”的固有认知,从“性能与兼容性平衡”的角度切入。PDF与Excel这类文件格式的解析,本质是对复杂二进制数据的解码与重构——PDF包含嵌套的对象结构、字体渲染规则和矢量图形描述,Excel则涉及单元格…

Oracle Free 实例重装系统操作指南

之前申请了两台 x86 架构的 Oracle 机器,偶尔用来部署开源项目测试,有一台在测试 SSH 相关功能时 “变砖”,网上看重装系统发现很繁琐就没去打理,近期又想到这个机器,发现去年就有了官方重装方法,简单配置下…

Linux 基础指令与权限管理

一、Linux 操作系统概述1.1 操作系统的核心价值操作系统的本质是 "使计算机更好用"。它作为用户与硬件之间的中间层,负责内存管理、进程调度、文件系统管理和设备驱动管理等核心功能,让用户无需直接操作硬件即可完成复杂任务。在服务器领域&am…

深度学习-167-MCP技术之工具函数的设计及注册到MCP服务器的两种方式

文章目录 1 MCP协议概述 1.1 MCP的原理 1.2 两种主要的通信模式 2 工具函数的设计与实现 2.1 tools.py(工具函数) 2.2 工具函数的设计原则 2.3 工具函数的测试 3 MCP服务器的构建与配置 3.1 安装mcp库 3.2 main.py(MCP服务器) 3.2.1 方式一(add_tool方法) 3.2.2 方式二(@mcp.to…

哈希:两数之和

问题描述:在一个整数数组中,找到两数之和为target的两个值,返回找到的两个值的下标。 nums[3,3] target6 返回:[0,1] 说明:返回结果,索引无顺序要求;有唯一的答案;不能使用两次相…

PHP反序列化的CTF题目环境和做题复现第5集_POP链构造4

1 题目 下载yii2.0.37版本,https://github.com/yiisoft/yii2/releases/tag/2.0.37 放在phpstudy的www目录下或ubuntu的/var/www/html的目录下。 3 EXP <?php namespace PHPUnit\Framework\MockObject{class MockTrait {private $classCode = "system(whoami);php…

广东省省考备考(第八十一天8.19)——资料分析、数量(强化训练)

资料分析 错题解析解析解析解析解析今日题目正确率&#xff1a;67% 数量&#xff1a;数学运算解析解析解析标记题解析今日题目正确率&#xff1a;80%

决策树剪枝及数据处理

一、核心决策树算法&#xff08;3 类主流算法&#xff09;1. ID3 算法&#xff1a;用 “信息增益” 选属性ID3 是决策树的 “开山鼻祖” 之一&#xff0c;它的核心逻辑是 “选能让数据最‘纯’的属性”—— 这里的 “纯” 用 “信息增益” 衡量。简单说&#xff0c;“信息增益”…

Ansible 角色管理

环境准备# 创建一个叫web的文件夹并进入&#xff08;相当于新建一个工作目录&#xff09;[lykcontroller ~]$ mkdir web && cd web​# 创建Ansible的配置文件ansible.cfg[lykcontroller web]$ cat > ansible.cfg <<EOF[defaults]remote_user lykinventory .…

Java面试准备指南!

现在已经是8月中旬了&#xff0c;秋招马上就要开始了&#xff0c;不知道大家准备好了吗&#xff1f;现阶段找工作真的是千军万马过独木桥&#xff0c;没有真本事&#xff0c;真的会被淘汰掉&#xff0c;现实就是如此的残酷&#xff01; 为了能够帮助到大家在秋招Java面试中脱颖…

Encoder-Decoder Model编码器-解码器模型

Encoder-Decoder编码器-解码器是一种深度学习模型&#xff0c;应用于图像处理、语音识别、自然语言处理等领域。主要由编码器和解码器两部分组成&#xff0c;这种结构能够处理序列到序列的任务。编码器-解码器模型具备独特的双阶段处理&#xff0c;先对输入信息进行编码&#x…