文章目录

  • 1.Spark任务阶段划分
    • 1.1 job,stage与task
    • 1.2 job划分
    • 1.3 stage和task划分
  • 2.任务执行时机
  • 3.task内部数据存储与流动
  • 4.根据sparkUI了解Spark执行计划
    • 4.1查看job和stage
    • 4.2 查看DAG图
    • 4.3查看task

1.Spark任务阶段划分

1.1 job,stage与task

  • 首先根据action()操作顺序将应用划分为作业job。
  • 根据每个job的逻辑处理流程中的ShuffleDependency依赖关系,将job划分为执行阶段stage。
  • 在每个stage中,根据最后生成的RDD的分区个数生成多个计算任务task。

1.2 job划分

举一个简单的例子,在下面这段代码中:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, col# 初始化SparkSession
spark = SparkSession.builder.appName("MultiJobStageTaskExample").getOrCreate()# 读取数据(Transformation,不触发Job)
orders = spark.read.csv("orders.csv",header=True,inferSchema=True
).select("用户ID", "订单金额", "支付方式")users = spark.read.csv("users.csv",header=True,inferSchema=True
).select("用户ID", "所在城市")# 缓存重复使用的数据集(优化性能)
orders.cache()
users.cache()# --------------------------
# Job 1:计算不同支付方式的订单数和总金额
# --------------------------
payment_analysis = orders.groupBy("支付方式") \.agg(count("用户ID").alias("订单数"),  # 聚合操作(宽依赖,触发Shuffle)sum("订单金额").alias("总金额"))# Action操作:触发Job 1
payment_result = payment_analysis.collect()  # Job 1
print("支付方式分析结果:", payment_result)# --------------------------
# Job 2:计算每个城市的平均订单金额
# --------------------------
city_analysis = orders.join(users, on="用户ID", how="inner") \  #  join是宽依赖(Shuffle).groupBy("所在城市") \  # 再次宽依赖(Shuffle).agg(sum("订单金额").alias("城市总金额"),count("用户ID").alias("城市订单数")) \.withColumn("平均订单金额", col("城市总金额") / col("城市订单数"))# Action操作:触发Job 2
city_analysis.write.csv("city_avg_order")  # Job 2# --------------------------
# Job 3:统计高消费用户(订单总金额>10000)的分布
# --------------------------
high_value_users = orders.groupBy("用户ID") \  # 宽依赖(Shuffle).agg(sum("订单金额").alias("用户总消费")) \.filter(col("用户总消费") > 10000) \  # 过滤(窄依赖).join(users, on="用户ID", how="inner")  # 宽依赖(Shuffle)# Action操作:触发Job 3
high_value_count = high_value_users.count()  # Job 3
print("高消费用户数量:", high_value_count)spark.stop()

根据payment_analysis.collect(),city_analysis.write.csv(“city_avg_order”)和high_value_count = high_value_users.count(),这段代码被划分成了三个job。

1.3 stage和task划分

如下图所示,在一个job中,出现了shuffle操作,就会划分一个stage。再根据每个stage中的分区数量划分task数量。
在这里插入图片描述

2.任务执行时机

  • job的提交时间与action()被调用的时间有关,当应用程序执行到rdd.action()时,就会立即将rdd.action()形成的job提交给Spark。这其实也就是为什么有的时候写完代码没有运行的原因,因为没写action()操作,job不会被提交到Spark
  • 仅当上游的stage都执行完成后,再执行下游的stage。如果stage之间没有依赖,则并行执行,例如stage1和stage0是并行执行,当且仅当两者执行后,stage2才开始执行。
  • stage中每个task因为是独立而且同构的,可以并行运行没有先后之分。

3.task内部数据存储与流动

task是根据分区来划分的,而一个分区中有很多个record,根据不同record之间的关系,存储的方式也不同:
在这里插入图片描述
这是一个task的执行流程的几种不同的情况:

  • 第一个流程:record之间并没有相互依赖,因此可以进行流式处理,即record1处理成record1’之后就可以将record1从内存中删掉,而不用关心record2和record3处理到哪里了。
  • 第二个流程:f()流程无相互依赖,但是g()流程有相互依赖,也就是说record1在处理成record1’‘后,record1’‘会被保存到内存中,直到record2’‘和record3’'被处理完成。
  • 第三个流程:同理,在record1,record2和record3都被算出之后,才能执行f(),而在执行g()时,record1’,record2’和record3’才不会相互依赖。
  • 第四个流程:无法进行流水线处理,每处理完一个操作,才能回收该操作的输入结果。

4.根据sparkUI了解Spark执行计划

4.1查看job和stage

在spark的首界面可以看到当前正在执行的job:
在这里插入图片描述
点击job的链接,可以看到当前job中的stage数量:
在这里插入图片描述
其中stage 0包含3个task,共Shuffle Write了376.0B,stage 1包含4个task,共Shuffle Write了988.0B,而stage 2包含3个task,一共Shuffle Read了1364.0B=376.0B+988.0B。

4.2 查看DAG图

将Job链接中界面上的DAG Visualization展开,可以看到正在执行的DAG图:
在这里插入图片描述
每个黑色实心圆圈代表一个RDD,但这个图稍显混乱,stage 0中parallelize操作生成的RDD应该是被stage 2中的partitionBy处理的,与stage 1中的parallelize无关,也就是stage 0到stage 2的横箭头并没有在stage1中作停留生成一个RDD
如果想进一步了解黑色实心圆圈代表哪些RDD,则可以进入stage的UI界面:
在这里插入图片描述
这张图展示了每个操作会生成哪些RDD(如join()操作生成了CoGroupedRDD及两个MapPartitionsRDD),但没有展示stage之间的连接关系。但是没有展示Stage的连接关系。

4.3查看task

在某个stage界面,可以看到该stage的task信息:
在这里插入图片描述
stage 0包含3个task,每个task都进行了Shuffle Write,写入了2~3个record,也就是说Spark UI中也会统计Shuffle Write/Read的record数目。
在这里插入图片描述
stage 1包含4个task,每个task都进行了ShuffleWrite,写入了2个record。
在这里插入图片描述
stage 2包含3个task,每个task从上游的stage 0/1那里Shuffle Read了5~6个record。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/95901.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/95901.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/95901.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

16-docker的容器监控方案-prometheus实战篇

文章目录一.前置知识1.监控与报警2.监控系统的设计3.监控系统的分类二、prometheus概述1.什么是prometheus2.prometheus的历史3.为什么要学习prometheus4.prometheus的使用场景5.prometheus的宏观架构图6.prometheus软件下载地址三、部署prometheus server监控软件1.同步集群时…

集成电路学习:什么是Image Processing图像处理

Image Processing,即图像处理,是计算机视觉、人工智能、多媒体等领域的重要基础。它利用计算机对图像进行分析、加工和处理,以达到预期目的的技术。以下是对图像处理的详细解析: 一、定义与分类 定义: 图像处理是指用计算机对图像进行分析,以达到所需结果的技术,又称…

基于Android的随身小管家APP的设计与实现/基于SSM框架的财务管理系统/android Studio/java/原生开发

基于Android的随身小管家APP的设计与实现/基于SSM框架/android Studio/java/原生开发

Web 开发 16

1 在 JavaScript(包括 JSX)中,函数体的写法和返回值处理在 JavaScript(包括 JSX)中,函数体的写法和返回值处理确实有一些简洁的语法规则,尤其是在箭头函数中。这些规则常常让人混淆,…

超高车辆碰撞预警系统如何帮助提升城市立交隧道安全?

超高车辆带来的安全隐患立交桥和隧道的设计通常基于常规车辆的高度标准。然而,随着重型运输业和超高货车的增加,很多超高车辆会误入这些限高区域,造成潜在的安全隐患。超高车辆与立交桥梁或隧道顶盖发生碰撞时,可能导致结构受损&a…

三种变量类型在局部与全局作用域的区别

一、基本概念作用域(Scope): 全局作用域:定义在所有函数外部的变量或函数,具有文件作用域,生命周期为整个程序运行期间。局部作用域:定义在函数、块(如 {})或类内部的变量…

InfluxDB 数据迁移工具:跨数据库同步方案(二)

六、基于 API 的同步方案实战6.1 API 原理介绍InfluxDB 提供的 HTTP API 是实现数据迁移的重要途径。通过这个 API,我们可以向 InfluxDB 发送 HTTP 请求,以实现数据的读取和写入操作。在数据读取方面,使用GET请求,通过指定数据库名…

JVM安全点轮询汇编函数解析

OpenJDK 17 源码的实现逻辑,handle_polling_page_exception 函数在方法返回时的调用流程如下:调用流程分析:栈水印检查触发跳转:当线程执行方法返回前的安全点轮询时(MacroAssembler::safepoint_poll 中 at_returntrue…

Linux怎么查看服务器开放和启用的端口

在 Linux 系统中,可以通过以下方法查看 服务器开放和启用的端口。以下是详细的步骤和工具,适用于不同场景。1. 使用 ss 查看开放的端口ss 是一个现代化工具,用于显示网络连接和监听的端口。1.1 查看正在监听的端口运行以下命令:ba…

XF 306-2025 阻燃耐火电线电缆检测

近几年随着我国经济快速的发展,电气火灾呈现高发趋势,鉴于电线电缆火灾的危险性,国家制定了阻燃,耐火电线电缆的标准,为企业,建设方,施工方等的生产,选材提供了指引。XF 306-2025 阻…

【Java|第二十篇】面向对象(十)——枚举类

目录 (四)面向对象: 12、枚举类: (1)概述: (2)枚举类的定义格式: (3)编译与反编译: (4)Enum类…

第二十一天-OLED显示实验

一、OLED显示原理1、OLED名词解释OLED可以自发光,无需背光光源。2、正点原子OLED模块模块总体概述模块接口模式选择MCU与模块外部连接8080并口读写过程OLED显存因为要进行显示,所以需要有显存。显存容量为128 x 8 byte,一个点用一位表示。SSD…

会议系统核心流程详解:创建、加入与消息交互

一、系统架构概览 会议系统采用"主进程线程池进程池"的分层架构,实现高并发与业务隔离: #mermaid-svg-fDJ5Ja5L3rqPkby0 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-fDJ5Ja5L3r…

Spring 创建 Bean 的 8 种主要方式

Spring(尤其是 Spring Boot)提供了多种方式来让容器创建和管理 Bean。Component、Configuration Bean、EnableConfigurationProperties 都是常见方式。 下面我为你系统地梳理 Spring 创建 Bean 的所有主要方式,并说明它们的使用场景和区别。…

React 第七十节 Router中matchRoutes的使用详解及注意事项

前言 matchRoutes 是 React Router v6 提供的一个核心工具函数,主要用于匹配路由配置与当前路径。它在服务端渲染(SSR)、数据预加载、权限校验等场景中非常实用。下面详细解析其用法、注意事项和案例分析: 1、基本用法 import { m…

iSCSI服务配置全指南(含服务器与客户端)

iSCSI服务配置全指南(含服务器与客户端)一、iSCSI简介 1. 概念 互联网小型计算机系统接口(Internet Small Computer System Interface,简称iSCSI)是一种基于TCP/IP的协议,其核心功能是通过IP网络仿真SCSI高…

堆(Heap):高效的优先级队列实现

什么是堆?堆是一种特殊的完全二叉树,满足以下性质:堆序性:每个节点的值与其子节点满足特定关系最小堆:父节点 ≤ 子节点(根最小)最大堆:父节点 ≥ 子节点(根最大&#xf…

朝花夕拾(四) --------python中的os库全指南

目录 Python os模块完全指南:从基础到高阶文件操作 1. 引言:为什么需要os模块? 1.1 os模块的重要性 1.2 适用场景 1.3 os模块的"瑞士军刀"特性 2. os模块基础功能 2.1 文件与目录操作 2.1.1 核心方法介绍 2.1.2 避坑指南 …

uniappx 安卓端本地打包的一些总结

本人之前没用过android studio,因为有打包到安卓端的需求,所以有了这篇文章。下面一些内容不正常工作,也不报错,是很烦的,根本不知道是哪里出了问题。比如对应的aar包没有引入。或者没有注册信息。 在实现过程中我遇到…

AUTOSAR进阶图解==>AUTOSAR_SWS_UDPNetworkManagement

AUTOSAR UDP网络管理详解 基于AUTOSAR标准的UDP网络管理模块架构分析与实现指南目录 1. 概述2. UDP网络管理架构 2.1 整体架构图2.2 架构组件详解 3. UDP网络管理状态机 3.1 状态机图3.2 状态详解 4. UDP网络管理操作序列 4.1 序列图4.2 操作流程详解 5. UDP网络管理配置模型 …