1、Job启动流程

在这里插入图片描述

1、Client触发 SparkContext 初始化

2、SparkContextMaster 注册应用

3、Master 调度 Worker 启动 Executor

4、Worker 进程启动 Executor

5、DAGScheduler 将作业分解为 Stage

6、TaskScheduler 分配 TaskExecutor

2、核心组件

组件职责
SparkContext应用入口,协调各组件,管理应用生命周期。
DAGScheduler将 Job 拆分为 Stage,构建 DAG,提交 TaskSet 给 TaskScheduler。
TaskScheduler调度 Task 到 Executor,处理故障重试。
CoarseGrainedSchedulerBackend与集群管理器交互,申请资源,管理 Executor。
ExternalClusterManager抽象层,适配不同集群(Standalone/YARN/Mesos)。
Master & WorkerStandalone 模式下管理集群资源(Master 分配资源,Worker 启动 Executor)。
Executor在 Worker 上运行,执行 Task,管理内存/磁盘。
CoarseGrainedExecutorBackendExecutor 的通信代理,接收 Task,返回状态/结果。
Task计算单元(ShuffleMapTask / ResultTask)。
ShuffleManager管理 Shuffle 数据读写(如 SortShuffleManager)。

3、工作流程

1、SparkContext

负责资源申请、任务提交、与集群管理器通信。

调用runJob方法,将RDD操作传递给DAGScheduler

2、DAGScheduler

将Job拆分为Stage(DAG),处理Shuffle依赖,提交TaskSet给TaskScheduler。

1、DAGSchedulerEvent

/* 作业生命周期事件 */
JobSubmitted //新作业提交时触发
JobCancelled //单个作业被取消
JobGroupCancelled //作业组整体取消
JobTagCancelled //按标签批量取消作业
AllJobsCancelled //取消所有运行中的作业/* 阶段执行事件 */
MapStageSubmitted //Shuffle Map阶段提交
StageCancelled //单个阶段取消
StageFailed //阶段执行失败
ResubmitFailedStages //自动重试失败阶段 ,默认4次/* 任务调度事件 */
TaskSetFailed //整个任务集失败,默认4次
SpeculativeTaskSubmitted //启动推测执行任务
UnschedulableTaskSetAdded //任务集进入待调度队列
UnschedulableTaskSetRemoved //任务集离开待调度队列/* Shuffle 优化事件 */
RegisterMergeStatuses //注册Shuffle合并状态
ShuffleMergeFinalized //Shuffle合并完成
ShufflePushCompleted //Shuffle数据推送完成/* 资源管理事件 */
ExecutorAdded //新Executor注册成功
ExecutorLost //Executor异常丢失
WorkerRemoved //工作节点移除/* 执行过程事件 */
BeginEvent //任务集开始执行 
GettingResultEvent //驱动程序主动获取任务结果
CompletionEvent //作业/阶段完成

2、stage拆分流程

*ResultStage (执行作的最后一个阶段)、ShuffleMapStage (shuffle映射输出文件)*

  1. 用户行动操作触发submitJob,发送JobSubmitted事件。
  2. handleJobSubmitted处理事件,调用createResultStage创建ResultStage。
  3. createResultStage调用getOrCreateParentStages获取父Stage,父Stage的创建会递归进行。
  4. 在创建父Stage的过程中,遇到宽依赖则创建ShuffleMapStage,并递归创建其父Stage。
  5. 当所有父Stage都创建完成后,回到handleJobSubmitted,调用submitStage提交ResultStage。
  6. submitStage检查父Stage是否完成,如果有未完成的父Stage,则递归提交父Stage;否则,提交当前Stage(调用submitMissingTasks)。
  7. submitMissingTasks为Stage创建任务(ShuffleMapTask或ResultTask),并提交给TaskScheduler执行。

3、宽窄依赖切分

private def stageDependsOn(stage: Stage, target: Stage): Boolean = {if (stage == target) {return true}// DFS遍历RDD依赖树val visitedRdds = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new ListBuffer[RDD[_]]waitingForVisit += stage.rdddef visit(rdd: RDD[_]): Unit = {if (!visitedRdds(rdd)) {visitedRdds += rddfor (dep <- rdd.dependencies) {dep match {// 宽依赖:创建新的ShuffleMapStagecase shufDep: ShuffleDependency[_, _, _] =>val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)if (!mapStage.isAvailable) {waitingForVisit.prepend(mapStage.rdd)}  // Otherwise there's no need to follow the dependency back// 窄依赖:继续回溯case narrowDep: NarrowDependency[_] =>waitingForVisit.prepend(narrowDep.rdd)}}}}while (waitingForVisit.nonEmpty) {visit(waitingForVisit.remove(0))}visitedRdds.contains(target.rdd)}

3、TaskScheduler

接收TaskSet,按调度策略(FIFO/FAIR)将Task分配给Executor。

1、执行流程

1、DAGScheduler 调用 taskScheduler.submitTasks() 后,任务进入 TaskScheduler 调度阶段

2、任务提交submitTasks

// TaskSetManager管理任务集
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 添加到调度池
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 触发资源分配
backend.reviveOffers()

3、资源分配 (Driver)

// CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {driverEndpoint.send(ReviveOffers)  // 向DriverEndpoint发送消息
}// DriverEndpoint处理
case ReviveOffers =>makeOffers()  // 触发资源分配

