Spark Core

概念

前言

批处理(有界数据)

​ 对静态的、有限的数据集进行一次性处理,数据通常按固定周期(如每小时、每天)收集后统一计算。

特点:

  • 高吞吐量,适合大规模数据。
  • 高延迟(数据需积累到一定量才触发计算)。
  • 典型场景:离线报表生成、历史数据分析(如T+1的统计报表)。
流处理(无界数据)

​ 对动态的、无限的数据流进行实时处理,数据产生后立即处理(逐条或微批次)。

特点:

  • 低延迟(毫秒级到秒级响应)。
  • 处理状态化(如窗口聚合、事件时间处理)。
  • 典型场景:实时监控、欺诈检测、实时推荐。
流批一体

​ 用同一套API或框架同时处理批数据和流数据,底层实现逻辑统一(如将批视为流的特例)。

特点:

  • 开发效率高:无需维护两套代码。
  • 一致性保证:流和批的结果逻辑相同(如实时和离线统计口径一致)。

简介

​ 是专门为大规模数据处理而设计的快速通用的计算引擎,是一种类似 Hadoop MapReduce 的通用并行计算框架。

和MapReduce的对比

MapReduceSpark
编程模型Map 和 Reduce不局限于 Map 和 Reduce,还提供多种数据集操作类型
运算效率每次迭代都要向磁盘写入和读取,中间数据 I/O 开销大,效率低中间结果直接存放到内存,更高的迭代运算效率
调度机制N/A基于 DAG 的任务调度,执行机制更优

模块

在这里插入图片描述

模块作用
Spark Core最基础与最核心的功能。用于离线数据处理,批量计算。
Spark SQL交互式查询。用于操作结构化数据,通过 SQL 或者 Hive 的 HQL 来查询数据。
Spark Streaming准实时流式计算。
Spark MLlib机器学习。
Spark GraphX图计算。

简单入门

创建MAVEN项目,并引入依赖

<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-yarn_2.12</artifactId><version>3.3.2</version><scope>provided</scope></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>2.7.0</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.12</artifactId><version>2.7.0</version></dependency></dependencies>

创建Scala单例对象,并实现

