文章目录

RDD-Resilient Distributed Dataset

一、RDD五大特性

二、RDD创建方式


RDD-Resilient Distributed Dataset

在 Apache Spark 编程中,RDD(Resilient Distributed Dataset,弹性分布式数据集)是 Spark Core 中最基本的数据抽象,代表一个不可变、可分区、可并行计算的元素集合。RDD 允许用户在集群上以容错的方式执行计算。

一、RDD五大特性

首先回顾下Spark WordCount的核心代码流程:

sc.textFile("...").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).foreach(println)

结合以上代码,我们理解RDD的五大特性,RDD理解图如下:

RDD五大特性:

1) RDD由一系列Partition组成(A list of partitions)

RDD由多个Partition组成,这些Partition分布在集群的不同节点上。如果读取的是HDFS中的数据,每个partition对应一个Split,每个Split大小默认与每个Block大小一样,。

2) 函数是作用在每个Partition(Split)上的(A function for computing each split)

RDD 定义了在每个分区上进行计算的函数,例如 flatMap、map 等操作,这些函数对每个分区中的数据进行处理。

3) RDD之间有依赖关系(A list of dependencies on other RDDs)

RDD之间存在依赖关系,上图中RDD2可以基于RDD1生成,RDD1叫做父RDD,RDD2叫做子RDD。

4) 分区器作用在K,V格式的RDD上(Optionally, a Partitioner for key-value RDDs)

上图中RDD3中的数据是Tuple类型,这种类型叫做K,V格式的RDD。Spark分区器作用是决定数据发往下游RDD哪个Partition中,分区器只能作用在这种K,V格式的RDD中,默认根据Key的hash值与下游RDD的Partition个数取模决定该条数据去往下游RDD的哪个Paritition中。

5) RDD提供一系列最佳的计算位置(Optionally, a list of preferred locations to compute each split on)

RDD 提供每个分区的最佳计算位置,通常是数据所在的节点,这样可以将计算task调度到数据所在的位置,减少数据传输,提高计算效率(计算移动,数据不移动原则)。

关于RDD的注意点如下:

  • textFile底层读取文件方式与MR读取文件方式类似,首先对数据split,默认Split是一个block大小。
  • 读取数据文件时,RDD的Paritition个数默认与Split个数相同,也可以在创建RDD的时候指定,Partition是分布在不同节点上的。
  • RDD虽然叫做数据集,但实际上不存储数据,RDD类似迭代器,对象不可变,处理数据时,下游RDD会依次向上游RDD获取对应数据,这就是RDD之间为什么有依赖关系的原因。
  • 如果RDD中数据类型为二元组对象,那么这种RDD我们称作K,V格式的RDD。
  • RDD的弹性体现在RDD中Partition个数可以由用户设置、RDD可以根据依赖关系基于上一个RDD按照迭代器方式计算出下游RDD。
  • RDD提供最佳计算位置,task发送到相应的partition节点上处理数据,体现了“计算移动,数据不移动”的理念。

二、RDD创建方式

在Spark中创建RDD可以通过读取集合、读取文件方式创建,还可以基于已有RDD转换创建,后续我们主要使用第三种方式,这里先介绍前两种方式。下面分别使用Java和Scala API演示RDD的创建。

  • Java API
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("generateRDD");
JavaSparkContext sc = new JavaSparkContext(conf);
//1.从集合中创建RDD,并指定并行度为3,默认并行度为1
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c", "d"),3);
System.out.println("rdd1并行度为:"+rdd1.getNumPartitions());
rdd1.foreach(new VoidFunction<String>() {@Overridepublic void call(String s) throws Exception {System.out.println(s);}
});//2.从集合创建K,V格式RDD
JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("a", 1),new Tuple2<String, Integer>("b", 2),new Tuple2<String, Integer>("c", 3),new Tuple2<String, Integer>("d", 4)
));rdd2.foreach(new VoidFunction<Tuple2<String, Integer>>() {@Overridepublic void call(Tuple2<String, Integer> tp) throws Exception {System.out.println(tp._1 + " " + tp._2);}
});//3.从文件中创建RDD,并指定并行度为3,默认并行度为1
JavaRDD<String> rdd3 = sc.textFile("./data/data.txt",3);
System.out.println("rdd3并行度为:"+rdd3.getNumPartitions());
rdd3.foreach(new VoidFunction<String>() {@Overridepublic void call(String s) throws Exception {System.out.println(s);}
});
  • Scala API
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("GenerateRDDTest")
val sc = new SparkContext(conf)
//1.从集合创建RDD,并指定并行度为3,默认并行度为1
val rdd1 =sc.parallelize(1 to 20,3)
println(s"rdd1 并行度为:${rdd1.getNumPartitions}")
rdd1.foreach(println)//2.从集合创建K,V格式RDD
val rdd1KV:RDD[(String,Int)] = sc.parallelize(Array(("a",1),("b",2),("c",3),("d",4),("e",5)))
println(s"rdd1KV 并行度为:${rdd1KV.getNumPartitions}")
rdd1KV.foreach(println)//3.从集合创建RDD,并指定并行度为3,默认并行度为1
val rdd2 =sc.makeRDD(1 to 20,3)
println(s"rdd2 并行度为:${rdd2.getNumPartitions}")
rdd2.foreach(println)//4.从文件创建RDD,并指定并行度为3,默认并行度为1
val rdd3 = sc.textFile("./data/data.txt",3)
println(s"rdd3 并行度为:${rdd2.getNumPartitions}")
rdd3.foreach(println)