4、资源分配核心

private def makeOffers(): Unit = {// Make sure no executor is killed while some task is launching on itval taskDescs = withLock {// 1. 获取所有可用Executor资源val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }val workOffers = activeExecutors.map {case (id, executorData) => buildWorkerOffer(id, executorData)}.toIndexedSeq// 2. 调用任务调度器分配任务scheduler.resourceOffers(workOffers, true)}// 3. 启动分配的任务if (taskDescs.nonEmpty) {launchTasks(taskDescs)}
}

5、任务启动

// CoarseGrainedSchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {for (task <- tasks.flatten) {// 1. 序列化任务val serializedTask = TaskDescription.encode(task)// 2. 检查任务大小if (serializedTask.limit() >= maxRpcMessageSize) {Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {val executorData = executorDataMap(task.executorId)// Do resources allocation here. The allocated resources will get released after the task// finishes.executorData.freeCores -= task.cpustask.resources.foreach { case (rName, addressAmounts) =>executorData.resourcesInfo(rName).acquire(addressAmounts)}logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")// 发送任务到ExecutorexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/93196.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/93196.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/93196.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 临时表与复制表

一、MySQL 临时表临时表是会话级别的临时数据载体&#xff0c;其设计初衷是为了满足短期数据处理需求&#xff0c;以下从技术细节展开说明。&#xff08;一&#xff09;核心特性拓展1.生命周期与会话绑定会话结束的判定&#xff1a;包括正常断开连接&#xff08;exit/quit&…

从配置到调试:WinCC与S7-1200/200SMT无线Modbus TCP通讯方案

测试设备与参数l 西门子PLC型号&#xff1a;S7-1200 1台l 西门子PLC型号&#xff1a;S7-200Smart 1台l 上位机&#xff1a;WinCC7.4 1台l 无线通讯终端——DTD418MB 3块l 主从关系&#xff1a;1主2从l 通讯接口&#xff1a;RJ45接口l 供电&#xff1a;12-24VDCl 通讯协议&a…

Android沉浸式全屏显示与隐藏导航栏的实现

1. 总体流程以下是实现沉浸式全屏显示和隐藏导航栏的流程&#xff1a;步骤描述步骤1创建一个新的Android项目步骤2在布局文件中定义需要展示的界面步骤3在Activity中设置沉浸式全屏显示步骤4处理系统UI的显示与隐藏步骤5运行应用并测试效果2. 详细步骤步骤1&#xff1a;创建一个…

EN 62368消费电子、信息技术设备和办公设备安全要求标准

EN 62368认证标准是一项全球性的电子产品安全标准&#xff0c;用于评估和认证消费电子、信息技术设备和办公设备的安全性。该标准由国际电工委员会(IEC)制定&#xff0c;取代了传统的EN60065和EN 60950两个标准&#xff0c;成为国际电子产品安全领域的新指导。IEC /EN 62368-1是…

【unity实战】使用Splines+DOTween制作弯曲手牌和抽牌动画效果

最终效果 文章目录最终效果前言实战1、Splines的使用2、绘制样条线3、DOTween安装和使用4、基于样条曲线&#xff08;Spline&#xff09;的手牌管理系统4.1 代码实现4.2 解释&#xff1a;&#xff08;1&#xff09;计算第一张卡牌的位置&#xff08;居中排列&#xff09;&#…

Flask模板注入梳理

从模板开始介绍&#xff1a;Flask中有许多不同功能的模板&#xff0c;他们之间是相互隔离的地带&#xff0c;可供引入和使用。Flask中的模块&#xff1a;flask 主模块&#xff1a;包含框架的核心类和函数&#xff0c;如 Flask&#xff08;应用实例&#xff09;、request&#x…

企业级的即时通讯平台怎么保护敏感行业通讯安全?

聊天记录存在第三方服务器、敏感文件被误发至外部群组、离职员工仍能查看历史消息.对于金融、医疗、政务等对数据安全高度敏感的行业而言&#xff0c;“沟通效率与”信息安全”的矛盾&#xff0c;从未像今天这样尖锐。企业即时通讯怎么保护敏感行业通讯安全&#xff1f;这个问题…

Java Spring框架最新版本及发展史详解(截至2025年8月)-优雅草卓伊凡

Java Spring框架最新版本及发展史详解&#xff08;截至2025年8月&#xff09;-优雅草卓伊凡引言今天有个新项目 客户问我为什么不用spring 4版本&#xff0c;卓伊凡我今天刚做完项目方案&#xff0c;我被客户这一句问了有点愣住&#xff0c;Java Spring框架最新版本及发展史详解…

