目录

KV算子

parallelizePairs

mapToPair

mapValues

groupByKey

reduceByKey

sortByKey

算子应用理解

reduceByKey和groupByKey的区别

groupByKey+mapValues实现KV数据的V的操作

改进用reduceByKey

groupby通过K和通过V分组的模板代码

问题集锦

宝贵的经验


这里会讲到之前还未讲到过的KV算子。我们之前的操作都是单值操作,这一篇我们会着重讲到KV操作、行动算子和持久化等知识。

KV算子

作用:操作KV流数据,能够分别操作K和V

出现JavaPairRDD就表示出现了成对KV数据流

parallelizePairs

作用:封装Tuple2集合形成RDD

细节源码如下


 

mapToPair


作用:配合parallelizePairs方法
1.单值数据转化成KV对数据
2.Tuple元组整体转化成KV键值对形式

 


两者一起的代码

        JavaPairRDD<String, Integer> JRD = sc.parallelizePairs(Arrays.asList(a, a1, a2, a3));JRD.mapToPair( tuple -> new Tuple2<>(tuple._1, tuple._2*2)).collect().forEach(System.out::println);

mapValues

作用:K不变,操作KV流中的V,并且只要类型是JavaPairRDD就可以用此方法

        

示意图

        
代码实现

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);sc.parallelize(list).mapToPair(integer -> new Tuple2<Integer,Integer>(integer, integer * 2)).mapValues(int1 -> int1 * 2).collect().forEach(System.out::println);

这里配合一个wordcount案例加深一下理解


思考链条:
读取文件textFile --> flatmap扁平化流数据(String[] -> String)->groupby分组 ->mapValues按照V来计算


代码

//TODO 写一个wordcountJavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));JavaRDD<String> rdd = javaSparkContext.textFile("E:\\ideaProjects\\spark_project\\data\\wordcount");rdd.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {return Arrays.asList(s.split(" ")).iterator();}}).groupBy(n -> n).mapValues(iter -> {int sum = 0;for (String word : iter) {sum++;}return sum;}).collect().forEach(System.out::println);javaSparkContext.close();

 所以,整个转换过程是:
   - 输入:一行字符串(`String`)
   - 用`split`方法:将该行字符串分割成字符串数组(`String[]`)
   - 用`Arrays.asList`:将字符串数组转换为字符串列表(`List<String>`)
   - 调用列表的`iterator`方法:得到字符串的迭代器(`Iterator<String>`)
   - 在`flatMap`中,Spark会遍历这个迭代器,将每个字符串(单词)作为新元素放入结果RDD。

flatmap本质:都是将数组转换成一个可以逐个访问其元素的迭代器

groupByKey

作用:将KV对按照K对V进行分组


代码实现
 

        JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> a = new Tuple2<>("a", 1);Tuple2<String, Integer> b = new Tuple2<>("b", 2);Tuple2<String, Integer> c = new Tuple2<>("a", 3);Tuple2<String, Integer> d = new Tuple2<>("b", 4);javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d)).collect().forEach(System.out::println);System.out.println();javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d)).groupByKey(3).collect().forEach(System.out::println);

reduceByKey

作用:在KV对中,按照K对V进行聚合操作,(底层会在分区内进行预聚合优化)


代码实现
对二元组进行按照K对V相加的聚合操作

        

                javaSparkContext.parallelizePairs(tuple2s).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}}).collect().forEach(System.out::println);

sortByKey

        
作用:按照K进行XXX的升序/降序排列

代码实现

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> aa0 = new Tuple2<>("a", 4);Tuple2<String, Integer> aa1 = new Tuple2<>("a", 1);Tuple2<String, Integer> aa2 = new Tuple2<>("a", 2);Tuple2<String, Integer> bb1 = new Tuple2<>("b", 2);Tuple2<String, Integer> aa3 = new Tuple2<>("a", 3);Tuple2<String, Integer> bb2 = new Tuple2<>("b", 1);ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>(Arrays.asList(aa0,aa1, aa2, aa3, bb1, bb2));javaSparkContext.parallelizePairs(tuple2s).sortByKey().collect().forEach(System.out::println);javaSparkContext.close();

传入参数为false时



Comparable接口的使用

