一、常用的Action算子

1-1、countByKey算子

作用:统计key出现的次数,一般适用于K-V型的RDD。

【注意】:

1、collect()是RDD的算子,此时的Action算子,没有生成新的RDD,所以,没有collect()!!!

2、Action算子,返回值不再是RDD,而是字典!

示例:

1-2、collect算子

1-3、reduce算子

示例:

返回结果:15

回顾:reduceByKey的逻辑:Spark03-RDD01-简介+常用的Transformation算子-CSDN博客

1-4、fold算子

1-5、first算子

示例:

1-6、take算子

功能:获取RDD的前N个元素组合成list返回给你。

示例:

1-7、top算子

功能:对RDD数据集,先降序,再取前N个。相当于:取最大的前N个数字,返回类型:list

【注意】:item之间的比较,可以自定义比较函数。

1-8、count算子

计算RDD有多少条数据,返回的是一个数字!

1-9、takeSample算子

1. 作用

takeSample 用于从 RDD 中随机抽取一定数量的元素返回的是一个 Python list(而不是 RDD)。

它常用于数据探索,比如从一个很大的分布式数据集中 随机取样 看看大概长什么样。


2. 函数签名

RDD.takeSample(withReplacement, num, seed=None)
  • withReplacement: True/False

    • True有放回抽样(同一个元素可能被多次抽到)

    • False:无放回抽样(每个元素最多出现一次)

  • num: int

    • 需要抽取的样本数量

  • seed: int可选

    • 随机数种子。指定后每次结果一致;不指定时每次运行结果可能不同


3. 返回值

返回的是一个 list,包含抽到的样本。

⚠️ 注意:不会返回一个 RDD,而是直接把样本收集到 driver 程序

        

PySpark 的 takeSample 里,如果是无放回抽样 (withReplacement=False),且你请求的样本数量 大于 RDD 总数,即: num > RDD.count(),结果会直接返回 整个 RDD,不会报错。


4. 示例代码

from pyspark import SparkContextsc = SparkContext("local", "TakeSampleExample")data = sc.parallelize(range(1, 101))  # RDD: 1 ~ 100# 无放回抽样,取 10 个
sample1 = data.takeSample(False, 10)
print("无放回抽样:", sample1)# 有放回抽样,取 10 个
sample2 = data.takeSample(True, 10)
print("有放回抽样:", sample2)# 固定随机种子
sample3 = data.takeSample(False, 10, seed=42)
print("固定种子:", sample3)

运行可能结果:

无放回抽样: [57, 3, 85, 21, 92, 38, 44, 71, 5, 66]
有放回抽样: [21, 21, 45, 72, 3, 98, 45, 12, 7, 7]
固定种子: [63, 2, 73, 82, 23, 18, 47, 74, 96, 94]

5. 特点 & 注意点

  1. 返回 Python list,所以抽样结果会被拉回 driver 内存。

    • 不适合 num 特别大(比如几百万),会导致 driver 内存爆炸

  2. sample 不同

    • sample(withReplacement, fraction, seed)返回 RDD(按比例抽样)

    • takeSample(withReplacement, num, seed) → 返回 list(按数量抽样)

    总结:

    • 想要 指定比例抽样 → 用 sample

    • 想要 指定数量抽样 → 用 takeSample


6. 使用场景

  • 调试 / 探索:比如 RDD 太大,不可能直接 collect(),就可以 takeSample(False, 20) 随机取 20 个元素看一眼。

  • 机器学习抽样:从数据集中随机取一部分作为训练集 / 测试集。

  • 模拟实验:需要随机数据时快速取一批样本。

1-10、takeOrder算子


1. 作用

takeOrdered 用于 从 RDD 中取出前 n 个元素返回的是一个 Python list

  • 默认情况下,按 升序 排序后取前 n 个;(最小的前n个)

  • 也可以通过 key 参数指定排序规则。


2. 函数签名

RDD.takeOrdered(num, key=None)
  • num: int
    要取的元素个数。

  • key: function(可选)
    用来指定排序方式。

    • 不指定 → 默认升序

    • 指定 lambda x: -x → 可以变成降序


3. 返回值

返回一个 list,长度最多是 num,包含排序后的前 n 个元素。

