Spark Core
概念
前言
批处理(有界数据)
对静态的、有限的数据集进行一次性处理,数据通常按固定周期(如每小时、每天)收集后统一计算。
特点:
- 高吞吐量,适合大规模数据。
- 高延迟(数据需积累到一定量才触发计算)。
- 典型场景:离线报表生成、历史数据分析(如T+1的统计报表)。
流处理(无界数据)
对动态的、无限的数据流进行实时处理,数据产生后立即处理(逐条或微批次)。
特点:
- 低延迟(毫秒级到秒级响应)。
- 处理状态化(如窗口聚合、事件时间处理)。
- 典型场景:实时监控、欺诈检测、实时推荐。
流批一体
用同一套API或框架同时处理批数据和流数据,底层实现逻辑统一(如将批视为流的特例)。
特点:
- 开发效率高:无需维护两套代码。
- 一致性保证:流和批的结果逻辑相同(如实时和离线统计口径一致)。
简介
是专门为大规模数据处理而设计的快速通用的计算引擎,是一种类似 Hadoop MapReduce
的通用并行计算框架。
和MapReduce的对比
MapReduce | Spark | |
---|---|---|
编程模型 | Map 和 Reduce | 不局限于 Map 和 Reduce,还提供多种数据集操作类型 |
运算效率 | 每次迭代都要向磁盘写入和读取,中间数据 I/O 开销大,效率低 | 中间结果直接存放到内存,更高的迭代运算效率 |
调度机制 | N/A | 基于 DAG 的任务调度,执行机制更优 |
模块
模块 | 作用 |
---|---|
Spark Core | 最基础与最核心的功能。用于离线数据处理,批量计算。 |
Spark SQL | 交互式查询。用于操作结构化数据,通过 SQL 或者 Hive 的 HQL 来查询数据。 |
Spark Streaming | 准实时流式计算。 |
Spark MLlib | 机器学习。 |
Spark GraphX | 图计算。 |
简单入门
创建MAVEN项目,并引入依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-yarn_2.12</artifactId><version>3.3.2</version><scope>provided</scope></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>2.7.0</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.12</artifactId><version>2.7.0</version></dependency></dependencies>
创建Scala单例对象,并实现
package com.mrhelloworld.wordcountimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount01Demo {def main(args: Array[String]): Unit = {// ==================== 建立连接 ====================// 初始化配置对象val conf = new SparkConf()// 设置运行模式与 AppNameconf.setMaster("local").setAppName("WordCount")// 根据配置对象初始化上下文对象val sc = new SparkContext(conf)// ==================== 业务处理 ====================// 读取文件,按行读取val lines: RDD[String] = sc.textFile("data/wordcount")// 按空格拆分每一行数据,拆分为一个一个的单词val words: RDD[String] = lines.flatMap(w => w.split("\\s+"))// 将数据根据单词进行分组,便于统计val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(w => w)// 对分组后的数据进行统计val wordCount: RDD[(String, Int)] = wordGroup.map(kv => (kv._1, kv._2.size))// 从结果集中获取指定条数的数据val wordCountTopN: Array[(String, Int)] = wordCount.take(10)// 将结果打印在控制台wordCountTopN.foreach(println)// 简写方式lines.flatMap(_.split("\\s+")).groupBy(w => w).map(kv => (kv._1, kv._2.size)).take(10).foreach(println)// ==================== 关闭连接 ====================if (!sc.isStopped) sc.stop()}
}
运行架构
主要角色
- 资源层面的主从
- Master,负责管理与分配整个集群中的资源(CPU Core 和 Memory)
- Worker,负责接收资源并执行作业中的任务
- 作业层面的主从
- Driver,负责管理整个集群中的作业任务调度
- Executor,负责执行具体的任务
注意:无论什么运行模式都会存在这些角色,只是在不同的运行模式下,这些角色的分布会有所不同
通用运行架构
注意:一个App -> 一个或多个Job -> 一个或多个Stage -**> ** 一个或多个Task:Task是由分区数决定的
运行流程
- 启动集群后,Worker 节点会向 Master 节点心跳汇报资源(CPU Core 和 Memory)情况;
- Client 提交 Application,根据不同的运行模式在不同的位置创建 Driver 进程;
- SparkContext 连接到 Master,向 Master 注册应用并申请资源(Executor 的 CPU Core 和 Memory);
- Master 根据 SparkContext 的资源申请并根据 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源,也就是Executor;
- Worker 节点创建 Executor 进程,Executor 向 Driver 进行反向注册;
- 资源满足后(Executor 注册完毕),SparkContext 解析 Applicaiton 代码,创建 RDD,构建 DAG,并提交给DAGScheduler 分解成 Stage(当碰到 Action 算子时,就会催生 Job,每个 Job 中含有 1 个或多个 Stage),然后将Stage(或者称TaskSet)提交给 TaskScheduler,TaskScheduler 负责将 Task 分配到相应的Worker,最后提交给Executor 执行(发送到 Executor 的线程池中);
- 每个 Executor 会持有一个线程池,Executor 通过启动多个线程(Task)来对 RDD 的 Partition 进行并行计算,并向SparkContext 报告,直至 Task 完成。
DAG:有向无环图,将作业的调度顺序规定好,不会出现回头执行相同任务或阶段的可能,类似于找一条路线一次发试卷,不要出现走一样的路的情况
运行模式
本地模式
Local 模式,就是不需要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等。本地模式就是一个独立的进程,通过其内部的多个线程来模拟整个 Spark 运行时环境,每个线程代表一个 Worker。
本地模式,在运行时也有不同的模式:
模式 | 特点 |
---|---|
local | 只有一个工作进程,无并行计算能力 |
local[N] | N 个工作进程,通常设置 N 为机器的 CPU 数量 |
local[*] | 工作进程数量等于机器的 CPU 核心数量 |
local[N, M] | 指定线程个数以及失败重试次数,失败重试次数为 M(最多重试M-1次) |
local[*, M] | 指定线程个数以及失败重试次数,失败重试次数为 M(最多重试M-1次) |
local-cluster [numSlaves,coresPerSlave,memeoryPerySlave] | 本地伪分布式集群 |
numSlaves :模拟集群的 Worker 节点个数;
coresPerSlave :模拟集群的各个 Worker 节点上的内核数;
memoryPerSlave :模拟集群的各个 Worker 节点上的内存大小,单位 M。
Standalone 独立模式
Spark 自身提供的集群模式,也就是所谓的 Standalone 独立模式。Spark 的独立模式体现了经典的 Master-Worker 模式
在搭建时的不同模式:
模式 | 特点 |
---|---|
Standalone-Single | Master 单节点模式,如果 Master 挂了就无法提交应用程序 |
Standalone-HA | Master 高可用模式。可以使用 FileSystem (文件系统)或 ZooKeeper (分布式协调服务)来实现主备切换 |
优点
- 独立模式属于自己独立一套集群(Client/Master/Worker),是 Spark 原生的集群管理器,自带完整的服务,可单独部
署,无需依赖任何其他资源管理系统 - 使用 Standalone 可以很方便地搭建一个集群
缺点
资源不利于充分利用
YARN 模式
Spark 可以基于 YARN 来计算(将 Spark 应用提交到 YARN 上运行),Spark 客户端直接连接 YARN,不需要额外构建 Spark 集群Spark 中的各个角色运行在 YARN 的容器内部,并组成Spark 集群环境。
在 Spark On Yarn 模式下,Executor 进程名称为 CoarseGrainedExecutorBackend。一个 CoarseGrainedExecutorBackend
有且仅有一个 Executor 对象, 负责将 Task 包装成 taskRunner,并从线程池中抽取一个空闲线程运行 Task,每一个
CoarseGrainedExecutorBackend 能并行运行 Task 的数量取决于分配给它的 CPU 个数。
Client模式和Cluster模式
Client模式和Cluster模式是在上述模式的基础上,运行时的不同模式(不包括普通的本地模式)
模式 | 区别 |
---|---|
Client模式 | 客户端在提交任务后,不能离开,并会将最后的结果返回给到客户端,提交任务的节点就是Driver所在的节点 |
Cluster模式 | 客户端在提交任务后,将直接离开,由Master决定Driver所在节点,Driver管理任务的完成,并将任务结果打印在日志中,结果只能在日志中查看(Yarn中,Driver 在AplicationMaster中) |
核心编程
RDD
概念和特性
概念:
RDD 是 Resilient Distributed Dataset 的缩写,意思为弹性分布式数据集(一种数据结构),是一个读取分区记录的集合,是 Spark 对需要处理的数据的基本抽象。源码中是一个抽象类,代表一系列弹性的、不可变、可分区、里面元素可并行计算的集合。
注意:RDD中存放的是数据的读取逻辑和数据的计算逻辑,并发给Executor。
特性
- 弹性
- 弹性存储:内存与磁盘自动切换;
- 弹性容错:数据丢失可以自动恢复;
- 弹性计算:计算出错重试机制;
- 弹性分片:可根据需求重新分片。
- 分布式:数据存储在大数据集群不同的节点上;
- 数据集:RDD 只是封装了计算逻辑,并不保存数据;
- 数据抽象:RDD 是一个抽象类,需要子类具体实现;
- 不可变:RDD 封装了计算逻辑,是不可改变的,想要改变,只能产生新的 RDD,在新的 RDD 中封装新的计算逻辑;
- 可分区:RDD 是一种分布式的数据集,由于数据量很大,因此计算时要被切分并存储在各个结点的分区当中;
- 并行计算:一个分区对应一个任务。分区是 Spark 计算任务的基本处理单位,决定了并行计算的粒度。
- 依赖关系:如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。
- 惰性执行:Spark 对于 Transformation 转换算子采用惰性计算机制,遇到 Transformation 时并不会立即计算结果,而是要等遇到 Action 行动算子时才会一起执行。
创建RDD
通过集合创建RDD(通过内存)
def main(args: Array[String]): Unit = {// 建立连接val sc = new SparkContext("local[*]", "CreateRDD")// 通过内存创建 RDDval list = List(1, 2, 3, 4, 5)// parallelize 表示并行度,为了方便使用可以调用 makeRDD//val rdd: RDD[Int] = sc.parallelize(list)// 通过源码得知 makeRDD 内部调用了 parallelizeval rdd: RDD[Int] = sc.makeRDD(list)rdd.foreach(println)// 关闭连接if (!sc.isStopped) sc.stop()}
通过文件创建RDD
def main(args: Array[String]): Unit = {// 建立连接val sc = new SparkContext("local[*]", "CreateRDD")// path 可以是具体的文件,也可以是目录,当 path 是目录时,则获取目录下所有文件val rdd01: RDD[String] = sc.textFile("data/test.txt")rdd01.foreach(println)val rdd02: RDD[String] = sc.textFile("data/wordcount")rdd02.foreach(println)// 还支持通配符匹配val rdd03: RDD[String] = sc.textFile("data/wordcount/wd1*.txt")rdd03.foreach(println)// 访问的节点必须为 Active NameNode,也就是说 node01 必须是 Active NameNodeval rdd04: RDD[String] = sc.textFile("hdfs://node02:8020/bd/wordcount")rdd04.foreach(println)// textFile 是按行读取,wholeTextFiles 是按整个文件读取// 返回格式为元组:(文件地址+文件名, 文件内容)val rdd05: RDD[(String, String)] = sc.wholeTextFiles("data/test.txt")rdd05.foreach(println)val rdd06: RDD[(String, String)] = sc.wholeTextFiles("hdfs://node02:8020/bd/wordcount")rdd06.foreach(println)// 关闭连接if (!sc.isStopped) sc.stop()}
分区
Spark RDD 是一种分布式的数据集,由于数据量很大,因此计算时要被切分并存储在各个结点的分区当中
具体分区方法 | ||
---|---|---|
集合的分区处理 | makeRDD | 通过设置numSlices 参数来设置分区数,不传递时将使用默认值defaultParallelism ,该默认值表示将按当前运行环境的最大可用 CPU 核数进行分区 |
文件的分区处理 | textFile | 读取文件数据时,数据会按照 Hadoop 文件读取的规则进行分区 |
支持重分区的算子 | \ | 在Spark中有一些算子也可以支持数据的重新分区,详见算子 |
分区器的分区处理 | HashPartitioner RangePartitioner等继承Partitioner的自定义类 | 通过继承Partitioner来自己定义实现不同的分区方式 |
算子
算子的本质就是函数,不同的RDD之间的依赖关系,是一个函数空间到另一个函数空间的映射。
转换算子
转换往往是从一个 RDD 到另一个 RDD 的计算。在执行应用的程序时,遇到转换算子,并不会立即触发计算操作,而是延时到遇到 Action 算子时才会操作。
单 Value的转换算子
算子 | 作用 | 语法 | 例如 |
---|---|---|---|
map | 将处理的数据逐条进行映射转换,将返回值构成新的 RDD | def map[U: ClassTag](f: T => U): RDD[U] | |
mapPartitions | 以分区为单位进行数据转换操作,将整个分区的数据加载到内存 | def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | |
mapPartitionsWithIndex | mapPartitionsWithIndex 跟 mapPartitions 差不多,只是参数多了个分区号 | def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] | |
flatMap | 首先将函数作用于集合中的每个元素,然后将结果展平,返回新的集合 | def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] | |
glom | 将同一个分区的多个单个数据直接转换为相同类型的单个数组进行处理 | def glom(): RDD[Array[T]] | |
groupBy(shuffle) | 将数据按指定条件进行分组,从而方便我们对数据进行统计分析 | def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] | |
filter | 指过滤出符合一定条件的元素 | def filter(f: T => Boolean): RDD[T] | |
sample | 是从大量的数据中获取少量的数据 | def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] | |
distinct(shuffle) | 将数据集中重复的数据去重 | def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] |
Shuffer:
sample采样的策略:
withReplacement:表示抽出样本数据后是否再放回去,true 表示会放回去,这也就意味着抽出的样本可能有重复;
fraction:
- 抽取不放回的情况下为每个元素被抽取的概率,在 0 ~ 1 之间,e.g. 0.3 表示抽出 30%;1 全取,0 全不取;
- 抽取放回的情况下为每个元素可能被抽取的次数,e.g. 3 表示可能会抽取 3 次,当然也有可能抽取 1 次或者 2 次,或者 1 次都抽不到或者抽到更多次。
- seed:表示随机种子可以将这个参数设置为定值。种子相同时,指定相同的 fraction 每次抽样的数据也会相同,如果此时出现数据不相同的情况那就是数据出问题了
coalesce 函数除了可以缩减分区数,还可以扩增分区数,但是扩增分区数时一定要设置 shuffle 为 true
双 Value的转换算子
算子 | 作用 | 语法 | 例如 |
---|---|---|---|
union | 对两个 RDD 进行并集(合并)操作,且不去重 | rdd1.union(rdd2) | |
intersection | 表示对两个 RDD 取交集(相同) | rdd1.intersection(rdd2) | |
subtract | 表示对两个 RDD 取差集(不同) | rdd1.subtract(rdd2) | |
cartesian | 表示对两个 RDD 进行笛卡尔积操作 | rdd1.cartesian(rdd2) | |
zip | 将两个 RDD 合并成一个 RDD | ``rdd1.zip(rdd2)` |
zip:两个 RDD 的 Partition 数量以及元素数量都必须相同,否则会抛出异常
转换算子(Key-Value)
算子 | 作用 | 语法 | 例如 |
---|---|---|---|
partitionBy(shuffle) | 按照指定分区数重新进行分区 | def partitionBy(partitioner: Partitioner): RDD[(K, V)] | mkRDD.partitionBy(new HashPartitioner(2)) |
sortByKey(shuffle) | 将 K, V 格式数据的 Key 根据指定的规则进行排序,默认为升序 | def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] | mkRDD.sortByKey() |
reduceByKey(shuffle) | 将相同 Key 的值聚合到一起 | def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] | mkRDD.reduceByKey(_ + _) |
groupByKey(shuffle) | 按 K, V 格式数据的 Key 进行分组,会返回 (K, Iterable[V]) 格式数据 | def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] | mkRDD.groupByKey() |
aggregateByKey(shuffle) | 按 K, V 格式数据的 Key 进行分组,可以实现分区内和分区间不同的计算逻辑 | def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,<br/> combOp: (U, U) => U): RDD[(K, U)] | mkRDD.aggregateByKey(0)(math.max(_, _), _ + _) |
mapValues | 针对于 K, V 形式的数据只对 V 进行操作 | def mapValues[U](f: V => U): RDD[(K, U)] | aggregateByKeyRDD.mapValues(t => t._1 / t._2) |
foldByKey(shuffle) | 当分区内和分区间计算逻辑相同时使用 | def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] | mkRDD.foldByKey(0)(_ + _) |
combineByKey(shuffle) | 第一个参数是给每个分区的第一个 Key 的 Value 一个初始值规则 | def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)] | mkRDD.combineByKey( (_, 1),(t, v) => (t._1 + v, t._2 + 1),(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2) |
join | 在类型为 K, V 和 K, W 的 RDD 上调用,返回一个相同 Key 对应的所有元素对在一起的 K, (V, W) 的 RDD | def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] | rdd1.join(rdd2) |
分区器Partitioner:
- HashPartitioner
- RangePartitioner
- 继承Partitioner的自定义分区器
五种聚合算子(上表中加粗的算子)最终都会调用到 combineByKeyWithClassTag 这个函数:
行动算子
一个行动往往代表一种输出到外部系统的操作。在执行应用的程序时,遇到行动算子,会立即产生一个 Job,对已有的 RDD 中的数据执行计算后产生结果,将结果返回 Driver 程序或写入到外部物理存储。
算子 | 作用 | 语法 | 例如 |
---|---|---|---|
reduce | 通过函数聚合 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据 | def reduce(f: (T, T) => T): T | mkRDD.reduce(_ + _) |
count | 返回 RDD 中元素的个数 | def count(): Long | mkRDD.count() |
take | 返回 RDD 的前 n 个元素组成的数组 | def take(num: Int): Array[T] | mkRDD.take(3) |
takeOrdered | 返回 RDD 排序后的前 n 个元素组成的数组,默认正序 | def take(num: Int): Array[T] | ```mkRDD.takeOrdered(3)(Ordering.Int.reverse)` |
first | 返回 RDD 中的第一个元素,底层就是 take(1) | def first(): T | mkRDD.first() |
foreach | 循环遍历数据集中的每个元素,运行相应的计算逻辑(函数) | def foreach(f: T => Unit): Unit | mkRDD.foreach(println) |
foreachPartition | 按分区循环遍历数据集中的每个元素,并运行相应的计算逻辑(函数) | def foreachPartition(f: Iterator[T] => Unit): Unit | mkRDD.foreachPartition(partitionOfRecords => println(s"分区:${TaskContext.getPartitionId()},记录:${partitionOfRecords.toArray.mkString(",")}")) |
collect | 将不同分区的数据收集到 Driver 并以数组的形式返回数据 | def collect(): T | mkRDD.collect() |
aggregate | 将每个分区里面的元素通过 seqOp 函数和初始值进行聚合,然后用 combOp 函数将每个分区的结果和初始值(zeroValue)进行 combine 操作 | def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U | mkRDD.aggregate(10)(_ + _, _ + _) |
fold | 当 aggregate 的分区内和分区间计算逻辑相同时,Spark 为了让程序员更方便的使用,提供了 fold 算子 | def fold(zeroValue: T)(op: (T, T) => T): T | mkRDD.fold(10)(_ + _) |
countByKey | 针对 K, V 类型的 RDD,返回一个 K, Int 的 map,表示每一个 Key 对应的元素个数,countByKey 底层调用的是 reduceByKey | def countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap} | mkRDD.map(_ -> 1).countByKey() |
countByValue | 针对 K, V 类型的 RDD,返回一个 K, Int 的 map,表示每一个 Key 对应的元素个数countByValue 底层调用的是map().countByKey() | def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()} | mkRDD.countByValue() |
save 系列:
- saveAsTextFile:将数据集的元素以 TextFile 的形式保存到 HDFS 文件系统或者其他文件系统
- saveAsSequenceFile:将数据集中的元素以 Hadoop SequenceFile 的格式保存到 HDFS 文件系统或者其他文件系统
- saveAsObjectFile:将 RDD 中的元素序列化成对象,存储到文件中
控制算子
可以将 RDD 持久化,持久化的单位是 Partition。
算子 | 作用 | 语法 | 例如 |
---|---|---|---|
cache | 保存到内存,效率高,数据不安全容易丢失 | def cache(): T | mkRDD.cache() |
persist | 保存到磁盘(临时文件,作业结束后会删除),效率低,数据安全 | def cache(StorageLevel.xxx): T | mkRDD.persist(StorageLevel.MEMORY_AND_DISK) |
checkpoint | 保存到磁盘(永久保存,一般存储在分布式文件系统中,例如 HDFS),效率低,数据安全 | \ | 核心代码:sparkContext.setCheckpointDir("hdfs://node02:8020/yjx/cp")<br />rdd3.checkpoint() |
注意:
- 当我们没有对RDD持久化及没有使用控制算子时,我们使用行动算子时会自动从数据源开始再进行一次全部逻辑
- checkpoint 可以理解为改变了数据源,因为关键数据已经计算完成,没有必要重头进行读取,所以 checkpoint
算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系
闭包检测
闭包的概念
Spark Job 产生的任务会并行执行,也就是说会分发给多个 Executor 去执行。那么从计算的角度,算子以外的代码都是在 Driver 端执行,算子里面的代码都是在 Executor 端执行,这样就会导致算子内可能会用到算子外的
数据,就会形成闭包的效果。
而算子外的数据想要被 Executor 端的任务使用,就必须先序列化然后通过网络 IO 进行传输,此时如果算子外的数据
无法序列化,就会发生错误。所以为了降低程序出错的可能性,Spark 的算子增加了闭包检查功能,会在执行计算任务
前,检测闭包内的对象是否可以进行序列化。
常见问题
算子外的数据无法序列化会出现以上问题
解决方案
- 继承 scala.Serializable
- 使用样例类(推荐),样例类默认继承了 Serializable
Kryo 序列化框架
- Spark 内部默认使用的序列化框架 Kryo。
- 由于 Java 自身的序列化比较重(字节多),所以出于性能的考虑,Spark 2.0 开始支持另外一种序列化机制 Kryo。
- Kryo 的速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark
内部使用 Kryo 来序列化。 - 使用 Kryo 序列化,也需要继承 Serializable 接口。
- 配置 Kryo 序列化方式如下:
val conf: SparkConf = new SparkConf()
.setMaster("local[*]").setAppName("SerializDemo")
// 设置 Kryo 序列化器
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册 Kryo 序列化器
.registerKryoClasses(Array(classOf[User]))
血统和依赖关系
血统
Spark 根据用户 Application 中的 RDD 的转换算子和行动算子,会生成 RDD 之间的依赖关系,多个 RDD 之间的关系又
形成一条关系链叫做 RDD 的血统(Lineage)。
依赖关系
窄依赖(Narrow Dependency)
窄依赖指的是父 RDD 和子 RDD 的 Partition 之间的关系是一对一的(独生子女)。
宽依赖(Wide Dependency)
宽依赖指的是父 RDD 与子 RDD 的 Partition 之间的关系是一对多(多胎),宽依赖会有 Shuffle 的产生。
通常情况下,产生了Shuffer就是宽依赖。
Stage阶段
· 每个 Job 会被拆分成多个 Task,作为一个 TaskSet 任务集,其名称为 Stage,Stage 的划分和调度是由 DAGScheduler
来负责的。Stage 的切割规则为:从后往前,遇到宽依赖就切割 Stage 。
总结:
- 从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到 Stage 中;
- 默认情况下每个 Stage 里面的 Task 的数量是由该 Stage 中最后一个 RDD 的 Partition 数量决定的(一个 Partition 对应一
个 Task); - 最后一个 Stage(ResultStage) 里面的任务类型是 ResultTask,前面所有其他 Stage(ShuffleMapStage) 里面的任务类型都是
ShuffleMapTask; - 代表当前 Stage 的算子一定是该 Stage 的最后一个计算步骤。
Job作业
Job 包含了多个 Task 组成的并行计算,往往由 Spark Action 算子触发生成, 一个 Application 中往往会产生多个
Job。一个Job 包含 N 个 Transformation 算子和 1 个 Action 算子。
总结:
- Application:初始化一个 SparkContext 即生成一个 Application;
- Job 是 Application 的子集,以 Spark Action 算子为界,遇到一个 Action 算子就触发一个 Job;
- Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 就做一次划分;
- Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,就有多少个 Task。
提示:Spark 中所谓的并行度是指 RDD 中的分区数,即 RDD 中的 Task 数。
注意:Application → Job → Stage → Task,每一层都是 1 对 N 的关系。
数据本地化
数据本地化级别:
PROCESS_LOCAL
:进程本地化,性能最好。指代码和数据在同一个进程中,也就是同一个 Executor 中;计算数据的
Task 由 Executor 执行,此时数据在 Executor 的 BlockManager 中;NODE_LOCAL
:节点本地化。代码和数据在同一个节点中,数据存储在节点的 HDFS Block,Task 在节点的某个
Executror 执行;或者数据和 Task 在同一个节点不同的 Executor 中,数据需要跨进程传输;NO_PREF
:没有最佳位置这一说,数据从哪里访问都一样,不需要位置优先,比如 SparkSQL 直接读取 MySQL;RACK_LOCAL
:机架本地化。数据和 Task 在一个机架的两个节点上,数据需要通过网络在节点之间进行传输;ANY
:数据和 Task 可能在集群中的任何地方,而且不在一个机架中,性能最差。
Process Locality:相关参数 spark.locality.wait 默认值是 3s。Task 任务分配的时候,先是按照 PROCESS_LOCAL
方式去分配 Task,如果 PROCESS_LOCAL 不满足,默认等待 3 秒,看能不能按照这个级别去分配,如果等了 3 秒也实现不
了,那么就按 NODE_LOCAL 级别去分配 ,以此类推。// 全局设置,默认 3s
spark.locality.wait=3s
// 建议 60s
spark.locality.wait.process=60s
// 建议 30s
spark.locality.wait.node=30s
// 建议 20s
spark.locality.wait.rack=20s
广播变量(分布式共享只读变量)
Spark 提供了广播变量,一种分布式共享只读变量,使用了广播变量存储 regex 后,Executor 端
就只会存储一份该数据供多个 Task 使用,为了防止数据被修改,所以是只读变量。
工作流程
- 广播变量初始的时候在 Drvier 端会有一份副本
- Task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中,尝试获取变量副本
- 如果本地没有,那么就从 Driver 端远程拉取变量副本,并保存在本地的 Executor 对应的 BlockManager 中
- 此后这个 Executor 上的其他 Task 都会直接使用本地的BlockManager 中的副本
- Executor 的 BlockManager 除了从 Driver 端上拉取,也能从其他节点的 BlockManager 上拉取变量副本,距离越近越好
优点
- 不是每个 Task 一份变量副本,而是每个节点的 Executor 一份副本,可以让变量产生的副本数大大减少
- 变量一旦被定义为一个广播变量,那么这个变量只能读取,不能修改
注意:
- 不能将RDD广播出去, RDD 是不存储数据的。可以将 RDD 的结果广播出去。
- 广播变量只能在 Driver 端定义,不能在 Executor 端定义。
- 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值
- 如果 Executor 端用到了 Driver 端的变量,不使用广播变量,在 Executor 有多少 Task 就有多少 Driver 端的变量副本
- 如果 Executor 端用到了 Driver 端的变量,使用广播变量,在每个 Executor 中只有一份 Driver 端的变量副本
累加器(分布式共享只写变量)
累加器是 Spark 提供的一种分布式共享只写变量,主要用于在分布式计算中高效地执行全局聚合操作。
累加器在 Driver 端定义并赋初始值,累加器只能在 Driver 端读取最后的值,在 Excutor 端更新。
自定义累加器
Spark 还支持自定义累加器,只需要继承 AccumulatorV2 即可。
简单示例:
package com.yjxxt.accumulator
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object WordCountAccumulatorDemo {def main(args: Array[String]): Unit = {// 建立连接val sc = new SparkContext("local[*]", "OnlineCountDemo")// 初始化自定义累加器val wordCountAccumulator = new WordCountAccumulator// 注册累加器sc.register(wordCountAccumulator, "wordCountAccumulator")// 创建 RDDval rdd: RDD[String] = sc.textFile("data/wordcount", 2)// 自定义累加器 实现 WordCountrdd.flatMap(_.split("\\s+")).foreach(wordCountAccumulator.add)// 获取累加器println(wordCountAccumulator.value)// 关闭连接if (!sc.isStopped) sc.stop()}/*** 自定义累加器*/class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {// 定义可变 Map 存放 Word 和 Count(这个就是自定义的累加器)private var wd = mutable.Map[String, Long]()// 累加器是否为零(空)override def isZero: Boolean = this.wd.isEmpty// 拷贝新的累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new WordCountAccumulator// 重置累加器override def reset(): Unit = this.wd.clear()// 累加器相加override def add(word: String): Unit = {val newCount = wd.getOrElse(word, 0L) + 1Lwd.update(word, newCount)}// 累加器合并override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val m1 = this.wdval m2 = other.valuem2.foreach {case (word, count) => {val newCount = m1.getOrElse(word, 0L) + countm1.update(word, newCount)}}}// 获取累加器
override def value: mutable.Map[String, Long] = wd}
}
内存管理 (同一内存管理)
储存内存和执行内存**共享同一块空间**,可以动态**借用对方的空闲区域**。
内存分配
其中最重要的优化在于动态借用机制,其规则如下:
- 设定基本的存储内存大小和执行内存大小( spark.memory.storageFraction 参数,默认为 0.5,即各占一半),
该设定确定了双方各自拥有的空间范围; - 双方空间都不足时,则存储到硬盘;若己方空间不足而对方空间空余时,可借用对方的空间(存储空间不足是指不足
以放下一个完整的 Block); - 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后“归还”借用的空间;
- 存储内存的空间被对方占用后,无法让对方“归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。
内存估算
内存具体估算公式如下:
- 估算 Other 内存 = 自定义数据结构大小 * 每个 Executor 核数;
- 估算 Storage 内存 = 广播变量数据大小 + Cache / Executor 数量。例如 Cache 了 100G 数据,10 个 Executor,那么每个
Executor 分别存储 10G 数据。在 Spark WEBUI 的 Storage 标签页可以直接查看所有的内存占用,大致就对应 Storage
Memory。 - 估算 Execution 内存 = 知道了 Storage Memory,Execution Memory 也就知道了,因为默认情况下 Execution Memory 和
Storage Memory 占用 Spark Memory 的比例是相同的。 - 估算 Executor 内存 = 每个 Executor 核数 *(数据集大小 / 并行度)。例如 100G 数据,并行度 200(Task),单个并行
度的数据量为 100G / 200 = 500M。然后 executor-cores 为 4,最终 Executor 内存最少需要 2G 内存。 - Task 被执行的并行度 = Executor 个数 * 每个 Executor 的 Core 核数。
内存配置
一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分配,可以按照如下思路:
- Unified 统一内存比例 spark.memory.fraction = (估算 Storage 内存 + 估算 Execution 内存)/(估算 Storage 内存 + 估算
Execution 内存 + 估算 Other 内存)。 - Storage/Execution 内存比例 spark.memory.storageFraction = (估算 Storage 内存)/(估算 Storage 内存 + 估算
Execution 内存)。
配置参数说明:
- Unified 统一内存比例:spark.memory.fraction,默认为 0.6;
- Storage/Execution 内存比例:spark.memory.storageFraction,默认为 0.5。
代入公式计算:
- Storage 内存 = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
- Execution 内存 = (spark.executor.memdry - 300MB) * spark.memory.fraction * (1 - spark.memory.storageFraction)
4,最终 Executor 内存最少需要 2G 内存。 - Task 被执行的并行度 = Executor 个数 * 每个 Executor 的 Core 核数。
内存配置
一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分配,可以按照如下思路:
- Unified 统一内存比例 spark.memory.fraction = (估算 Storage 内存 + 估算 Execution 内存)/(估算 Storage 内存 + 估算
Execution 内存 + 估算 Other 内存)。 - Storage/Execution 内存比例 spark.memory.storageFraction = (估算 Storage 内存)/(估算 Storage 内存 + 估算
Execution 内存)。
配置参数说明:
- Unified 统一内存比例:spark.memory.fraction,默认为 0.6;
- Storage/Execution 内存比例:spark.memory.storageFraction,默认为 0.5。
代入公式计算:
- Storage 内存 = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
- Execution 内存 = (spark.executor.memdry - 300MB) * spark.memory.fraction * (1 - spark.memory.storageFraction)