利用自定义类型进行排序操作

        JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Artist at1 = new Artist("小王", 100);Artist at2 = new Artist("小李", 1000);Artist at3 = new Artist("小赵", 10000);Artist at4 = new Artist("小宇", 100000);Tuple2<Artist, Integer> artistIntegerTuple2 = new Tuple2<>(at1, 1);Tuple2<Artist, Integer> artistIntegerTuple3 = new Tuple2<>(at2, 2);Tuple2<Artist, Integer> artistIntegerTuple4 = new Tuple2<>(at3, 3);Tuple2<Artist, Integer> artistIntegerTuple5 = new Tuple2<>(at4, 4);JavaPairRDD<Artist, Integer> artistIntegerJavaPairRDD = javaSparkContext.parallelize(Arrays.asList(artistIntegerTuple2, artistIntegerTuple3, artistIntegerTuple4, artistIntegerTuple5)).mapToPair(t -> t);artistIntegerJavaPairRDD.sortByKey().collect().forEach(System.out::println);javaSparkContext.close();class Artist implements Serializable, Comparable<Artist> {String name;int salary;public Artist(String name, int salary) {this.name = name;this.salary = salary;}@Overridepublic int compareTo(Artist o) {return o.salary - this.salary;}@Overridepublic String toString() {return "Artist{" +"name='" + name + '\'' +", salary=" + salary +'}';}
}

coalesce

        
作用:缩减分区,不会自动进行shuffle


示意图


代码实现

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);javaSparkContext.parallelize(tuple2s).coalesce(2).collect().forEach(System.out::println);javaSparkContext.close();


repartition

        
作用:调整分区数,等价于coalesce的shuffle=true时

示意图


代码实现
 

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);javaSparkContext.parallelize(tuple2s).repartition(2).saveAsTextFile("out2");javaSparkContext.close();

算子应用理解

reduceByKey和groupByKey的区别

 

性能更高:在shuffle之前有一个预聚合的功能Combine,可以将分区中的小文件合并,减少shuffle落盘的数据量
因此在实际开发中





groupByKey+mapValues实现KV数据的V的操作

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> a = new Tuple2<>("a", 1);Tuple2<String, Integer> b = new Tuple2<>("b", 2);Tuple2<String, Integer> c = new Tuple2<>("a", 3);Tuple2<String, Integer> d = new Tuple2<>("b", 4);System.out.println();javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d)).groupByKey(3).mapValues(new Function<Iterable<Integer>, Integer>() {@Overridepublic Integer call(Iterable<Integer> v1) throws Exception {int sum = 0;for (Integer v2 : v1) {sum += v2;}return sum;}}).collect().forEach(System.out::println);javaSparkContext.close();

改进用reduceByKey

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> a = new Tuple2<>("a", 1);Tuple2<String, Integer> b = new Tuple2<>("b", 1);Tuple2<String, Integer> c = new Tuple2<>("a", 2);Tuple2<String, Integer> d = new Tuple2<>("b", 2);ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>(Arrays.asList(a, b, c, d));
javaSparkContext.parallelizePairs(tuple2s).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}}).collect().forEach(System.out::println);javaSparkContext.close();

