Contents
- 1 Program Entry Point: The SparkContext Object
- 2 Creating RDDs
- 3 RDD Operators
- 4 Common Transform Operators
- 4.1 map operator
- 4.2 flatMap operator
- 4.3 reduceByKey operator
- 4.4 mapValues operator
- <Example> WordCount
- 4.5 groupBy operator
- 4.6 filter operator
- 4.7 distinct operator
- 4.8 union operator
- 4.9 join operator
- 4.10 intersection operator
- 4.11 glom operator
- 4.12 groupByKey operator
- 4.13 sortBy operator
- 4.14 sortByKey operator
- 5 Common Action Operators
- 5.1 collect operator
- 5.2 reduce operator
- 5.3 fold operator
- 5.4 first operator
- 5.5 take operator
- 5.6 takeSample operator
- 5.7 takeOrdered operator
- 5.8 foreach operator
- 5.9 saveAsTextFile operator
- 6 Partition Operators
- 6.1 mapPartitions operator [Transform]
- 6.2 foreachPartition operator [Action]
- 6.3 partitionBy operator [Transform]
- 6.4 repartition operator [Transform]
- 7 groupByKey VS reduceByKey
1 Program Entry Point: The SparkContext Object
- Only after a SparkContext has been constructed can the subsequent API calls and computations be performed on top of it.
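- A minimal sketch of building the entry object, assuming a local-mode run (the same pattern is reused by every example below):

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # describe the application, then build the SparkContext from the configuration
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    print(sc.version)  # confirm the context is up
    sc.stop()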
2 Creating RDDs
2.1 Creating from a local collection
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
    print("Default number of partitions:", rdd.getNumPartitions())
    rdd = sc.parallelize([1, 2, 3], 3)
    print("Number of partitions:", rdd.getNumPartitions())
    print("RDD contents:", rdd.collect())
2.2 Creating by reading files
- textFile API: can read local files as well as HDFS data
- Usage:
- sparkcontext.textFile(arg1, arg2)
- arg1: required, the file path; supports local files / HDFS files / the S3 protocol
- arg2: optional, the minimum number of partitions
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    file_rdd1 = sc.textFile("../data/input/words.text")
    print("Default number of partitions:", file_rdd1.getNumPartitions())
    print("file_rdd1 contents:", file_rdd1.collect())
    file_rdd2 = sc.textFile("../data/input/words.text", 3)
    print("file_rdd2 partitions:", file_rdd2.getNumPartitions())
    file_rdd3 = sc.textFile("../data/input/words.text", 100)
    print("file_rdd3 partitions:", file_rdd3.getNumPartitions())
    hdfs_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
    print("hdfs_rdd contents:", hdfs_rdd.collect())
- wholeTextFiles: designed for reading many small files
- Usage:
- sparkcontext.wholeTextFiles(arg1, arg2)
- arg1: required, the file path; supports local files / HDFS files / the S3 protocol
- arg2: optional, the minimum number of partitions
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.wholeTextFiles("../data/input/tiny_files")
    # each element is (file path, file content); keep only the content
    print(rdd.map(lambda x: x[1]).collect())
3 RDD Operators
- Operator: an API on a distributed collection object (a method/function is the equivalent API on a local object)
- Categories
- Transform: transformation operators
- Definition: the return value is still an RDD
- Characteristic: lazy evaluation; without an Action operator, Transform operators do not actually run (see the sketch after this list)
- Action: action/trigger operators
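- A minimal sketch of the lazy behaviour, assuming the same local setup as the other examples:

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3])
    # Transform: only records the lineage, nothing is computed yet
    mapped = rdd.map(lambda x: x * 10)
    print(type(mapped))      # an RDD object, not result data
    # Action: triggers the actual computation of the whole chain
    print(mapped.collect())  # [10, 20, 30]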
4 Common Transform Operators
4.1 map operator
rdd.map(func)
# func: f: (T) -> U
# f: a function (method)
# (T) -> U: the function's signature
# T/U: generic types, meaning any type here
# (T) -> U: a function that takes one argument of any type and returns one value of any type
# (A) -> A: a function that takes one argument of any type and returns one value of the same type as the argument
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    print(rdd.map(lambda data: data * 10).collect())
4.2 flatMap operator
- Function: performs a map over the RDD and then un-nests (flattens) the result, reducing one level of nesting
rdd.flatMap(func)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd1 = sc.parallelize(["aaa bbb ccc", "ab bc cd", "aaa abc ccc"])
    rdd2 = rdd1.map(lambda line: line.split(" "))
    print(rdd2.collect())
    rdd3 = rdd1.flatMap(lambda line: line.split(" "))
    print(rdd3.collect())
4.3 reduceByKey operator
- Function: for key-value (KV) RDDs, automatically groups by key and then aggregates the values with the provided aggregation logic
rdd.reduceByKey(func)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4), ('c', 5)])
    res = rdd.reduceByKey(lambda a, b: a + b)
    print(res.collect())
4.4 mapValues operator
- Function: for RDDs of two-element tuples, applies a map operation to the values only
rdd.mapValues(func)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('c', 5)])
    res = rdd.mapValues(lambda x: x * 10)
    print(res.collect())
<Example> WordCount
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    file = sc.textFile("./Data/words.txt")
    rdd1 = file.flatMap(lambda line: line.split(" "))
    rdd2 = rdd1.map(lambda word: (word, 1))
    res = rdd2.reduceByKey(lambda a, b: a + b)
    print(res.collect())
4.5 groupBy operator
rdd.groupBy(func)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
    res = rdd.groupBy(lambda t: t[0])
    print(res.collect())
    print(res.map(lambda t: (t[0], list(t[1]))).collect())
4.6 filter operator
rdd.filter(func)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
    res = rdd.filter(lambda x: x % 2 == 1)
    print(res.collect())
4.7 distinct operator
rdd.distinct(arg1)
# arg1: number of partitions used for deduplication; usually left unset
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 1, 2, 2, 3, 3])
    res = rdd.distinct()
    print(res.collect())
    rdd2 = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1)])
    res2 = rdd2.distinct()
    print(res2.collect())
4.8 union operator
- Function: merges two RDDs into one (no deduplication; element types need not match)
rdd.union(other_rdd)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd1 = sc.parallelize([1, 1, 2, 3])
    rdd2 = sc.parallelize(['a', 'b', 'c'])
    res = rdd1.union(rdd2)
    print(res.collect())
4.9 join operator
- Function: performs a join on two RDDs of two-element tuples (can implement SQL inner/outer joins)
rdd.join(other_rdd) # inner join
rdd.leftOuterJoin(other_rdd) # left outer join
rdd.rightOuterJoin(other_rdd) # right outer join
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    staff = sc.parallelize([(1001, 'Sam'), (1002, 'Lily'), (1003, 'John'), (1004, 'Rebecca')])
    dept = sc.parallelize([(1001, 'Sales Dept'), (1002, 'Tech Dept')])
    print(staff.join(dept).collect())
    print(staff.leftOuterJoin(dept).collect())
    print(staff.rightOuterJoin(dept).collect())
4.10 intersection operator
rdd.intersection(other_rdd)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd1 = sc.parallelize([1, 2, 3, 4, 5])
    rdd2 = sc.parallelize([4, 5, 6, 7, 8])
    res = rdd1.intersection(rdd2)
    print(res.collect())
4.11 glom operator
rdd.glom()
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
    print(rdd.glom().collect())
    print(rdd.glom().flatMap(lambda x: x).collect())
4.12 groupByKey operator
rdd.groupByKey()
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
    res = rdd.groupByKey()
    print(res.collect())
    print(res.map(lambda x: (x[0], list(x[1]))).collect())
4.13 sortBy operator
- Function: sorts the RDD data by the given sort key (ordering is guaranteed within each partition)
rdd.sortBy(func, ascending=False, numPartitions=1)
# func: (T) -> U, the sort key
# ascending: True for ascending, False for descending
# numPartitions: number of partitions used for sorting; set to 1 for a global sort
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 5), ('b', 3), ('c', 4), ('e', 1), ('m', 9)], 3)
    res = rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3)
    print(res.collect())
    res2 = rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=3)
    print(res2.collect())
4.14 sortByKey operator
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)
# ascending: True for ascending (default), False for descending
# numPartitions: number of partitions used for sorting; set to 1 for a global sort
# keyfunc: applied to each key before sorting, (K) -> U; takes one argument and returns one value
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('A', 2), ('b', 5), ('B', 3), ('c', 4), ('E', 1), ('m', 9)], 3)
    res = rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower())
    print(res.collect())
5 Common Action Operators
5.1 collect operator
- Function: collects the data of all RDD partitions back to the driver and returns a list object
rdd.collect()
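- A minimal sketch, assuming the same local setup as the other examples:

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    # pulls the data of every partition back to the driver as a Python list
    print(rdd.collect())  # [1, 2, 3, 4, 5]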
5.2 reduce operator
rdd.reduce(func)
# func: (T, T) -> T
# takes two arguments and returns one value, all of the same type
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize(range(1, 10))
    res = rdd.reduce(lambda a, b: a + b)
    print(res)
5.3 fold operator
- Function: like reduce, aggregates with the provided logic, but the aggregation starts from an initial value
rdd.fold(zeroValue, func)
# the initial value is applied both within each partition and across partitions
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize(range(1, 10), 3)
    res = rdd.fold(10, lambda a, b: a + b)
    # 85 = 45 (sum of 1..9) + 10 * 3 (initial value in each of the 3 partitions) + 10 (initial value for the cross-partition merge)
    print(res)
5.4 first operator
rdd.first()
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize(range(1, 10))
    print(rdd.first())
5.5 take operator
rdd.take(n)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize(range(1, 10))
    print(rdd.take(5))
5.6 takeSample operator
rdd.takeSample(arg1: True/False, arg2: sample size, arg3: random seed)
# arg1: whether the same position can be drawn more than once (sampling with replacement)
# arg2: number of elements to sample
# arg3: random seed; passing the same number always yields the same result; if omitted, Spark assigns a random seed automatically
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5], 1)
    print(rdd.takeSample(True, 10))
    print(rdd.takeSample(False, 10))
5.7 takeOrdered operator
rdd.takeOrdered(arg1, arg2)
# arg1: N, how many elements to take
# arg2: changes the sort key only; the data itself is not modified
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 3, 5, 7, 9], 1)
    print(rdd.takeOrdered(3))
    print(rdd.takeOrdered(3, lambda x: -x))
5.8 foreach operator
- Function: applies the given logic to every element of the RDD (a map with no return value)
- Runs directly on the Executors; output does not need to be pulled back to the Driver, so it is more efficient
rdd.foreach(func)
# func: (T) -> None
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5], 1)
    res = rdd.foreach(lambda x: x * 10)
    print(res)  # None: foreach has no return value
    res = rdd.foreach(lambda x: print(x * 10))
5.9 saveAsTextFile operator
- Function: writes the RDD data to text files
- Supports the local filesystem, HDFS, etc.
- One output file is written per RDD partition
- Runs on the Executors, so performance is good
rdd.saveAsTextFile(path)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5], 1)
    rdd.saveAsTextFile("../Data/output/out1")
    rdd.saveAsTextFile("hdfs://node1:8020/output/out1")
6 Partition Operators
6.1 mapPartitions operator [Transform]
- Function: like a plain map, but receives an entire partition at a time, passed in as an iterator (one-pass sequence) object

rdd.mapPartitions(func)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)

    def process(iter):
        res = []
        for i in iter:
            res.append(i * 10)
        return res

    print(rdd.mapPartitions(process).collect())
6.2 foreachPartition operator [Action]
- Function: like a plain foreach, but processes an entire partition at a time
- foreachPartition is simply a mapPartitions with no return value
rdd.foreachPartition(func)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7], 3)

    def process(iter):
        res = []
        for i in iter:
            res.append(i * 10)
        print(res)

    rdd.foreachPartition(process)
6.3 partitionBy operator [Transform]
rdd.partitionBy(arg1, arg2)
# arg1: number of partitions after repartitioning
# arg2: func, a custom partitioning rule passed in as a function; (K) -> int
# partition numbers start at 0
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('c', 4), ('d', 5)])

    def process(k):
        if 'a' == k or 'b' == k:
            return 0
        if 'c' == k:
            return 1
        return 2

    print(rdd.partitionBy(3, process).glom().collect())
6.4 repartition operator [Transform]
- Function: repartitions the RDD (changes only the number of partitions)
- Note: change the number of partitions with care; it affects parallel computation (the number of parallel in-memory pipelines), and increasing the partition count will very likely cause a shuffle
rdd.repartition(N)
# N: new number of partitions
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    print(rdd.repartition(1).getNumPartitions())
    print(rdd.repartition(5).getNumPartitions())
    print(rdd.coalesce(1).getNumPartitions())
    print(rdd.coalesce(5).getNumPartitions())      # stays at 3: coalesce cannot increase partitions without a shuffle
    print(rdd.coalesce(5, True).getNumPartitions())
7 groupByKey VS reduceByKey
- Functionality
- groupByKey: grouping only
- reduceByKey: grouping + aggregation
- Performance
- reduceByKey performs far better than groupByKey followed by aggregation logic
- groupByKey + aggregation: group first (shuffle), then aggregate

- reduceByKey: pre-aggregate within each partition, then group (shuffle), then do the final aggregation. [This shrinks the amount of shuffled data and the network I/O cost, greatly improving performance; the larger the data, the bigger the advantage.] See the sketch below.
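- A minimal sketch contrasting the two approaches on the same data, assuming the same local setup as the other examples:

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)], 2)

    # groupByKey + aggregation: every value crosses the shuffle, summing happens only after grouping
    res1 = rdd.groupByKey().mapValues(lambda vals: sum(vals))
    print(res1.collect())  # [('a', 3), ('b', 7)] (order may vary)

    # reduceByKey: values are pre-aggregated inside each partition before the shuffle
    res2 = rdd.reduceByKey(lambda a, b: a + b)
    print(res2.collect())  # [('a', 3), ('b', 7)] (order may vary)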