(⚠️ 和 takeSample 一样,也会把结果拉回到 driver


4. 示例代码

from pyspark import SparkContext
sc = SparkContext("local", "TakeOrderedExample")data = sc.parallelize([5, 1, 8, 3, 2, 10, 6])# 取前 3 个最小的元素(默认升序)
result1 = data.takeOrdered(3)
print("最小的3个:", result1)# 取前 3 个最大的元素(用 key 参数)
result2 = data.takeOrdered(3, key=lambda x: -x)
print("最大的3个:", result2)# 按元素的平方排序,取前 3 个
result3 = data.takeOrdered(3, key=lambda x: x*x)
print("平方最小的3个:", result3)

可能输出:

最小的3个: [1, 2, 3]
最大的3个: [10, 8, 6]
平方最小的3个: [1, 2, 3]

5. 特点 & 注意点

  1. 返回 Python list,结果会直接拉到 driver。

    • 如果 num 很大,可能导致内存压力。

  2. 和其他算子的区别

    • top(n):返回最大的 n 个元素,默认降序。(只能是降序

    • takeOrdered(n):返回最小的 n 个元素,默认升序。

    • sortBy(key, ascending, numPartitions):返回排序后的 RDD,比 takeOrdered 重得多,因为它要分布式全排序

    总结:

    • 只想取 前 n 个 → 用 takeOrderedtop(高效)

    • 想要 全局排序 → 用 sortBy(代价更大)


6. 使用场景

  • Top-N 或 Bottom-N 样本,比如成绩前 10 名、销售额最高的 5 个商品。

  • 数据探索时快速查看极值(最小/最大值)。

  • 机器学习前的数据预处理,比如截取一部分样本。


1-11、foreach算子

1. 作用

foreach 用于对 RDD 的每个元素执行一个指定的函数(function),但 不会返回任何结果

它的典型用途是:

  • 在每个分区的 worker 节点上,对数据做副作用操作,比如写数据库、写文件、更新计数器。


2. 函数签名

RDD.foreach(f)
  • f: 一个函数,接收 RDD 的元素作为输入,对它进行处理。


3. 特点

  1. 没有返回值

    • foreach 的返回值是 None,所以你不能像 map 那样拿到新 RDD。

    • 它是一个 Action 算子,会触发真正的执行。

  2. 副作用在 Executor 端发生

    • 函数 f 会在集群各个节点(Executor)上执行,而不是在 driver 上。

    • 所以你在 fprint日志会打印到 Executor 的日志里,而不是 driver 的控制台。

    • 如果你要在 driver 上调试看数据,可以用 collect()

  3. 常用场景

    • 写数据库:foreach(lambda x: save_to_mysql(x))

    • 写文件系统:foreach(lambda x: write_to_hdfs(x))

    • 更新外部存储:foreach(lambda x: redis_client.set(x[0], x[1]))


4. 示例

from pyspark import SparkContextsc = SparkContext("local", "ForeachExample")data = sc.parallelize([1, 2, 3, 4, 5])def process(x):print(f"处理元素: {x}")# foreach 对每个元素执行 process
data.foreach(process)

⚠️ 注意:

  • 在本地模式(local)下,你可能能在控制台看到输出。

  • 集群模式(YARN、Standalone、Mesos),打印信息会在 Executor 日志,driver 控制台一般看不到。


5. foreach 和 foreachPartition 的区别

  • foreach(f)
    → 每个元素都执行一次 f

  • foreachPartition(f)
    → 每个 分区 执行一次 ff 的输入是该分区的迭代器。

一般写数据库、写外部存储时推荐 foreachPartition,这样可以:

  • 避免频繁建立连接(每个分区建立一次连接,而不是每条记录都建立)。

  • 提高性能。


6. 对比 map

算子是否返回新 RDD是否触发 Action典型用途
map✅ 是❌ 否数据转换
foreach❌ 否✅ 是副作用操作(写库/打印/发送消息)

1-12、saveAsTextFile算子


1. 基本功能

saveAsTextFile(path)Action算子(触发计算的算子),用于RDD 的内容 保存到 HDFS、本地文件系统或其他兼容 Hadoop 的文件系统,存储格式是 文本文件

  • 每个元素会被转换为一行字符串(调用 str() 方法)

  • 最终生成的结果是 一个目录,而不是单个文件

  • 目录中包含多个分区文件(如 part-00000part-00001 …),每个文件对应 RDD 的一个分区


2. 使用方法

# 假设已有一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=2)# 保存为文本文件
rdd.saveAsTextFile("output_rdd")

结果目录结构

output_rdd/├── part-00000├── part-00001└── _SUCCESS
  • part-00000part-00001:存储 RDD 每个分区的数据

  • _SUCCESS:一个空文件,表示任务成功结束


3. 关键注意事项

  1. 路径必须不存在
    Spark 默认不允许写入已存在的目录,否则会报错:

    org.apache.hadoop.mapred.FileAlreadyExistsException
    

    解决办法:先删除旧目录,再保存。

    import shutil
    shutil.rmtree("output_rdd", ignore_errors=True)
    rdd.saveAsTextFile("output_rdd")
    
  2. 输出是多个文件
    如果需要单个文件,可以在保存前 合并分区

    rdd.coalesce(1).saveAsTextFile("output_single_file")
    

    输出目录下只会有一个 part-00000

  • coalesce(1) 会把 RDD 的所有数据压缩到 一个分区。“创建一个新的目标分区,然后把数据往里压”。
  • 保存时 Spark 会根据分区数写出文件,因此只会生成 一个 part-00000 文件

如果是要交付给外部系统(比如 CSV 文件要交给别人用),那通常会 coalesce(1)

        3. 数据类型要求

        示例:

kv_rdd = sc.parallelize([("a", 1), ("b", 2)], 2)
kv_rdd.saveAsTextFile("output_kv")
# 文件内容大概是:
# ('a', 1)  part-00000
# ('b', 2)  part-00001
  • saveAsTextFile 默认调用 str() 转换元素

  • 如果是 (key, value) 形式的 RDD,输出会是 (key, value) 的字符串


4. 典型应用场景

  • 保存日志处理结果到 HDFS

  • 将 RDD 转换为文本存储,供下游任务(Hive、Spark SQL)使用

  • saveAsSequenceFilesaveAsObjectFile 对比,用于不同场景的持久化存储


【小结】:

  • foreach
  • saveAsTestFile

这两个算子是分区(Excutor)直接执行的,跳过Driver,由所在的分区(Excutor)直接执行,性能比较好!

其余的Action算子都会将结果发送至Driver

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/93819.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/93819.shtml
英文地址,请注明出处:http://en.pswp.cn/web/93819.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Android] 显示的内容被导航栏这挡住

上图中弹出的对话框的按钮“Cancel/Save”被导航栏遮挡了部分显示&#xff0c;影响了使用。Root cause: Android 应用的主题是 Theme.AppCompat.Light1. 修改 AndroidManifest.xml 将 application 标签的 android:theme 属性指向新的自定义主题&#xff1a;<applicationandr…

分贝单位全指南:从 dB 到 dBm、dBc

引言在射频、音频和通信工程中&#xff0c;我们经常会在示波器、频谱仪或测试报告里看到各种各样的dB单位&#xff0c;比如 dBm、dBc、dBV、dBFS 等。它们看起来都带个 dB&#xff0c;实则各有不同的定义和参考基准&#xff1a;有的表示相对功率&#xff0c;有的表示电压电平&a…

怎么确定mysql 链接成功了呢?

asyncio.run(test_connection()) ✗ Connection failed: cryptography package is required for sha256_password or caching_sha2_password auth methods 根据你提供的错误信息,问题出现在 MySQL 的认证插件和加密连接配置上。以下是几种解决方法: 1. 安装 cryptography 包…

(5)软件包管理器 yum | Vim 编辑器 | Vim 文本批量化操作 | 配置 Vim

Ⅰ . Linux 软件包管理器 yum01 安装软件在 Linux 下安装软件并不像 Windows 下那么方便&#xff0c;最通常的方式是去下载程序的源代码并进行编译&#xff0c;从而得到可执行程序。正是因为太麻烦&#xff0c;所以有些人就把一些常用的软件提前编译好并做成软件包&#xff0c;…

VGG改进(3):基于Cross Attention的VGG16增强方案

第一部分&#xff1a;交叉注意力机制解析1.1 注意力机制基础注意力机制的核心思想是模拟人类的选择性注意力——在处理信息时&#xff0c;对重要部分分配更多"注意力"。在神经网络中&#xff0c;这意味着模型可以学习动态地加权输入的不同部分。传统的自注意力(Self-…

代理ip平台哪家好?专业代理IP服务商测评排行推荐

随着互联网的深度发展&#xff0c;通过网络来获取全球化的信息资源&#xff0c;已成为企业与机构在竞争中保持优势的一大举措。但想要获取其他地区的信息&#xff0c;可能需要我们通过代理IP来实现。代理IP平台哪家好&#xff1f;下文就让我们从IP池资源与技术优势等细节&#…

PWA》》以京东为例安装到PC端

如果访问 浏览器右侧出现 安装 或 点击这个 也可以完成安装桌面 会出现 如下图标

Linux系统:C语言进程间通信信号(Signal)

1. 引言&#xff1a;从"中断"到"信号"想象一下&#xff0c;你正在书房专心致志地写代码&#xff0c;这时厨房的水烧开了&#xff0c;鸣笛声大作。你会怎么做&#xff1f;你会暂停&#xff08;Interrupt&#xff09; 手头的工作&#xff0c;跑去厨房关掉烧水…

LoRa 网关组网方案(二)

LoRa 网关组网方案 现有需求&#xff1a;网关每6秒接收不同节点的数据&#xff0c;使用SX1262芯片。 以下是完整的组网方案&#xff1a;1. 网络架构设计 采用星型拓扑&#xff1a; 网关&#xff1a;作为中心节点&#xff0c;持续监听多个信道节点&#xff1a;分布在网关周围&am…

服装外贸系统软件怎么用才高效防风险?

服装外贸系统软件概述 服装外贸系统软件&#xff0c;如“艾格文ERP”&#xff0c;是现代外贸企业不可或缺的管理工具。它整合了订单处理、库存管理、客户资源保护、财务控制等多功能模块&#xff0c;旨在全面提升业务运营效率。通过系统化的管理方式&#xff0c;艾格文ERP能够从…

【沉浸式解决问题】peewee.ImproperlyConfigured: MySQL driver not installed!

目录一、问题描述二、原因分析三、解决方案✅ 推荐&#xff1a;安装 pymysql&#xff08;纯 Python&#xff0c;跨平台&#xff0c;安装简单&#xff09;✅ 可选&#xff1a;安装 mysqlclient&#xff08;更快&#xff0c;但需要本地编译环境&#xff09;✅ 总结四、mysql-conn…

C++进阶-----C++11

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

(论文速读)航空轴承剩余寿命预测:多生成器GAN与CBAM融合的创新方法

论文题目&#xff1a;Remaining Useful Life Prediction Approach for Aviation Bearings Based on Multigenerator Generative Adversarial Network and CBAM&#xff08;基于多发生器生成对抗网络和CBAM的航空轴承剩余使用寿命预测方法&#xff09;期刊&#xff1a;IEEE TRAN…

3ds Max 流体模拟终极指南:从创建到渲染,打造真实液体效果

流体模拟是提升 3D 场景真实感的重要技术之一。无论是模拟飞瀑流泉、杯中溢出的饮料&#xff0c;还是黏稠的蜂蜜或熔岩&#xff0c;熟练掌握流体动力学无疑能为你的作品增色不少。本文将以 3ds Max 为例&#xff0c;系统讲解流体模拟的创建流程与渲染方法&#xff0c;帮助你实现…

《算法导论》第 35 章-近似算法

大家好&#xff01;今天我们深入拆解《算法导论》第 35 章 ——近似算法。对于 NP 难问题&#xff08;如旅行商、集合覆盖&#xff09;&#xff0c;精确算法在大规模数据下往往 “力不从心”&#xff0c;而近似算法能在多项式时间内给出 “足够好” 的解&#xff08;有严格的近…

系统架构设计师-操作系统-避免死锁最小资源数原理模拟题

写在前面&#xff1a;银行家算法的核心目标是确保系统始终处于“安全状态”。一、5个进程各需2个资源&#xff0c;至少多少资源避免死锁&#xff1f; 解题思路 根据死锁避免的资源分配公式&#xff0c;不发生死锁的最少资源数为&#xff1a; 最少资源数k(n−1)1 \text{最少资源…

Preprocessing Model in MPC 2 - 背景、基础原语和Beaver三元组

参考论文&#xff1a;SoK: Multiparty Computation in the Preprocessing Model MPC (Secure Multi-Party Computation) 博士生入门资料。抄袭必究。 本系列教程将逐字解读参考论文(以下简称MPCiPPM)&#xff0c;在此过程中&#xff0c;将论文中涵盖的40篇参考文献进行梳理与讲…

ACCESS/SQL SERVER保存软件版本号为整数类型,转成字符串

在 Access 中&#xff0c;若已将版本号&#xff08;如1.3.15&#xff09;转换为整数形式&#xff08;如10315&#xff0c;即1*10000 3*100 15&#xff09;&#xff0c;可以通过 SQL 的数学运算反向解析出原始版本号格式&#xff08;主版本.次版本.修订号&#xff09;。实现思…

编程语言学习

精通 Java、Scala、Python、Go、Rust、JavaScript ✅ 1. Java 面向对象编程&#xff08;OOP&#xff09;、异常处理、泛型JVM 原理、内存模型&#xff08;JMM&#xff09;、垃圾回收&#xff08;GC&#xff09;多线程与并发&#xff08;java.util.concurrent&#xff09;Java 8…

软件测试:如何利用Burp Suite进行高效WEB安全测试

Burp Suite 被广泛视为 Web 应用安全测试领域的行业标准工具集。要发挥其最大效能&#xff0c;远非简单启动扫描即可&#xff0c;而是依赖于测试者对其模块化功能的深入理解、有机组合及策略性运用。一次高效的测试流程&#xff0c;始于精细的环境配置与清晰的测试逻辑。测试初…