groupby通过K和通过V分组的模板代码
 

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> a = new Tuple2<>("a", 1);Tuple2<String, Integer> b = new Tuple2<>("b", 1);Tuple2<String, Integer> c = new Tuple2<>("a", 2);Tuple2<String, Integer> d = new Tuple2<>("b", 2);System.out.println();javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d)).groupBy(new Function<Tuple2<String, Integer>, Integer>() {@Overridepublic Integer call(Tuple2<String, Integer> v1) throws Exception {return v1._2(); //通过Values分组 将2改为1就是通过K分组}}).collect().forEach(System.out::println);javaSparkContext.close();

数据转换图

问题集锦


1.iterator迭代器怎么迭代,它在mapValues方法中的传出类型是iterator类型,并且在将Lambda和匿名内部类互转的时候注意传出泛型即可。(其中封装了两种迭代方法)

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = sc.parallelize(list).groupBy(n -> n % 2, 2);groupByRDD.mapValues(new Function<Iterable<Integer>, Integer>() {public Integer call(Iterable<Integer> integers) {int sum = 0;Iterator<Integer> iterator = integers.iterator();while (iterator.hasNext()) {sum += iterator.next();}return sum;
//                        int sum = 0;
//                        for (Integer i : integers) {
//                            sum += i;
//                        }
//                        return sum;}}).collect().forEach(System.out::println);

宝贵的经验

1.function函数传入泛型不能修改,但是传出泛型可以修改



2.正则表达式可以通过中括号将多次分割的逻辑封装到一行代码中



3.RDD采用了和javaIO一样的设计模式-装饰者设计模式,将对象嵌套实现功能


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/917751.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/917751.shtml
英文地址,请注明出处:http://en.pswp.cn/news/917751.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度解析 TCP 三次握手与四次挥手:从原理到 HTTP/HTTPS 的应用

TCP 的三次握手和四次挥手是网络通信的基石&#xff0c;无论是 HTTP 还是 HTTPS&#xff0c;它们都依赖 TCP 提供可靠的传输层服务。本文将用万字篇幅&#xff0c;结合 Mermaid 图表和代码示例&#xff0c;深入讲解 TCP 三次握手、四次挥手的原理、过程、状态变化&#xff0c;以…

Hyper-V + Centos stream 9 搭建K8s集群(一)

一、创建虚拟机一台32G内存&#xff0c;16核心的Win11&#xff0c;已经安装了Hyper-V 管理器。然后也下载了CentOS-Stream-9-latest-x86_64-dvd1.iso的镜像文件。这里Hyper-V创建虚拟机的过程就不赘述了&#xff0c;如果出现虚拟机加载不到镜像的问题&#xff0c;先把这个使用安…

Pygame如何制作小游戏

以下是 Pygame 的详细使用指南&#xff0c;从安装到开发完整游戏的步骤说明&#xff0c;包含代码示例和最佳实践&#xff1a; 一、安装与环境配置 1. 安装 Pygame pip install pygame2. 验证安装 import pygame pygame.init() print(pygame.version.ver) # 应输出版本号&am…

@【JCIDS】【需求论证】联合能力集成与开发系统知识图谱

JCIDS(联合能力集成与开发系统)知识图谱 1. JCIDS概述 2. JCIDS的提出背景 3. JCIDS核心流程 4. JCIDS分析方法 5. JCIDS优势 6. JCIDS与采办系统的关系 7. JCIDS知识图谱结构 8. 对我的启示 9.JCIDS(联合能力集成与开发系统)相关术语列表 10. 参考文献 1. JCIDS概述 定义:…

每天学一个Linux命令(38):vi/vim

每天学一个 Linux 命令(38):vi/vim vi 和 vim(Vi IMproved)是 Linux 和 Unix 系统中功能强大的文本编辑器。vim 是 vi 的增强版,提供语法高亮、多级撤销、插件支持等更多功能。掌握 vi/vim 是 Linux 系统管理员的必备技能之一。 1. 命令简介 vi:经典的文本编辑器,几乎…

【PZ-ZU49DR-KFB】:璞致电子 UltraScale+ RFSoC 架构下的软件无线电旗舰开发平台

璞致电子 PZ-ZU49DR-KFB 开发板基于 Xilinx ZYNQ UltraScale RFSoC XCZU49DR 主控制器&#xff0c;以 "ARMFPGA 异构架构" 为核心&#xff0c;融合高带宽信号采集、高速数据处理与灵活扩展能力&#xff0c;专为专业工程师打造的软件无线电&#xff08;SDR&#xff09…

力扣106:从中序与后序遍历序列构造二叉树

力扣106:从中序与后序遍历序列构造二叉树题目思路代码题目 给定两个整数数组 inorder 和 postorder &#xff0c;其中 inorder 是二叉树的中序遍历&#xff0c; postorder 是同一棵树的后序遍历&#xff0c;请你构造并返回这颗 二叉树 。 思路 我们首先要知道中序遍历和后序…

IDEA JAVA工程入门

Maven配置&#xff1a; IDEA -> settings -> Build, Execution, Deployment -> Build Tools -> MavenMaven home pathUser setting file : 特定仓库下载依赖包&#xff0c;自动下载(界面右边M图标点开&#xff0c;)local repository &#xff08;本地仓库&#xff…

Spring依赖注入:从原理到实践的自学指南

Spring依赖注入&#xff1a;从原理到实践的自学指南 一、什么是依赖注入&#xff1f; 依赖注入&#xff08;Dependency Injection, DI&#xff09;是Spring框架实现控制反转&#xff08;IoC&#xff09;的核心手段。其核心思想是&#xff1a;对象不再自己创建依赖项&#xff…

3_软件重构_组件化开发实例方法论

1、上期回顾上次内容核心的地方有两个&#xff0c;①是C多态基类的指针指向派生类&#xff0c;用于初始化各个插件。②是使用C语言的dlopen函数“动态加载”各个插件&#xff0c;实现用户根据契约接口自定义开发插件&#xff0c;极大程度地实现了软件上的解耦。③再进一步&…

C#接口的定义与使用

第1章 接口&#xff08;interface&#xff09;是什么1.1 定义• 接口是一组“能力”或“契约”的抽象描述&#xff0c;只规定“能做什么”&#xff0c;不规定“怎么做”。• 在 C# 中&#xff0c;接口是一种完全抽象的类型&#xff08;fully abstract type&#xff09;。 • 关…

【STM32】HAL库中的实现(三):PWM(脉冲宽度调制)

&#x1f527; HAL库中的实现&#xff1a;PWM&#xff08;脉冲宽度调制&#xff09; PWM&#xff08;Pulse Width Modulation&#xff09;是基于定时器&#xff08;TIM&#xff09;产生的周期性脉冲信号&#xff0c;广泛应用于&#xff1a;① 电机调速&#xff1b;② LED 亮度控…

GitHub 趋势日报 (2025年08月03日)

&#x1f680; GitHub 趋势日报 (2025年08月03日) &#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图751dyad362LLMs-from-scratch291…

Java后端高频面试题

Java后端高频面试题 目录 Java集合框架Java并发编程JVM相关MySQL数据库Redis缓存Spring框架 Java集合框架 HashMap的数据结构是什么&#xff0c;为什么在JDK8要引入红黑树&#xff1f; HashMap数据结构&#xff1a; JDK7&#xff1a;数组 链表JDK8&#xff1a;数组 链表…

37. line-height: 1.2 与 line-height: 120% 的区别

概述 line-height 是 CSS 中用于控制文本行间距的重要属性。虽然 line-height: 1.2 和 line-height: 120% 看似相同&#xff0c;但它们在计算方式上存在关键区别&#xff0c;尤其是在继承和计算值方面。1. 计算方式不同写法类型计算方式说明line-height: 1.2无单位&#xff08;…

蓝桥杯----DS1302实时时钟

&#xff08;六&#xff09;、DS1302实时时钟1、原理&#xff08;图 二十六&#xff09;DS1302通过三线串行接口与单片机进行通信。微控制器可以通过设置RST引脚为高电平来使能DS1302&#xff0c;并通过SCK引脚提供串行时钟信号&#xff0c;然后通过I/O引脚进行数据的读写操作。…

C++对象访问有访问权限是不是在ide里有效

在C中&#xff0c;对象的访问权限&#xff08;即公有&#xff08;public&#xff09;、保护&#xff08;protected&#xff09;和私有&#xff08;private&#xff09;成员的访问&#xff09;是编译时的一部分&#xff0c;而不是运行时。这意味着&#xff0c;无论是在IDE&#…

CubeMX安装芯片包

1.点击HELP2.选择公理嵌入式软件包3.选择并下载芯片包

【面向对象】面向对象七大原则

设计模式 设计模式是什么&#xff1f; 设计模式是一种针对于反复提出问题的解决方案&#xff0c;是经过长时间经验和试错而总结出来的一套业务流程&#xff1b; 其目的是为了提高代码的可重用性和可维护性&#xff0c;让代码更容易让人理解&#xff0c;保证代码可靠性&#…

《计算机“十万个为什么”》之 面向对象 vs 面向过程:编程世界的积木与流水线

《计算机“十万个为什么”》之 面向对象 vs 面向过程&#xff1a;编程世界的积木与流水线 &#x1f916; 想象你要造一辆汽车&#x1f527;&#xff1a; 面向过程 按手册一步步拧螺丝&#xff1a;拧紧螺栓A → 安装轮胎B → 焊接车架C 面向对象 召唤汽车人战队&#xff1a;引…