Android实现Glide/Coil样式图/视频加载框架,Kotlin

Android实现Glide/Coil样式图/视频加载框架&#xff0c;Kotlin <uses-permission android:name"android.permission.WRITE_EXTERNAL_STORAGE" /><uses-permission android:name"android.permission.READ_EXTERNAL_STORAGE" /><uses-permiss…

【k8s】pvc 配置的两种方式volumeClaimTemplates 和 PersistentVolumeClaim

pvc配置实例 实例1在Deployment中配置 template:xxxxxxvolumeClaimTemplates:- metadata:name: dataspec:accessModes:- ReadWriteOnceresources:requests:storage: 1GistorageClassName: nfsdev-storageclass (创建好的storageClassName)实例2#先创建一个pvc 然后在 Deploym…

Logistic Loss Function|逻辑回归代价函数

----------------------------------------------------------------------------------------------- 这是我在我的网站中截取的文章&#xff0c;有更多的文章欢迎来访问我自己的博客网站rn.berlinlian.cn&#xff0c;这里还有很多有关计算机的知识&#xff0c;欢迎进行留言或…

计算机网络技术-知识篇(Day.1)

一、网络概述 1、网络的概念 两个不在同一地理位置的主机&#xff0c;通过传输介质和通信协议&#xff0c;实现通信和资源共享。 2、网络发展史 第一阶段&#xff08;20世纪60年代&#xff09; 标志性事件&#xff1a;ARPANET的诞生关键技术&#xff1a;分组交换技术 第二…

工业元宇宙:迈向星辰大海的“玄奘之路”

一、从认知革命到工业革命&#xff1a;文明跃迁的底层逻辑1.1 认知革命&#xff1a;人类协作的基石时间线&#xff1a;约7万年前&#xff0c;智人通过语言和想象力构建共同虚拟现实&#xff0c;形成部落协作模式。核心突破&#xff1a;虚构能力&#xff1a;创造神、国家、法律等…

9. React组件生命周期

2. React组件生命周期 2.1. 认识生命周期 2.1.1. 很多事物都有从创建到销毁的整个过程&#xff0c;这个过程称之为生命周期&#xff1b;2.1.2. React组件也有自己的生命周期&#xff0c;了解生命周期可以让我们在最合适的地方完成想要的功能2.1.3. 生命周期和生命周期函数的关系…

【单板硬件开发】关于复位电路的理解

阅读紫光同创供应商提供的FPGA单板硬件开发手册&#xff0c;发现复位电路他们家解释的很通俗易懂&#xff0c;所以分享一下。如下图&#xff0c;RST_N 是低有效的异步全芯片复位信号&#xff0c;一般外部连接电路有 3 种形式如图 3–2&#xff0c;可根据实际需要选择合适的电路…

《Unity Shader入门精要》学习笔记一

1、本书的源代码 https://github.com/candycat1992/Unity_Shaders_Book 2、第1章 Shader是面向GPU的工作方式 3、第2章 渲染流水线 Shader&#xff1a;着色器 渲染流水线&#xff1a;目标是渲染一张二维纹理&#xff0c;输入是一个虚拟摄像机、一些光源、一些Shader以及纹…

从零到一:TCP 回声服务器与客户端的完整实现与原理详解

目录 一、TCP 通信的核心逻辑 二、TCP 服务器编程步骤 步骤 1&#xff1a;创建监听 Socket 步骤 2&#xff1a;绑定地址与端口&#xff08;bind&#xff09; 步骤 3&#xff1a;设置监听状态&#xff08;listen&#xff09; 步骤 4&#xff1a;接收客户端连接&#xff08…

MyBatis-Plus核心内容

MyBatis-Plus MyBatis-Plus 是一个基于 MyBatis的增强工具&#xff0c;旨在简化开发过程&#xff0c;减少重复代码。它在MyBatis的基础上增加了CRUD操作封装&#xff0c;条件构造器、代码生成器等功能。 一、核心特性与优势 1. 核心特性 无侵入&#xff1a;只做增强不做改变&am…

计算机网络摘星题库800题笔记 第4章 网络层

第4章 网络层4.1 网络层概述题组闯关1.在 Windows 的网络配置中&#xff0c;“默认网关” 一般被设置为 ( ) 的地址。 A. DNS 服务器 B. Web 服务器 C. 路由器 D. 交换机1.【参考答案】C 【解析】只有在计算机上正确安装网卡驱动程序和网络协议&#xff0c;并正确设置 IP 地址信…

非root用户在linux中配置zsh(已解决ncurses-devel报错)

Zsh&#xff08;Z Shell&#xff09;是一款功能强大的交互式 Unix shell&#xff0c;以其高度可定制性和丰富的功能著称&#xff0c;被视为 Bash 的增强替代品。它支持智能补全、主题美化、插件扩展&#xff08;如 Oh My Zsh 框架&#xff09;、自动纠错、全局别名等特性&#…