package com.mrhelloworld.wordcountimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount01Demo {def main(args: Array[String]): Unit = {// ==================== 建立连接 ====================// 初始化配置对象val conf = new SparkConf()// 设置运行模式与 AppNameconf.setMaster("local").setAppName("WordCount")// 根据配置对象初始化上下文对象val sc = new SparkContext(conf)// ==================== 业务处理 ====================// 读取文件,按行读取val lines: RDD[String] = sc.textFile("data/wordcount")// 按空格拆分每一行数据,拆分为一个一个的单词val words: RDD[String] = lines.flatMap(w => w.split("\\s+"))// 将数据根据单词进行分组,便于统计val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(w => w)// 对分组后的数据进行统计val wordCount: RDD[(String, Int)] = wordGroup.map(kv => (kv._1, kv._2.size))// 从结果集中获取指定条数的数据val wordCountTopN: Array[(String, Int)] = wordCount.take(10)// 将结果打印在控制台wordCountTopN.foreach(println)// 简写方式lines.flatMap(_.split("\\s+")).groupBy(w => w).map(kv => (kv._1, kv._2.size)).take(10).foreach(println)// ==================== 关闭连接 ====================if (!sc.isStopped) sc.stop()}
}

运行架构

主要角色

  • 资源层面的主从
    • Master,负责管理与分配整个集群中的资源(CPU Core 和 Memory)
    • Worker,负责接收资源并执行作业中的任务
  • 作业层面的主从
    • Driver,负责管理整个集群中的作业任务调度
    • Executor,负责执行具体的任务

注意:无论什么运行模式都会存在这些角色,只是在不同的运行模式下,这些角色的分布会有所不同

通用运行架构

在这里插入图片描述

注意:一个App -> 一个或多个Job -> 一个或多个Stage -**> ** 一个或多个Task:Task是由分区数决定的

运行流程

  • 启动集群后,Worker 节点会向 Master 节点心跳汇报资源(CPU Core 和 Memory)情况;
  • Client 提交 Application,根据不同的运行模式在不同的位置创建 Driver 进程;
  • SparkContext 连接到 Master,向 Master 注册应用并申请资源(Executor 的 CPU Core 和 Memory);
  • Master 根据 SparkContext 的资源申请并根据 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源,也就是Executor;
  • Worker 节点创建 Executor 进程,Executor 向 Driver 进行反向注册;
  • 资源满足后(Executor 注册完毕),SparkContext 解析 Applicaiton 代码,创建 RDD,构建 DAG,并提交给DAGScheduler 分解成 Stage(当碰到 Action 算子时,就会催生 Job,每个 Job 中含有 1 个或多个 Stage),然后将Stage(或者称TaskSet)提交给 TaskScheduler,TaskScheduler 负责将 Task 分配到相应的Worker,最后提交给Executor 执行(发送到 Executor 的线程池中);
  • 每个 Executor 会持有一个线程池,Executor 通过启动多个线程(Task)来对 RDD 的 Partition 进行并行计算,并向SparkContext 报告,直至 Task 完成。

DAG:有向无环图,将作业的调度顺序规定好,不会出现回头执行相同任务或阶段的可能,类似于找一条路线一次发试卷,不要出现走一样的路的情况

运行模式

本地模式

​ Local 模式,就是不需要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等。本地模式就是一个独立的进程,通过其内部的多个线程来模拟整个 Spark 运行时环境,每个线程代表一个 Worker。

​ 本地模式,在运行时也有不同的模式:

模式特点
local只有一个工作进程,无并行计算能力
local[N]N 个工作进程,通常设置 N 为机器的 CPU 数量
local[*]工作进程数量等于机器的 CPU 核心数量
local[N, M]指定线程个数以及失败重试次数,失败重试次数为 M(最多重试M-1次)
local[*, M]指定线程个数以及失败重试次数,失败重试次数为 M(最多重试M-1次)
local-cluster
[numSlaves,coresPerSlave,memeoryPerySlave]
本地伪分布式集群

numSlaves :模拟集群的 Worker 节点个数;
coresPerSlave :模拟集群的各个 Worker 节点上的内核数;
memoryPerSlave :模拟集群的各个 Worker 节点上的内存大小,单位 M。

Standalone 独立模式

​ Spark 自身提供的集群模式,也就是所谓的 Standalone 独立模式。Spark 的独立模式体现了经典的 Master-Worker 模式

​ 在搭建时的不同模式:

模式特点
Standalone-SingleMaster 单节点模式,如果 Master 挂了就无法提交应用程序
Standalone-HAMaster 高可用模式。可以使用 FileSystem (文件系统)或 ZooKeeper (分布式协调服务)来实现主备切换
优点
  • 独立模式属于自己独立一套集群(Client/Master/Worker),是 Spark 原生的集群管理器,自带完整的服务,可单独部
    署,无需依赖任何其他资源管理系统
  • 使用 Standalone 可以很方便地搭建一个集群
缺点

​ 资源不利于充分利用

YARN 模式

​ Spark 可以基于 YARN 来计算(将 Spark 应用提交到 YARN 上运行),Spark 客户端直接连接 YARN,不需要额外构建 Spark 集群Spark 中的各个角色运行在 YARN 的容器内部,并组成Spark 集群环境。

​ 在 Spark On Yarn 模式下,Executor 进程名称为 CoarseGrainedExecutorBackend。一个 CoarseGrainedExecutorBackend

有且仅有一个 Executor 对象, 负责将 Task 包装成 taskRunner,并从线程池中抽取一个空闲线程运行 Task,每一个
CoarseGrainedExecutorBackend 能并行运行 Task 的数量取决于分配给它的 CPU 个数。

在这里插入图片描述

Client模式和Cluster模式

​ Client模式和Cluster模式是在上述模式的基础上,运行时的不同模式(不包括普通的本地模式)

模式区别
Client模式客户端在提交任务后,不能离开,并会将最后的结果返回给到客户端,提交任务的节点就是Driver所在的节点
Cluster模式客户端在提交任务后,将直接离开,由Master决定Driver所在节点,Driver管理任务的完成,并将任务结果打印在日志中,结果只能在日志中查看(Yarn中,Driver 在AplicationMaster中)

核心编程

RDD

概念和特性
概念:

​ RDD 是 Resilient Distributed Dataset 的缩写,意思为弹性分布式数据集(一种数据结构),是一个读取分区记录的集合,是 Spark 对需要处理的数据的基本抽象。源码中是一个抽象类,代表一系列弹性的、不可变、可分区、里面元素可并行计算的集合。

注意:RDD中存放的是数据的读取逻辑和数据的计算逻辑,并发给Executor。

特性
  • 弹性
    • 弹性存储:内存与磁盘自动切换;
    • 弹性容错:数据丢失可以自动恢复;
    • 弹性计算:计算出错重试机制;
    • 弹性分片:可根据需求重新分片。
  • 分布式:数据存储在大数据集群不同的节点上;
  • 数据集:RDD 只是封装了计算逻辑,并不保存数据;
  • 数据抽象:RDD 是一个抽象类,需要子类具体实现;
  • 不可变:RDD 封装了计算逻辑,是不可改变的,想要改变,只能产生新的 RDD,在新的 RDD 中封装新的计算逻辑;
  • 可分区:RDD 是一种分布式的数据集,由于数据量很大,因此计算时要被切分并存储在各个结点的分区当中;
  • 并行计算:一个分区对应一个任务。分区是 Spark 计算任务的基本处理单位,决定了并行计算的粒度。
  • 依赖关系:如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。
  • 惰性执行:Spark 对于 Transformation 转换算子采用惰性计算机制,遇到 Transformation 时并不会立即计算结果,而是要等遇到 Action 行动算子时才会一起执行。
创建RDD
通过集合创建RDD(通过内存)
def main(args: Array[String]): Unit = {// 建立连接val sc = new SparkContext("local[*]", "CreateRDD")// 通过内存创建 RDDval list = List(1, 2, 3, 4, 5)// parallelize 表示并行度,为了方便使用可以调用 makeRDD//val rdd: RDD[Int] = sc.parallelize(list)// 通过源码得知 makeRDD 内部调用了 parallelizeval rdd: RDD[Int] = sc.makeRDD(list)rdd.foreach(println)// 关闭连接if (!sc.isStopped) sc.stop()}
通过文件创建RDD
def main(args: Array[String]): Unit = {// 建立连接val sc = new SparkContext("local[*]", "CreateRDD")// path 可以是具体的文件,也可以是目录,当 path 是目录时,则获取目录下所有文件val rdd01: RDD[String] = sc.textFile("data/test.txt")rdd01.foreach(println)val rdd02: RDD[String] = sc.textFile("data/wordcount")rdd02.foreach(println)// 还支持通配符匹配val rdd03: RDD[String] = sc.textFile("data/wordcount/wd1*.txt")rdd03.foreach(println)// 访问的节点必须为 Active NameNode,也就是说 node01 必须是 Active NameNodeval rdd04: RDD[String] = sc.textFile("hdfs://node02:8020/bd/wordcount")rdd04.foreach(println)// textFile 是按行读取,wholeTextFiles 是按整个文件读取// 返回格式为元组:(文件地址+文件名, 文件内容)val rdd05: RDD[(String, String)] = sc.wholeTextFiles("data/test.txt")rdd05.foreach(println)val rdd06: RDD[(String, String)] = sc.wholeTextFiles("hdfs://node02:8020/bd/wordcount")rdd06.foreach(println)// 关闭连接if (!sc.isStopped) sc.stop()}

分区

​ Spark RDD 是一种分布式的数据集,由于数据量很大,因此计算时要被切分并存储在各个结点的分区当中

在这里插入图片描述

具体分区方法
集合的分区处理makeRDD通过设置numSlices 参数来设置分区数,不传递时将使用默认值defaultParallelism ,该默认值表示将按当前运行环境的最大可用 CPU 核数进行分区
文件的分区处理textFile读取文件数据时,数据会按照 Hadoop 文件读取的规则进行分区
支持重分区的算子\在Spark中有一些算子也可以支持数据的重新分区,详见算子
分区器的分区处理HashPartitioner RangePartitioner等继承Partitioner的自定义类通过继承Partitioner来自己定义实现不同的分区方式

算子

​ 算子的本质就是函数,不同的RDD之间的依赖关系,是一个函数空间到另一个函数空间的映射。

转换算子

​ 转换往往是从一个 RDD 到另一个 RDD 的计算。在执行应用的程序时,遇到转换算子,并不会立即触发计算操作,而是延时到遇到 Action 算子时才会操作。

单 Value的转换算子
算子作用语法例如
map将处理的数据逐条进行映射转换,将返回值构成新的 RDDdef map[U: ClassTag](f: T => U): RDD[U]
mapPartitions以分区为单位进行数据转换操作,将整个分区的数据加载到内存def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
mapPartitionsWithIndexmapPartitionsWithIndex 跟 mapPartitions 差不多,只是参数多了个分区号def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
flatMap首先将函数作用于集合中的每个元素,然后将结果展平,返回新的集合def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
glom将同一个分区的多个单个数据直接转换为相同类型的单个数组进行处理def glom(): RDD[Array[T]]
groupBy(shuffle)将数据按指定条件进行分组,从而方便我们对数据进行统计分析def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
filter指过滤出符合一定条件的元素def filter(f: T => Boolean): RDD[T]
sample是从大量的数据中获取少量的数据def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]
distinct(shuffle)将数据集中重复的数据去重def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Shuffer:

sample采样的策略:

  • withReplacement:表示抽出样本数据后是否再放回去,true 表示会放回去,这也就意味着抽出的样本可能有重复;

  • fraction:

    • 抽取不放回的情况下为每个元素被抽取的概率,在 0 ~ 1 之间,e.g. 0.3 表示抽出 30%;1 全取,0 全不取;
    • 抽取放回的情况下为每个元素可能被抽取的次数,e.g. 3 表示可能会抽取 3 次,当然也有可能抽取 1 次或者 2 次,或者 1 次都抽不到或者抽到更多次。
    • seed:表示随机种子可以将这个参数设置为定值。种子相同时,指定相同的 fraction 每次抽样的数据也会相同,如果此时出现数据不相同的情况那就是数据出问题了

coalesce 函数除了可以缩减分区数,还可以扩增分区数,但是扩增分区数时一定要设置 shuffle 为 true

双 Value的转换算子
算子作用语法例如
union对两个 RDD 进行并集(合并)操作,且不去重rdd1.union(rdd2)
intersection表示对两个 RDD 取交集(相同)rdd1.intersection(rdd2)
subtract表示对两个 RDD 取差集(不同)rdd1.subtract(rdd2)
cartesian表示对两个 RDD 进行笛卡尔积操作rdd1.cartesian(rdd2)
zip将两个 RDD 合并成一个 RDD``rdd1.zip(rdd2)`

zip:两个 RDD 的 Partition 数量以及元素数量都必须相同,否则会抛出异常

转换算子(Key-Value)
算子作用语法例如
partitionBy(shuffle)按照指定分区数重新进行分区def partitionBy(partitioner: Partitioner): RDD[(K, V)]mkRDD.partitionBy(new HashPartitioner(2))
sortByKey(shuffle)将 K, V 格式数据的 Key 根据指定的规则进行排序,默认为升序def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]mkRDD.sortByKey()
reduceByKey(shuffle)将相同 Key 的值聚合到一起def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
mkRDD.reduceByKey(_ + _)
groupByKey(shuffle)按 K, V 格式数据的 Key 进行分组,会返回 (K, Iterable[V]) 格式数据def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
mkRDD.groupByKey()
aggregateByKey(shuffle)按 K, V 格式数据的 Key 进行分组,可以实现分区内和分区间不同的计算逻辑def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,<br/> combOp: (U, U) => U): RDD[(K, U)]
mkRDD.aggregateByKey(0)(math.max(_, _), _ + _)
mapValues针对于 K, V 形式的数据只对 V 进行操作def mapValues[U](f: V => U): RDD[(K, U)]aggregateByKeyRDD.mapValues(t => t._1 / t._2)
foldByKey(shuffle)当分区内和分区间计算逻辑相同时使用def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
mkRDD.foldByKey(0)(_ + _)
combineByKey(shuffle)第一个参数是给每个分区的第一个 Key 的 Value 一个初始值规则def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)]
mkRDD.combineByKey( (_, 1),(t, v) => (t._1 + v, t._2 + 1),(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
join在类型为 K, V 和 K, W 的 RDD 上调用,返回一个相同 Key 对应的所有元素对在一起的 K, (V, W) 的 RDDdef join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]rdd1.join(rdd2)

分区器Partitioner:

  • HashPartitioner
  • RangePartitioner
  • 继承Partitioner的自定义分区器

五种聚合算子(上表中加粗的算子)最终都会调用到 combineByKeyWithClassTag 这个函数:

在这里插入图片描述

行动算子

​ 一个行动往往代表一种输出到外部系统的操作。在执行应用的程序时,遇到行动算子,会立即产生一个 Job,对已有的 RDD 中的数据执行计算后产生结果,将结果返回 Driver 程序或写入到外部物理存储。

算子作用语法例如
reduce通过函数聚合 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据def reduce(f: (T, T) => T): TmkRDD.reduce(_ + _)
count返回 RDD 中元素的个数def count(): LongmkRDD.count()
take返回 RDD 的前 n 个元素组成的数组def take(num: Int): Array[T]mkRDD.take(3)
takeOrdered返回 RDD 排序后的前 n 个元素组成的数组,默认正序def take(num: Int): Array[T]```mkRDD.takeOrdered(3)(Ordering.Int.reverse)`
first返回 RDD 中的第一个元素,底层就是 take(1)def first(): TmkRDD.first()
foreach循环遍历数据集中的每个元素,运行相应的计算逻辑(函数)def foreach(f: T => Unit): UnitmkRDD.foreach(println)
foreachPartition按分区循环遍历数据集中的每个元素,并运行相应的计算逻辑(函数)def foreachPartition(f: Iterator[T] => Unit): UnitmkRDD.foreachPartition(partitionOfRecords => println(s"分区:${TaskContext.getPartitionId()},记录:${partitionOfRecords.toArray.mkString(",")}"))
collect将不同分区的数据收集到 Driver 并以数组的形式返回数据def collect(): TmkRDD.collect()
aggregate将每个分区里面的元素通过 seqOp 函数和初始值进行聚合,然后用 combOp 函数将每个分区的结果和初始值(zeroValue)进行 combine 操作def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): UmkRDD.aggregate(10)(_ + _, _ + _)
fold当 aggregate 的分区内和分区间计算逻辑相同时,Spark 为了让程序员更方便的使用,提供了 fold 算子def fold(zeroValue: T)(op: (T, T) => T): TmkRDD.fold(10)(_ + _)
countByKey针对 K, V 类型的 RDD,返回一个 K, Int 的 map,表示每一个 Key 对应的元素个数,countByKey 底层调用的是 reduceByKeydef countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}mkRDD.map(_ -> 1).countByKey()
countByValue针对 K, V 类型的 RDD,返回一个 K, Int 的 map,表示每一个 Key 对应的元素个数countByValue 底层调用的是map().countByKey()def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()}mkRDD.countByValue()

save 系列:

  • saveAsTextFile:将数据集的元素以 TextFile 的形式保存到 HDFS 文件系统或者其他文件系统
  • saveAsSequenceFile:将数据集中的元素以 Hadoop SequenceFile 的格式保存到 HDFS 文件系统或者其他文件系统
  • saveAsObjectFile:将 RDD 中的元素序列化成对象,存储到文件中
控制算子

​ 可以将 RDD 持久化,持久化的单位是 Partition。

算子作用语法例如
cache保存到内存,效率高,数据不安全容易丢失def cache(): TmkRDD.cache()
persist保存到磁盘(临时文件,作业结束后会删除),效率低,数据安全def cache(StorageLevel.xxx): TmkRDD.persist(StorageLevel.MEMORY_AND_DISK)
checkpoint保存到磁盘(永久保存,一般存储在分布式文件系统中,例如 HDFS),效率低,数据安全\核心代码:sparkContext.setCheckpointDir("hdfs://node02:8020/yjx/cp")<br />rdd3.checkpoint()

注意:

  • 当我们没有对RDD持久化及没有使用控制算子时,我们使用行动算子时会自动从数据源开始再进行一次全部逻辑
  • checkpoint 可以理解为改变了数据源,因为关键数据已经计算完成,没有必要重头进行读取,所以 checkpoint
    算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系

闭包检测

闭包的概念

​ Spark Job 产生的任务会并行执行,也就是说会分发给多个 Executor 去执行。那么从计算的角度,算子以外的代码都是在 Driver 端执行,算子里面的代码都是在 Executor 端执行,这样就会导致算子内可能会用到算子外的
数据,就会形成闭包的效果。

​ 而算子外的数据想要被 Executor 端的任务使用,就必须先序列化然后通过网络 IO 进行传输,此时如果算子外的数据

无法序列化,就会发生错误。所以为了降低程序出错的可能性,Spark 的算子增加了闭包检查功能,会在执行计算任务
前,检测闭包内的对象是否可以进行序列化。

常见问题

在这里插入图片描述

算子外的数据无法序列化会出现以上问题

解决方案
  • 继承 scala.Serializable
  • 使用样例类(推荐),样例类默认继承了 Serializable
Kryo 序列化框架
  • Spark 内部默认使用的序列化框架 Kryo。
  • 由于 Java 自身的序列化比较重(字节多),所以出于性能的考虑,Spark 2.0 开始支持另外一种序列化机制 Kryo。
  • Kryo 的速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark
    内部使用 Kryo 来序列化。
  • 使用 Kryo 序列化,也需要继承 Serializable 接口。
  • 配置 Kryo 序列化方式如下:
val conf: SparkConf = new SparkConf()
.setMaster("local[*]").setAppName("SerializDemo")
// 设置 Kryo 序列化器
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册 Kryo 序列化器
.registerKryoClasses(Array(classOf[User]))

血统和依赖关系

血统

​ Spark 根据用户 Application 中的 RDD 的转换算子和行动算子,会生成 RDD 之间的依赖关系,多个 RDD 之间的关系又

形成一条关系链叫做 RDD 的血统(Lineage)。

依赖关系
窄依赖(Narrow Dependency)

​ 窄依赖指的是父 RDD 和子 RDD 的 Partition 之间的关系是一对一的(独生子女)。

宽依赖(Wide Dependency)

​ 宽依赖指的是父 RDD 与子 RDD 的 Partition 之间的关系是一对多(多胎),宽依赖会有 Shuffle 的产生。

通常情况下,产生了Shuffer就是宽依赖。

Stage阶段

· 每个 Job 会被拆分成多个 Task,作为一个 TaskSet 任务集,其名称为 Stage,Stage 的划分和调度是由 DAGScheduler
来负责的。Stage 的切割规则为:从后往前,遇到宽依赖就切割 Stage 。

在这里插入图片描述

​ 总结:

  • 从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到 Stage 中;
  • 默认情况下每个 Stage 里面的 Task 的数量是由该 Stage 中最后一个 RDD 的 Partition 数量决定的(一个 Partition 对应一
    个 Task);
  • 最后一个 Stage(ResultStage) 里面的任务类型是 ResultTask,前面所有其他 Stage(ShuffleMapStage) 里面的任务类型都是
    ShuffleMapTask;
  • 代表当前 Stage 的算子一定是该 Stage 的最后一个计算步骤。

Job作业

​ Job 包含了多个 Task 组成的并行计算,往往由 Spark Action 算子触发生成, 一个 Application 中往往会产生多个

Job。一个Job 包含 N 个 Transformation 算子和 1 个 Action 算子。

总结:

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job 是 Application 的子集,以 Spark Action 算子为界,遇到一个 Action 算子就触发一个 Job;
  • Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 就做一次划分;
  • Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,就有多少个 Task。

提示:Spark 中所谓的并行度是指 RDD 中的分区数,即 RDD 中的 Task 数。
注意:Application → Job → Stage → Task,每一层都是 1 对 N 的关系。

数据本地化

数据本地化级别:

  • PROCESS_LOCAL进程本地化,性能最好。指代码和数据在同一个进程中,也就是同一个 Executor 中;计算数据的
    Task 由 Executor 执行,此时数据在 Executor 的 BlockManager 中;
  • NODE_LOCAL节点本地化。代码和数据在同一个节点中,数据存储在节点的 HDFS Block,Task 在节点的某个
    Executror 执行;或者数据和 Task 在同一个节点不同的 Executor 中,数据需要跨进程传输;
  • NO_PREF :没有最佳位置这一说,数据从哪里访问都一样,不需要位置优先,比如 SparkSQL 直接读取 MySQL;
  • RACK_LOCAL机架本地化。数据和 Task 在一个机架的两个节点上,数据需要通过网络在节点之间进行传输;
  • ANY :数据和 Task 可能在集群中的任何地方,而且不在一个机架中,性能最差

Process Locality:相关参数 spark.locality.wait 默认值是 3s。Task 任务分配的时候,先是按照 PROCESS_LOCAL
方式去分配 Task,如果 PROCESS_LOCAL 不满足,默认等待 3 秒,看能不能按照这个级别去分配,如果等了 3 秒也实现不
了,那么就按 NODE_LOCAL 级别去分配 ,以此类推。

// 全局设置,默认 3s
spark.locality.wait=3s
// 建议 60s
spark.locality.wait.process=60s
// 建议 30s
spark.locality.wait.node=30s
// 建议 20s
spark.locality.wait.rack=20s

广播变量(分布式共享只读变量)

​ Spark 提供了广播变量,一种分布式共享只读变量,使用了广播变量存储 regex 后,Executor 端

就只会存储一份该数据供多个 Task 使用,为了防止数据被修改,所以是只读变量。

在这里插入图片描述

工作流程
  • 广播变量初始的时候在 Drvier 端会有一份副本
  • Task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中,尝试获取变量副本
  • 如果本地没有,那么就从 Driver 端远程拉取变量副本,并保存在本地的 Executor 对应的 BlockManager 中
  • 此后这个 Executor 上的其他 Task 都会直接使用本地的BlockManager 中的副本
  • Executor 的 BlockManager 除了从 Driver 端上拉取,也能从其他节点的 BlockManager 上拉取变量副本,距离越近越好
优点
  • 不是每个 Task 一份变量副本,而是每个节点的 Executor 一份副本,可以让变量产生的副本数大大减少
  • 变量一旦被定义为一个广播变量,那么这个变量只能读取,不能修改

注意:

  • 不能将RDD广播出去, RDD 是不存储数据的。可以将 RDD 的结果广播出去
  • 广播变量只能在 Driver 端定义,不能在 Executor 端定义。
  • 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值
  • 如果 Executor 端用到了 Driver 端的变量,不使用广播变量,在 Executor 有多少 Task 就有多少 Driver 端的变量副本
  • 如果 Executor 端用到了 Driver 端的变量,使用广播变量,在每个 Executor 中只有一份 Driver 端的变量副本

累加器(分布式共享只写变量)

​ 累加器是 Spark 提供的一种分布式共享只写变量,主要用于在分布式计算中高效地执行全局聚合操作。

在这里插入图片描述

累加器在 Driver 端定义并赋初始值,累加器只能在 Driver 端读取最后的值,在 Excutor 端更新。

自定义累加器

​ Spark 还支持自定义累加器,只需要继承 AccumulatorV2 即可。

简单示例:

package com.yjxxt.accumulator
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object WordCountAccumulatorDemo {def main(args: Array[String]): Unit = {// 建立连接val sc = new SparkContext("local[*]", "OnlineCountDemo")// 初始化自定义累加器val wordCountAccumulator = new WordCountAccumulator// 注册累加器sc.register(wordCountAccumulator, "wordCountAccumulator")// 创建 RDDval rdd: RDD[String] = sc.textFile("data/wordcount", 2)// 自定义累加器 实现 WordCountrdd.flatMap(_.split("\\s+")).foreach(wordCountAccumulator.add)// 获取累加器println(wordCountAccumulator.value)// 关闭连接if (!sc.isStopped) sc.stop()}/*** 自定义累加器*/class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {// 定义可变 Map 存放 Word 和 Count(这个就是自定义的累加器)private var wd = mutable.Map[String, Long]()// 累加器是否为零(空)override def isZero: Boolean = this.wd.isEmpty// 拷贝新的累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new WordCountAccumulator// 重置累加器override def reset(): Unit = this.wd.clear()// 累加器相加override def add(word: String): Unit = {val newCount = wd.getOrElse(word, 0L) + 1Lwd.update(word, newCount)}// 累加器合并override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val m1 = this.wdval m2 = other.valuem2.foreach {case (word, count) => {val newCount = m1.getOrElse(word, 0L) + countm1.update(word, newCount)}}}// 获取累加器![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/cebec725e8ed4da0b5c31eecbd6f0da3.png#pic_center)
override def value: mutable.Map[String, Long] = wd}
}

内存管理 (同一内存管理)

		储存内存和执行内存**共享同一块空间**,可以动态**借用对方的空闲区域**。

内存分配

在这里插入图片描述

其中最重要的优化在于动态借用机制,其规则如下:

  • 设定基本的存储内存大小和执行内存大小( spark.memory.storageFraction 参数,默认为 0.5,即各占一半),
    该设定确定了双方各自拥有的空间范围;
  • 双方空间都不足时,则存储到硬盘;若己方空间不足而对方空间空余时,可借用对方的空间(存储空间不足是指不足
    以放下一个完整的 Block);
  • 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后“归还”借用的空间;
  • 存储内存的空间被对方占用后,无法让对方“归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

内存估算

​ 内存具体估算公式如下:

  • 估算 Other 内存 = 自定义数据结构大小 * 每个 Executor 核数;
  • 估算 Storage 内存 = 广播变量数据大小 + Cache / Executor 数量。例如 Cache 了 100G 数据,10 个 Executor,那么每个
    Executor 分别存储 10G 数据。在 Spark WEBUI 的 Storage 标签页可以直接查看所有的内存占用,大致就对应 Storage
    Memory。
  • 估算 Execution 内存 = 知道了 Storage Memory,Execution Memory 也就知道了,因为默认情况下 Execution Memory 和
    Storage Memory 占用 Spark Memory 的比例是相同的。
  • 估算 Executor 内存 = 每个 Executor 核数 *(数据集大小 / 并行度)。例如 100G 数据,并行度 200(Task),单个并行
    度的数据量为 100G / 200 = 500M。然后 executor-cores 为 4,最终 Executor 内存最少需要 2G 内存。
  • Task 被执行的并行度 = Executor 个数 * 每个 Executor 的 Core 核数。

内存配置

​ 一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分配,可以按照如下思路:

  • Unified 统一内存比例 spark.memory.fraction = (估算 Storage 内存 + 估算 Execution 内存)/(估算 Storage 内存 + 估算
    Execution 内存 + 估算 Other 内存)。
  • Storage/Execution 内存比例 spark.memory.storageFraction = (估算 Storage 内存)/(估算 Storage 内存 + 估算
    Execution 内存)。

配置参数说明:

  • Unified 统一内存比例:spark.memory.fraction,默认为 0.6;
  • Storage/Execution 内存比例:spark.memory.storageFraction,默认为 0.5。

​ 代入公式计算:

  • Storage 内存 = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
  • Execution 内存 = (spark.executor.memdry - 300MB) * spark.memory.fraction * (1 - spark.memory.storageFraction)
    4,最终 Executor 内存最少需要 2G 内存。
  • Task 被执行的并行度 = Executor 个数 * 每个 Executor 的 Core 核数。

内存配置

​ 一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分配,可以按照如下思路:

  • Unified 统一内存比例 spark.memory.fraction = (估算 Storage 内存 + 估算 Execution 内存)/(估算 Storage 内存 + 估算
    Execution 内存 + 估算 Other 内存)。
  • Storage/Execution 内存比例 spark.memory.storageFraction = (估算 Storage 内存)/(估算 Storage 内存 + 估算
    Execution 内存)。

配置参数说明:

  • Unified 统一内存比例:spark.memory.fraction,默认为 0.6;
  • Storage/Execution 内存比例:spark.memory.storageFraction,默认为 0.5。

​ 代入公式计算:

  • Storage 内存 = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
  • Execution 内存 = (spark.executor.memdry - 300MB) * spark.memory.fraction * (1 - spark.memory.storageFraction)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92952.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92952.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/92952.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VRRP技术

VRRP的概念及应用场景 VRRP&#xff08;虚拟路由冗余协议&#xff09;概念 VRRP&#xff08;Virtual Router Redundancy Protocol&#xff0c;虚拟路由冗余协议&#xff09;是一种路由容错协议&#xff0c;用于在多个路由器之间提供网关冗余&#xff0c;确保当主路由器故障时&a…

表驱动法-灵活编程范式

表驱动法&#xff1a;从理论到实践的灵活编程范式 一、为什么需要表驱动法&#xff1f; 在处理多分支逻辑&#xff08;如消息解析、命令分发&#xff09;时&#xff0c;传统的 if-else 或 switch-case 存在明显局限&#xff1a; 当分支数量庞大&#xff08;如成百上千条命令&am…

零基础-动手学深度学习-10.2. 注意力汇聚:Nadaraya-Watson 核回归

上节介绍了框架下的注意力机制的主要成分 图10.1.3&#xff1a; 查询&#xff08;自主提示&#xff09;和键&#xff08;非自主提示&#xff09;之间的交互形成了注意力汇聚&#xff1b; 注意力汇聚有选择地聚合了值&#xff08;感官输入&#xff09;以生成最终的输出。 本节将…

nginx高新能web服务器

一、Nginx 概述和安装 Nginx是免费的、开源的、高性能的HTTP和反向代理服务器、邮件代理服务器、以及TCP/UDP代理服务器。 Nginx 功能介绍 静态的web资源服务器html&#xff0c;图片&#xff0c;js&#xff0c;css&#xff0c;txt等静态资源 http/https协议的反向代理 结合F…

Unity大型场景性能优化全攻略:PC与安卓端深度实践 - 场景管理、渲染优化、资源调度 C#

本文将深入探讨Unity在大型场景中的性能优化策略&#xff0c;涵盖场景管理、渲染优化、资源调度等核心内容&#xff0c;并提供针对PC和安卓平台的优化方案及实战案例。 提示&#xff1a;内容纯个人编写&#xff0c;欢迎评论点赞。 文章目录1. 大型场景性能挑战1.1 性能瓶颈定位…

Java集合框架、Collection体系的单列集合

Java集合框架、Collection1. 认识Java集合框架及结构1.1 集合框架整体结构1.2 集合框架的核心作用2. Collection的两大常用集合体系及各个系列集合的特点2.1 List系列集合&#xff08;有序、可重复&#xff09;2.2 Set系列集合&#xff08;无序、不可重复&#xff09;3. Collec…

HTML <picture> 元素:让图片根据设备 “智能切换” 的响应式方案

在响应式设计中&#xff0c;图片适配是一个绕不开的难题&#xff1a;同一张高清图片在大屏设备上清晰美观&#xff0c;但在小屏手机上可能加载缓慢&#xff1b;而适合手机的小图在桌面端又会模糊失真。传统的解决方案往往需要用JavaScript判断设备尺寸并动态替换图片源&#xf…

Spring Boot 监控与日志管理实战

在 Spring Boot 应用开发中&#xff0c;指标监控和日志管理是保障应用稳定运行的核心环节。指标监控能实时掌握应用健康状态、性能瓶颈&#xff0c;日志管理则用于问题排查和安全审计。本文基于 Spring Boot 提供的 Actuator 监控工具、Spring Boot Admin 可视化平台&#xff0…

【排序算法】②希尔排序

系列文章目录 第一篇&#xff1a;【排序算法】①直接插入排序-CSDN博客 第二篇&#xff1a;【排序算法】②希尔排序-CSDN博客 第三篇&#xff1a;【排序算法】③直接选择排序-CSDN博客 第四篇&#xff1a;【排序算法】④堆排序-CSDN博客 第五篇&#xff1a;【排序算法】⑤冒…

Linux Shell为文件添加BOM并自动转换为unix格式

1.添加并查看BOM添加bomvim -c "set bomb|set fileencodingutf-8|wq" ./gradlew查看bomhead -c 3 ./gradlew | hexdump -C2.安装dos2unix并转换为unix格式安装sudo apt install dos2unix转换dos2unix ./gradlew

华清远见25072班C语言学习day5

重点内容&#xff1a;数组&#xff1a;为什么有数组&#xff1f;为了便于存储多个数据特点&#xff1a;连续存储多个同种数据类型元素(连续指内存地址连续)数组名&#xff1a;数组中首元素的地址&#xff0c;是一个地址常量。一维整形数组&#xff1a;定义&#xff1a;数据类型…

安全守护,温情陪伴 — 智慧养老产品上新

- 养老智慧看护终端接入萤石开放平台 - 在2025 ECDC萤石云开发者大会&#xff0c;萤石产品经理已经介绍了基于萤石云服务AI能力适老化设备的养老智能能力开放。 而今天&#xff0c;养老智慧看护终端再升级&#xff0c;集成跌倒检测、物理隐私遮蔽、火柴人遮蔽、AI语音智能体…

鸿蒙flutter项目接入极光推送

推送的自分类权益 需要审核15个工作日&#xff0c;实际约3个工作日 项目使用极光推送flutter代码&#xff0c;代码端已经配置的东西&#xff08;需要配置flutter端和对应各自平台原生端&#xff09;&#xff0c;我的工程是多target&#xff0c;所以和单target有一点不同。 一、…

2025牛客多校第八场 根号-2进制 个人题解

J.根号-2进制 #数学 #FFT 思路 赛后发现身边的同学都是通过借位来解决进位问题的&#xff0c;在此提供一种全程不出现减法的顺推做法 首先A,BA,BA,B可以理解为两个多项式&#xff1a;A0A1−2A2(−2)2…A_{0}A_{1}\sqrt{ -2 }A_{2}(\sqrt{ -2 })^2\dotsA0​A1​−2​A2​(−…

DataEase官方出品丨SQLBot:基于大模型和RAG的智能问数系统

2025年8月7日&#xff0c;DataEase开源项目组发布SQLBot开源项目&#xff08;github.com/dataease/SQLBot&#xff09;。SQLBot是一款基于大语言模型&#xff08;Large Language Model&#xff0c;LLM&#xff09;和RAG&#xff08;Retrieval Augmented Generation&#xff0c;…

第十四节 代理模式

在代理模式&#xff08;Proxy Pattern&#xff09;中&#xff0c;一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。在代理模式中&#xff0c;我们创建具有现有对象的对象&#xff0c;以便向外界提供功能接口。介绍意图&#xff1a;为其他对象提供一种代理以控制对…

训推一体 | 暴雨X8848 G6服务器 x Intel®Gaudi® 2E AI加速卡

近日&#xff0c;暴雨信息携手英特尔&#xff0c;针对Gaudi 2E AI加速器HL-288 PCIe卡&#xff08;简称IntelGaudi 2E PCIe卡&#xff0c;下同&#xff09;完成专项调优与适配工作&#xff0c;并重磅推出Intel Eagle Stream平台4U8卡解决方案。该方案通过软硬件协同优化&#x…

GB17761-2024标准与电动自行车防火安全的技术革新

随着我国电动自行车保有量突破3.5亿辆&#xff0c;这一便捷的交通工具已成为城市出行的重要组成。然而&#xff0c;伴随市场规模扩大而来的是日益突出的安全问题——2023年全国电动自行车火灾事故高达2.5万起&#xff0c;年均增长率约20%&#xff0c;火灾中塑料件加速燃烧并释放…

利用容器编排完成haproxy和nginx负载均衡架构实施

1 创建测试目录和文件[rootdocker-a ~]# mkdir lee [rootdocker-a ~]# cd lee/ [rootdocker-a lee]# touch docker-compose.yml # 容器编排工具Docker Compose 默认识别docker-compose.yml文件2 编写docker-compose.yml文件和haproxy.cfg文件2.1 核心配置说明2.1.1 服务结构共定…

WinRAR v7.13 烈火汉化稳定版,解压缩全格式专家

[软件名称]: WinRAR v7.13 烈火汉化稳定版 [软件大小]: 3.8 MB [下载通道]: 夸克盘 | 迅雷盘 软件介绍 WinRAR 压缩文件管理器&#xff0c;知名解压缩软件&#xff0c;电脑装机必备软件&#xff0c;国内最流行最好用的压缩文件管理器、解压缩必备软件。它提供 RAR 和 ZIP 文…