注意以下两点:

1、无论是基于集合或者文件创建RDD,默认RDD分区数为1,也可以在创建时指定RDD paritition个数;

2、Scala API中parallelize方法可以从集合中得到K,V或者非K,V格式RDD,还可以通过makeRDD方法读取集合转换成RDD。formation算子对RDD进行转换处理。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/98404.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/98404.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/98404.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java,通过SqlSessionFactory实现动态表明的插入和查询(适用于一个版本一个表的场景)

1,测试实体类package org.springblade.sample.test;import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data;/*** Author: 肖扬* CreateTime: 2025-09-05* Description: SqlSessionFactoryTest测试* Version: 1.0*/ Data TableName("session_factory_…

鹧鸪云光储流程系统全新升级:视频指引与分阶段模块使用指南

鹧鸪云光储流程系统近日完成重要更新&#xff0c;全面优化了操作指引体系&#xff0c;为用户带来更高效、直观的使用体验。本次升级重点推出了全套功能操作视频&#xff0c;并明确了不同业务阶段的核心模块使用指南&#xff0c;助力用户快速上手、提升工作效率。全覆盖视频操作…

ChatGPT 协作调优:把 SQL 查询从 5s 优化到 300ms 的全过程

ChatGPT 协作调优&#xff1a;把 SQL 查询从 5s 优化到 300ms 的全过程 &#x1f31f; Hello&#xff0c;我是摘星&#xff01; &#x1f308; 在彩虹般绚烂的技术栈中&#xff0c;我是那个永不停歇的色彩收集者。 &#x1f98b; 每一个优化都是我培育的花朵&#xff0c;每一个…

复杂计算任务的智能轮询优化实战

目录 复杂计算任务的智能轮询优化实战 一、轮询方法介绍 二、三种轮询优化策略 1、用 setTimeout 替代 setInterval 2、轮询时间指数退避 3、标签页可见性检测&#xff08;Page Visibility API&#xff09; 三、封装一个简单易用的智能轮询方法 四、结语 作者&#xff…

Java开发中常用CollectionUtils方式,以及Spring中CollectionUtils常用方法示例

场景 Java开发中常用的CollectionUtils 一、Spring Framework的CollectionUtils 包路径&#xff1a;org.springframework.util.CollectionUtils 核心方法&#xff1a; isEmpty(Collection<?> coll) List<String> list null; boolean empty CollectionUtil…

人工智能学习:Transformer结构(文本嵌入及其位置编码器)

一、输入部分介绍 输入部分包含: 编码器源文本嵌入层及其位置编码器 解码器目标文本嵌入层及其位置编码器 在transformer的encoder和decoder的输入层中,使用了Positional Encoding,使得最终的输入满足: 这里,input_embedding是通过常规embedding层,将每一个词的…

⸢ 肆 ⸥ ⤳ 默认安全建设方案:c-1.增量风险管控

&#x1f44d;点「赞」&#x1f4cc;收「藏」&#x1f440;关「注」&#x1f4ac;评「论」 在金融科技深度融合的背景下&#xff0c;信息安全已从单纯的技术攻防扩展至架构、合规、流程与创新的系统工程。作为一名从业十多年的老兵&#xff0c;将系统阐述数字银行安全体系的建设…

第二课、熟悉Cocos Creator 编辑器界面

本文主要介绍Cocos Creator 编辑器界面中几个常规的面板功能&#xff0c;让新手了解编辑器界面中常规的面板功能&#xff0c;更好的使用Cocos Creator 编辑器。一、编辑器界面常规面板划分Cocos Creater编辑器默认样式如上&#xff0c;主要包含&#xff1a;1、工具栏&#xff0…

Elixir通过Onvif协议控制IP摄像机,扩展ExOnvif的摄像头连续移动功能 ContinuousMove

Elixir 通过Onvif 对IP设备进行控制时&#xff0c;可以使用 ExOnvif 库。ExOnvif官方文档 此文章仅提供了ContinuousMove的控制方式及示例。 Elixir Onvif协议控制IP设备的其他命令&#xff0c;可以参考以下链接 绝对移动 【AbsoluteMove】 调用指定预置位 【GotoPreset】 …

android studio JNI 环境配置实现 java 调用 c/c++

1、在 app 级的 build.gradle 文件配置两个地方 android{ defaultConfig{ // 在 defaultConfig 里配置下面代码 externalNativeBuild { cmake { cppFlags "-frtti -fexceptions"//添加对 c 的异常处理支持 …

静态时序分析详解之时序路径类型

目录 一、概览 二、时序路径 2.1 数据路径 2.2 时钟路径 2.3 时钟门控路径 2.4 异步路径 2.5 关键路径 2.6 False路径 2.7 单周期路径 2.8 多周期路径 2.9 最长路径和最短路径 三、参考资料 一、概览 ​ ​静态时序分析通过模拟最差条件下分析所有的时序路径&am…

SpringBoot埋点功能技术实现方案深度解析:架构设计、性能优化与扩展性实践

SpringBoot埋点功能技术实现方案深度解析&#xff1a;架构设计、性能优化与扩展性实践 1. 原理剖析与技术实现细节 1.1 埋点技术基本原理 埋点&#xff08;Tracking&#xff09;是通过在代码中植入特定逻辑&#xff0c;收集用户行为数据、系统运行状态和业务指标的技术手段。在…

自建prometheus监控腾讯云k8s集群

自建prometheus监控腾讯云k8s集群 使用场景 k8s集群&#xff08;腾讯云容器服务&#xff09; promtheus (外部自建服务) 腾讯云提供了容器内部自建 Prometheus 监控 TKE 集群的文档&#xff0c;参考。 当前的环境promethues建在k8S外的云服务器上&#xff0c;与上面链接文…

2025高教社国赛数学建模C题参考论文(含模型和代码)

2025 年高教社杯大学生数学建模竞赛 C 题参考论文 目录 NIPT 的时点选择与胎儿的异常判定 摘要 1 问题重述 2 问题分析 2.1 问题 1 分析 2.2 问题 2 分析 2.3 问题 3 分析 2.4 问题 4 分析 3 模型假设与符号定义 3.1 模型假设 4. 孕周在 10-25 周内检测有…

iOS开发环境搭建及打包流程

一、下载xcode 直接去苹果商店的appstore下载就行 二、clone项目 1.登录xcode苹果账号或对应代码仓库账号 2.clone项目 3.安装设备真机环境&#xff08;未安装过的话&#xff09; 三.安装cocoapods 1. 检查并更新 Ruby 环境 CocoaPods 是基于 Ruby 编写的&#xff0c;因此…

数据结构之链表(单向链表与双向链表)

一&#xff0c;链表描述链表是一种常见的重要的数据结构,是动态地进行存储分配的一种结构。常用于需存储的数据的数目无法事先确定。1.链表的一般结构链表的组成&#xff1a; 头指针&#xff1a;存放一个地址&#xff0c;该地址指向一个元素 结点&#xff1a;用户需要的实际数据…

从反向代理到负载均衡:Nginx + Tomcat 构建高可用Web服务架构

从反向代理到负载均衡&#xff1a;Nginx Tomcat 构建高可用Web服务架构 文章目录从反向代理到负载均衡&#xff1a;Nginx Tomcat 构建高可用Web服务架构一、基础铺垫&#xff1a;什么是反向代理&#xff1f;1.1 反向代理的核心原理1.2 Nginx反向代理实战配置步骤1&#xff1a…

Simulink中使用Test sequence单元测试

一、Tips 在对simulink模型进行Test sequence单元测试时&#xff0c;如果采取书写测试用例的话&#xff0c;有以下操作。 1、使用”fprintf(‘time%f\n’, t);“来打印当前step的时间&#xff1b; 二、数据类型转换 1、double类型 -> boolean类型 clc; clear all;% 1、doubl…

【mysql】SQL自连接:什么时候需要,什么时候不需要?

SQL自连接:什么时候需要,什么时候不需要? 通过具体示例和对比解析,彻底搞懂SQL自连接的使用场景 在处理SQL查询时,尤其是当表中存在自引用关系(如referee_id引用同一张表的id)时,很多开发者会疑惑:这个查询到底需不需要自连接?本文将通过多个具体示例,带你彻底弄清何…

「美」创新在于人,而不是产品 - AxureMost 落葵网

添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 第一章&#xff1a;创新的心理学 创新与心理安全 蜡烛问题&#xff1a;卡尔邓克尔的蜡烛问题实验揭示了创造性思维的重要性。通过颠覆对盒子用途的先入为主观念&#xff0c;参与者能够找到创新性的解决方案…