使用Spark Shell探索RDD

启动并使用Scala Spark Shell

在终端窗口,启动Scala Spark shell:

spark-shell --master local

查看对象:

scala> sc
scala> spark

输入spark.[TAB]然后可以看到所有可用的方法。

读并显示文本文件

查看文本$DATA_EXERCISE/frostroad.txt

读取本地文件来创建RDD。Spark并没有读文件,直到你执行了action操作,比如统计数据集行数。

尝试执行collect操作来显示RDD的所有数据。

输入mydata.[TAB]可以看到所有可用的转换操作。

输入exit退出。

操作代码

[root@master ~]# spark-shell --master localscala> val fr_rdd = sc.textFile("file:/root/dataExercise/frostroad.txt")
fr_rdd: org.apache.spark.rdd.RDD[String] = file:/root/dataExercise/frostroad.txt MapPartitionsRDD[1] at textFile at <console>:24scala> fr_rdd.take(2)
res0: Array[String] = Array(Two roads diverged in a yellow wood,, And sorry I could not travel both)scala> fr_rdd.collect()scala> fr_rdd.collect().foreach(println)scala> fr_rdd.
++                    first                    max                  take              
aggregate             flatMap                  min                  takeAsync         
barrier               fold                     name                 takeOrdered       
cache                 foreach            

使用RDD来转换数据集

探索Web日志文件

日志文件/dw/weblogs示例:

22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /KBDOC-00150.html HTTP/1.0" 200 19203 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F11L"
22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /theme.css HTTP/1.0" 200 10684 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F11L"

从数据文件创建RDD。

创建只包含请求图片JPG文件的RDD。

使用take查看前10行数据。

在日志中返回每行的长度。

把每一行映射成一个数组,查看前5条。

定义新的RDD,日志文件的每一行只包含IP地址。

最后,保存IP地址列表到/dw/iplist

在终端窗口或Hue文件浏览器,列出/dw/iplist目录内容。你可以看到多个part-xxxxx文件。查看文件内容确认结果是正确的。

如果需要节约内存,可以停止cloudera服务

service cloudera-scm-server stop

service cloudera-scm-agent stop

scala> val logs = sc.textFile("/dw/weblogs")
logs: org.apache.spark.rdd.RDD[String] = /dw/weblogs MapPartitionsRDD[1] at textFile at <console>:24scala> logs.take(2)
res0: Array[String] = Array(34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /ifruit_3a_sales.html HTTP/1.0" 200 11416 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2300", 34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /theme.css HTTP/1.0" 200 14933 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2300")scala> val jpglogs = logs.filter(line => line.contains(".jpg"))scala> jpglogs.take(2)
res1: Array[String] = Array(34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /ifruit_3a.jpg HTTP/1.0" 200 12554 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2300", 242.13.139.123 - 66694 [01/Mar/2014:23:54:48 +0100] "GET /sorrento_f10l.jpg HTTP/1.0" 200 649 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F10L")scala> val logsLen = logs.map(line => line.length)
logsLen: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25scala> logsLen.take(5)
res2: Array[Int] = Array(161, 150, 148, 154, 160)scala> val logs_split = logs.map(line => line.split(" "))
logs_split: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:25scala> logs_split.take(2)scala> val ip_list = logs_split.map(ar => ar(0))
ip_list: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:25scala> ip_list.take(2)
res5: Array[String] = Array(34.28.1.122, 34.28.1.122)scala> ip_list.saveAsTextFile("/dw/iplist")

使用Spark处理数据文件

检查数据

检查$DATA_EXERCISE/activations里的数据,每个XML文件包含了客户在指定月份活跃的设备数据。

拷贝数据到HDFS的/dw目录

样本数据示例:

<activations><activation timestamp="1225499258" type="phone"><account-number>316</account-number><device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id><phone-number>5108307062</phone-number><model>iFruit 1</model></activation>…
</activations>

处理文件

读取XML文件并抽取账户号和设备型号,把结果保存到/dw/account-models,格式为account_number:model

输出示例:

1234:iFruit 1
987:Sorrento F00L
4566:iFruit 1
…

提供了解析XML的函数如下:

// Stub code to copy into Spark Shellimport scala.xml._// Given a string containing XML, parse the string, and 
// return an iterator of activation XML records (Nodes) contained in the stringdef getActivations(xmlstring: String): Iterator[Node] = {val nodes = XML.loadString(xmlstring) \\ "activation"nodes.toIterator
}// Given an activation record (XML Node), return the model name
def getModel(activation: Node): String = {(activation \ "model").text
}// Given an activation record (XML Node), return the account number
def getAccount(activation: Node): String = {(activation \ "account-number").text
}

操作命令

上传文件 hdfs dfs -put $DATA_EXERCISE/activations /dw

scala> val xmls = sc.wholeTextFiles("/dw/activations")
xmls: org.apache.spark.rdd.RDD[(String, String)] = /dw/activations MapPartitionsRDD[12] at wholeTextFiles at <console>:27scala> xmls.take(1)
res9: Array[(String, String)] =
Array((hdfs://master:8020/dw/activations/2013-01.xml,<activations><activation timestamp="1359697709" type="phone"><account-number>97349</account-number><device-id>e17ff6a8-0899-4a87-972b-30230ebfa6b9</device-id><phone-number>4247767545</phone-number><model>iFruit 4</model></activation><activation timestamp="1359697637" type="phone"><account-number>97068</account-number><device-id>49beb012-d410-40c8-84b6-a5753b68c607</device-id><phone-number>5592763034</phone-number><model>Ronin S1</model></activation><activation timestamp="1359696681" type="phone"><account-number>82601</account-number><device-id>ed58b95d-a7f3-4333-be06-d53890ef1a08</device-id><phone-number>503470...
scala> scala> val xmls_flat = xmls.flatMap(pair => getActivations(pair._2))
xmls_flat: org.apache.spark.rdd.RDD[scala.xml.Node] = MapPartitionsRDD[13] at flatMap at <console>:30scala> xmls_flat.take(1)
res10: Array[scala.xml.Node] =                                                  
Array(<activation type="phone" timestamp="1359697709"><account-number>97349</account-number><device-id>e17ff6a8-0899-4a87-972b-30230ebfa6b9</device-id><phone-number>4247767545</phone-number><model>iFruit 4</model></activation>)scala> val acc_model = xmls_flat.map(act => getAccount(act)+":"+getModel(act))
acc_model: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:32scala> acc_model.take(1)
res11: Array[String] = Array(97349:iFruit 4)scala> acc_model.saveAsTextFile("/dw/account-models")

使用Pair RDD来连接2个数据集

探索Web日志文件

使用map-reduce,统计每个用户的请求。

  • 使用map创建Pair RDD,User ID作为key,整数1作为value(user ID是每行的第三个字段)

  • 汇总每个用户的value

使用countByKey来确定对不同的频率有多少用户访问了网站。即有多少用户访问了1次、两次或者三次等等

  • 使用map来倒转key和value,类似于:
(userID, count) => (count, userID)
  • 使用countByKey来返回(频率:用户数)键值对的Map

创建一个RDD,用户id为key,用户访问的ip地址作为value。 - 提示:Map为(userid,ipaddress)并使用groupByKey。

scala> val logs = sc.textFile("/dw/weblogs")
logs: org.apache.spark.rdd.RDD[String] = /dw/weblogs MapPartitionsRDD[17] at textFile at <console>:27scala> val user_reqs = logs.map(line => line.split(" ")).map(ar => (ar(2),1)).reduceByKey((v1,v2) => v1+v2)
user_reqs: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:28scala> user_reqs.take(2)
res15: Array[(String, Int)] = Array((92694,4), (49368,4))                       scala> user_reqs.map(pair => (pair._2, pair._1)).countByKey()
res18: scala.collection.Map[Int,Long] = Map(138 -> 6, 170 -> 1, 5 -> 19, 120 -> 6, 10 -> 228, 142 -> 9, 14 -> 20, 110 -> 2, 152 -> 6, 164 -> 1, 106 -> 3, 132 -> 10, 116 -> 10, 
…………)scala> val user_ips = logs.map(line => line.split(" ")).map(ar => (ar(2),ar(0))).groupByKey()
user_ips: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[32] at groupByKey at <console>:28scala> user_ips.take(2)
res20: Array[(String, Iterable[String])] = Array((92694,CompactBuffer(84.206.178.154, 84.206.178.154, 84.206.178.154, 84.206.178.154)), (49368,CompactBuffer(34.179.95.142, 34.179.95.142, 34.179.95.142, 34.179.95.142)))   scala> for (pair <- user_ips.take(2)){| println("userid:"+pair._1)| for (ip <- pair._2) println("-- ip:"+ip)| }
userid:92694
-- ip:84.206.178.154
-- ip:84.206.178.154
-- ip:84.206.178.154
-- ip:84.206.178.154
userid:49368
-- ip:34.179.95.142
-- ip:34.179.95.142
-- ip:34.179.95.142
-- ip:34.179.95.142
解析:
map(line => line.split(" ")) - 将每行日志按空格分割成数组
map(ar => (ar(2),1)) - 提取第三个字段(索引2)作为用户ID,创建键值对(userID, 1)格式假设为:[IP地址] [其他字段] [用户ID] [其他字段]...
reduceByKey((v1,v2) => v1+v2) - 对相同用户ID的值进行累加
结果得到(userID, 总请求次数)的Pair RDD示例输出:
Array((92694,4), (49368,4))3. 统计访问频率分布
scala
user_reqs.map(pair => (pair._2, pair._1)).countByKey()
详细步骤:
map(pair => (pair._2, pair._1)) - 反转键值对,从(userID, count)变为(count, userID)
countByKey() - 统计每个访问次数(count)有多少不同的用户
返回一个Map,键是访问次数,值是对应的用户数量示例输出:
Map(138 -> 6, 170 -> 1, 5 -> 19, 120 -> 6, ...)表示:
有6个用户访问了138次
有1个用户访问了170次
有19个用户访问了5次
有6个用户访问了120次5. 打印用户IP信息
scala
for (pair <- user_ips.take(2)){println("userid:"+pair._1)for (ip <- pair._2) println("-- ip:"+ip)
}
详细步骤:
user_ips.take(2) - 获取前两个用户的记录
对于每个用户:
打印用户ID
遍历该用户的所有IP地址并打印

连接账户数据和Web日志文件

上传账户数据$DATA_EXERCISE/accounts到HDFS的/dw/accounts目录。 第一个字段是用户ID。其他字段包含了账户明细,比如创建日期,姓名等。

// 加载账户数据
val accounts = sc.textFile("/dw/accounts/accounts")// 创建Pair RDD (userid -> 账户信息数组)
val accountsRDD = accounts.map(line => {val fields = line.split(",")(fields(0), fields) // (userid, Array[账户信息])
})// 查看前2条账户数据
accountsRDD.take(2).foreach { case (userid, fields) =>println(s"UserID: $userid, Name: ${fields(3)} ${fields(4)}")
}

用map,将accounts里的line作为input,经过计算得到(fields(0), fields)的输出,其中fields是将每行的line用逗号分割成数组

输入:
accounts 是一个 RDD[String],每行代表一个账户记录
示例输入行:"32438,2012-08-22 20:40:31.0,\N,Violet,Searcy,2601 Twin Oaks Drive..."处理逻辑:
line.split(","):将每行字符串按逗号,分割成一个字符串数组fields
示例结果:Array("32438", "2012-08-22 20:40:31.0", "\N", "Violet", "Searcy", ...)
fields(0):取数组的第一个元素(用户ID)
(fields(0), fields):创建一个键值对(Tuple2)输出:
返回一个新的 Pair RDD,类型为 RDD[(String, Array[String])]
示例输出元素:("32438", Array("32438", "2012-08-22...))
数据转换示例:
输入行:
text
"32438,2012-08-22 20:40:31.0,\N,Violet,Searcy,2601 Twin Oaks Drive..."
经过转换后变成:
scala
("32438",  // 用户ID作为keyArray(    // 所有字段作为value"32438","2012-08-22 20:40:31.0","\\N","Violet","Searcy","2601 Twin Oaks Drive",...)
)

连接weblog数据和账户数据,得到以user ID为key的数据集。包含了用户账户信息和网页数量。

基于账户数据创建RDD,由key/value-array对(userid, [values…])组成。

// 这是第一步已经创建的RDD
val user_reqs = logs.map(line => line.split(" ")).map(ar => (ar(2), 1)).reduceByKey(_ + _)
第一步:logs.map(line => line.split(" "))
作用:将原始日志的每一行按空格分割成数组输入:line(原始日志行,如 "34.28.1.122 - 65255 [01/Mar/2014:23:57:51 ...")处理:split(" ") 按空格分割输出:字符串数组 Array(34.28.1.122, -, 65255, [01/Mar/2014:23:57:51, ...)关键字段位置:
ar(0):IP地址(如 34.28.1.122)
ar(1):分隔符(-)
ar(2):用户ID(如 65255)← 这是我们需要的字段第二步:.map(ar => (ar(2), 1))
作用:生成 (用户ID, 1) 的键值对输入:上一步的数组 ar处理:取 ar(2)(用户ID)作为 key,固定值 1 作为 value输出:Pair RDD 格式 (String, Int),如 ("65255", 1)为什么是1:每个日志行代表一次访问,用 1 表示一次计数,方便后续聚合。第三步:.reduceByKey(_ + _)
作用:对相同用户ID的计数求和输入:上一步的 (用户ID, 1) 对处理:
reduceByKey 将相同 key(用户ID)的 value 合并
_ + _ 是简写,等价于 (v1, v2) => v1 + v2输出:最终统计结果 (用户ID, 总访问次数),如 ("65255", 42)数据流示例
假设原始日志有3行:text
"1.1.1.1 - 101 ..."  // 用户101访问1次
"2.2.2.2 - 102 ..."  // 用户102访问1次 
"3.3.3.3 - 101 ..."  // 用户101再访问1次
分割后:scala
Array("1.1.1.1", "-", "101", ...)  
Array("2.2.2.2", "-", "102", ...)
Array("3.3.3.3", "-", "101", ...)
生成计数对:scala
("101", 1)
("102", 1)
("101", 1)
聚合结果:scala
("101", 2)  // 用户101总计2次
("102", 1)  // 用户102总计1次

连接PairRDD和上一步计算的userid/hitcount键值对数据集

// 连接账户数据和点击量数据
val joinedData = accountsRDD.join(user_reqs)// 查看连接后的数据结构
joinedData.take(2).foreach { case (userid, (accountFields, count)) =>println(s"UserID: $userid, Count: $count, Name: ${accountFields(3)} ${accountFields(4)}")
}
第一步:accountsRDD.join(user_reqs)
作用:基于用户ID关联账户信息和访问次数
输入:
accountsRDD: (String, Array[String]) ← (用户ID, 账户字段数组)
user_reqs: (String, Int) ← (用户ID, 访问次数)处理:
通过 用户ID(key) 内连接(inner join)两个RDD
自动匹配两个RDD中相同的用户ID输出:
新RDD格式:(String, (Array[String], Int))
结构示例:("32438", (Array("32438", "2012-08-22..."), 4))
第二步:joinedData.take(2).foreach
作用:查看前2条连接结果并格式化输出数据解构:
scala
case (userid, (accountFields, count)) =>// userid: 用户ID(String)// accountFields: 账户字段数组(Array[String])// count: 访问次数(Int)
字段索引(根据账户数据格式):
accountFields(3): 用户的名(如 "Violet")
accountFields(4): 用户的姓(如 "Searcy")输出示例:
text
UserID: 32438, Count: 4, Name: Violet Searcy
UserID: 32439, Count: 25, Name: Eunice Myers数据流详解
假设有以下数据:
账户数据(accountsRDD):
scala
("32438", Array("32438", "2012-08-22", "\\N", "Violet", "Searcy", ...))
("32439", Array("32439", "2012-12-15", "...", "Eunice", "Myers", ...))访问次数(user_reqs):
scala
("32438", 4)
("32439", 25)连接后结果(joinedData):
scala
("32438", (Array("32438", "2012-08-22", ..., "Violet", "Searcy"), 4)
)
("32439",(Array("32439", "2012-12-15", ..., "Eunice", "Myers"), 25)
)
完整数据映射图
accountsRDD (用户ID -> 账户详情)       user_reqs (用户ID -> 访问次数)
+-----------------------------+       +---------------------+
| "32438" -> [...,Violet,...] |       | "32438" -> 4        |
| "32439" -> [...,Eunice,...] | JOIN  | "32439" -> 25       |
+-----------------------------+       +---------------------+|v
joinedData (用户ID -> (账户详情, 访问次数))
+--------------------------------------------------+
| "32438" -> ([...,Violet,...], 4) -> "Violet 4"   |
| "32439" -> ([...,Eunice,...], 25) -> "Eunice 25" |
+--------------------------------------------------+

显示用户ID,点击量和姓名,比如:

userid1 4 Jack Cheng
userid2 25 John Doe
// 格式化输出:userid, 点击量, 姓名
val formattedResults = joinedData.map { case (userid, (accountFields, count)) => s"$userid $count ${accountFields(3)} ${accountFields(4)}"
}// 显示前10条结果
formattedResults.take(10).foreach(println)
1. 输入数据结构
joinedData 的格式为:
RDD[(String, (Array[String], Int))]
即每个元素是:
(用户ID, (账户字段数组, 访问次数))示例数据:
scala
("32438", (Array("32438", "2012-08-22", "\\N", "Violet", "Searcy", ...), 4)
)
2. map 操作
作用:对 joinedData 的每个元素进行转换模式匹配:
case (userid, (accountFields, count)) 解构嵌套元组:
userid:用户ID(如 "32438")
accountFields:账户字段数组
count:访问次数(如 4)3. 字符串模板
s"$userid $count ${accountFields(3)} ${accountFields(4)}"字段索引:
accountFields(3):名字(如 "Violet")
accountFields(4):姓氏(如 "Searcy")输出格式:
用户ID 访问次数 名 姓
示例:"32438 4 Violet Searcy"
数据转换流程
原始数据 → 提取字段 → 格式化字符串("32438", (Array["32438",...,"Violet","Searcy",...], 4)) 
→ 提取 "32438"、4、"Violet"、"Searcy"  
→ 拼接成 "32438 4 Violet Searcy"

编写和运行Spark应用

编写一个简单的程序来统计web日志文件中JPG请求的数量。文件名将作为参数传递到程序中。

使用Python编写Spark应用

创建CountJPGs.py文件,实现统计JPG请求的功能。

import sys
from pyspark import SparkContextif __name__ == "__main__":if len(sys.argv) < 2:print >> sys.stderr, "Usage: CountJPGs.py <logfile>"exit(-1)# TODO: 从参数读取日志文件,实现统计JPG请求的功能
import sys
from pyspark import SparkContextif __name__ == "__main__":if len(sys.argv) < 2:print >> sys.stderr, "Usage: CountJPGs.py <logfile>"exit(-1)sc = SparkContext()logfile = sys.argv[1]sc.setLogLevel("WARN")count = sc.textFile(logfile).filter(lambda line: '.jpg' in line).count()print "Number of JPG requests: ", countsc.stop()

运行程序

在终端中运行Python程序:

# 使用spark-submit运行
spark-submit --master local[*] CountJPGs.py /dw/weblogs

运行并查看结果:

测试成功后,程序将输出统计的JPG请求数量。

提交Spark应用到集群

默认,spark-submit在本地运行应用。在这个部分,运行应用到YARN集群上。

重新运行程序,指定—master yarn参数

从运行日志中找到应用ID,并运行使用当前ID来查看结果

spark-submit --master yarn --deploy-mode cluster CountJPGs1.py /dw/weblogs

配置Spark应用

使用之前的实验中使用的CountJPGs.py程序

在命令行设置配置选项

重新运行CountJPGs.py程序,指定应用名'Count JPGs'

访问RM UI并注意命令行指定的应用名

在属性文件中设置配置选项

使用文本编辑器,创建$CODE_EXERCISE/myspark.conf文件,并添加如下配置:

spark.app.name "My Spark App1"
spark.master yarn
spark.executor.memory 600M

使用属性文件myspark.conf重新运行应用

当应用正在运行,查看YARN UI并确认Spark应用名正确的显示为"My Spark App1"

设置日志级别

修改/etc/spark/conf/log4j.properties

编辑log4j.properties。第一行替换为DEBUG:

log4j.rootCategory=DEBUG, console

重新运行Spark应用。

spark-submit --master local[*] CountJPGs.py /dw/weblogs

注意到输出包含INFO和DEBUG消息,比如

编辑log4j.properties文件,替换DEBUG为WARN并重新运行。注意只有WARN消息出来。

在Spark应用UI中查看Jobs和Stages

探索基于文件RDD的分区

启动Spark Shell,为了模拟实际中的多节点集群,使用2个线程运行在本地模式

使用Hue或命令行重新查看账户数据集(/dw/accounts/)。注意文件数量

在数据集中基于单个文件创建RDD,比如,/dw/accounts/part-m-00000。然后调用toDebugString。在结果RDD中有多少分区?

重复这个流程,但指定最小3个分区:sc.textFile(filename,3)。RDD是否有3个分区?

最后,基于账户数据集的所有文件创建RDD。比较一下文件数和RDD的分区数?

设置作业

创建accounts RDD,key是账户id,value是姓名

创建userreqs RDD,统计每个用户页面点击总数

通过user ID进行连接,并基于名字、姓和点击总量重构新的RDD

打印出accounthits.toDebugString的结果并查看输出,基于这个信息,看是否能确定:

  1. 作业中有多少stage?
  2. Stage之间的依赖关系?
  3. 每个stage包含多少tasks?

运行作业

通过浏览器查看Spark应用UI,http://master:4040

在Spark应用UI中检查Job

在Spark UI中,确保选择了Jobs标签。

重新运行shell并通过执行action(saveAsTextFile)来启动作业

重新加载Spark UI Jobs页面。

点击job description来查看stages。

点击stages来查看stage详情。

当作业完成后,返回Jobs标签查看执行的任务的最终统计和作业花费的时间。

持久化RDD

在上一个实验的基础上来完成后面的步骤

统计用户点击量大于5的账户数量

调用accounthits.persist()来缓存RDD

在浏览器中,查看Spark应用UI并选择Storage标签。现在你已经标记RDD被持久化,但是还没有执行action操作使得它持久化。

在Spark Shell,执行count操作

查看RDD的toDebugString。注意输出包含了选择的持久化级别。

重新加载Storage标签,注意持久化的RDD显示出来。点击RDD ID查看分区和持久化的明细。

点击executors标签并使用的内存量和可用的工作节点。

scala> val accounthits = joined.filter(pair => pair._2._1 > 5)scala> accounthits.persist()
res0: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> accounthits.count()
res1: Long = 3721scala> accounthits.toDebugString
res2: String =
(15) MapPartitionsRDD[15] at filter at <console>:25 [Memory Deserialized 1x Replicated]|        CachedPartitions: 15; MemorySize: 643.5 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B|   MapPartitionsRDD[13] at join at <console>:27 [Memory Deserialized 1x Replicated]|   MapPartitionsRDD[12] at join at <console>:27 [Memory Deserialized 1x Replicated]|   CoGroupedRDD[11] at join at <console>:27 [Memory Deserialized 1x Replicated]|   ShuffledRDD[4] at reduceByKey at <console>:25 [Memory Deserialized 1x Replicated]+-(15) MapPartitionsRDD[3] at map at <console>:25 [Memory Deserialized 1x Replicated]|   MapPartitionsRDD[2] at map at <console>:25 [Memory Deserialized 1x Replicated]|   /dw/weblogs MapPartitionsRDD[1] at textFile at <console>:24 [Memory De...
--清理缓存
scala> accounthits.unpersist()
res3: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevelscala> accounthits.persist(StorageLevel.DISK_ONLY)
res4: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> accounthits.count()

Spark SQL处理Hive表数据

数据探索任务

读取 Hive 表 db_userapp.user_base_info,并查看表结构和部分数据。

检查新的DataFrame的schema

创建新的DataFrame,选择 user_id 和 user_name 列

将DataFrame转换为Pair RDD,字段为 user_id 和 user_name 列

scala> val df_user = spark.read.table("db_userapp.user_base_info")
scala> df_user.printSchema
root|-- user_id: long (nullable = true)|-- user_name: string (nullable = true)|-- age: integer (nullable = true)|-- gender_id: integer (nullable = true)|-- edu_id: integer (nullable = true)|-- marital_id: integer (nullable = true)|-- property_cert_count: integer (nullable = true)|-- car_count: integer (nullable = true)scala> val df_user2 = df_user.select($"user_id",$"user_name")scala> df_user.select($"age"+10).show(4)
+----------+
|(age + 10)|
+----------+
|        95|
|        36|
|        28|
|        80|
+----------+
only showing top 4 rowsscala> df_user.sort($"age".desc).show(4)
+-------+---------+---+---------+------+----------+-------------------+---------+
|user_id|user_name|age|gender_id|edu_id|marital_id|property_cert_count|car_count|
+-------+---------+---+---------+------+----------+-------------------+---------+
|     27|   宋凌寒| 85|        1|     5|         4|                  0|        0|
|      1|   周雅芙| 85|        1|     4|         2|                  0|        0|
|     41|   黄冰萍| 85|        1|     6|         3|                  4|        0|
|     22|   韩映雪| 85|        2|     3|         3|                  0|        0|
+-------+---------+---+---------+------+----------+-------------------+---------+
only showing top 4 rowsscala> df_user.registerTempTable("tbl_user")
warning: there was one deprecation warning; re-run with -deprecation for detailsscala> spark.sql("select user_name,age from tbl_user limit 3").show()
+---------+---+
|user_name|age|
+---------+---+
|   周雅芙| 85|
|   王从梦| 26|
|   孙忆翠| 18|
+---------+---+scala> val rdd_user = df_user.rdd.map(row => (row(0).toString,row(1).toString))

关于代码 val rdd_user = df_user.rdd.map(row => (row(0).toString,row(1).toString)的解析
1. 原始RDD数据结构
scala
Array([1,周雅芙,85,1,4,2,0,0],  // 第一行[2,王从梦,26,1,3,1,0,0]   // 第二行
)
每行是一个org.apache.spark.sql.Row对象字段顺序对应DataFrame的Schema:
scala
root|-- user_id: long (index 0)|-- user_name: string (index 1)|-- age: integer (index 2)|-- gender_id: integer (index 3)|-- edu_id: integer (index 4)|-- marital_id: integer (index 5)|-- property_cert_count: integer (index 6)|-- car_count: integer (index 7)2. 转换操作详解
原始代码:
scala
val rdd_user = df_user.rdd.map(row => (row(0).toString, row(1).toString)
)
对第一行 [1,周雅芙,85,...] 的处理:
row(0) → 取第0个字段:1 (Long型)
.toString → 转为字符串:"1"
row(1) → 取第1个字段:"周雅芙" (已是String)
最终生成键值对:("1", "周雅芙")对第二行 [2,王从梦,26,...] 的处理:
同理生成:("2", "王从梦")3. 转换后的RDD内容
执行rdd_user.take(2)会得到:
Array(("1", "周雅芙"), ("2", "王从梦")
)

数据变换任务

新增年龄分组字段:根据age字段,将用户分为“未成年”(<18)、“青年”(18-29)、“中年”(30-49)、“老年”(≥50)四类,新增age_group字段。

拼接姓名与年龄:将user_nameage字段拼接为新字段name_age,格式如“张三-25”。

筛选有房且有车的用户:筛选property_cert_count>0且car_count>0的用户,输出其user_iduser_nameproperty_cert_countcar_count

val df_user3 = df_user.withColumn(
"age_group",
when(col("age") <18, "未成年")
.when(col("age") <30, "青年")
.when(col("age") <50, "中年")
.otherwise("老年")
)
年龄分组代码的解析:
2.1 withColumn 方法
功能:向DataFrame添加新列或替换现有列参数:
第一个参数:新列名(此处为"age_group")
第二个参数:列表达式(此处为when-otherwise条件链)2.2 when 条件表达式
工作方式:类似SQL的CASE WHEN语句
链式调用:多个when可以串联,最后以otherwise结束
执行顺序:从上到下依次判断,第一个满足的条件即返回对应值2.3 col("age") 列引用
指向DataFrame中的age列
所有比较操作都基于该列值3. 条件逻辑分解
条件判断	分组标签	对应年龄段
age < 18	未成年	小于18岁
age between 18 and 29	青年	18-29岁(含)
age between 30 and 49	中年	30-49岁(含)
以上都不满足(otherwise)	老年	50岁及以上4. 执行过程示例
以原始数据中的两行为例:
[1,周雅芙,85,...]  // age=85
[2,王从梦,26,...]  // age=26
第一行处理:
85 < 18? → 否
85 between 18-29? → 否
85 between 30-49? → 否
执行otherwise → "老年"第二行处理:
26 < 18? → 否
26 between 18-29? → 是 → "青年"

统计分析任务

统计不同学历(edu_id)用户的数量。

统计不同性别(gender_id)用户的平均年龄。

统计拥有房产证数量大于0的用户比例。

将处理结果保存为 Parquet 和 JSON 格式到 HDFS 指定目录。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/88036.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/88036.shtml
英文地址,请注明出处:http://en.pswp.cn/web/88036.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【R语言】Can‘t subset elements that don‘t exist.

Error in select(): ℹ In argument: all_of(label_col). Caused by error in all_of(): ! Cant subset elements that dont exist. ✖ Element Label doesnt exist. Run rlang::last_trace() to see where the error occurred.原文中文解释涉及关键词Error in select()报错发生…

Spring的依赖注入(xml)

引入 首先先明白&#xff0c;依赖注入描述的是在容器中建立bean与bean之间的依赖关系&#xff0c;本质就是将一个类中和别的类解耦的方式&#xff0c;就是把别的类&#xff0c;写在成员变量位置&#xff0c;再对外提供可以给成员变量赋值的方法&#xff0c;外界就直接调用来给…

docker运行的一些常用命令

docker images 显示可以加载的镜像docker ps 显示运行的docker容器 加-a显示所有的容器docker run --name 容器名字 -d 镜像名字docker start 容器名/ID 开启容器docker stop 容器名/ID 关闭容器docker exec -it dock…

Django跨域

步骤 1&#xff1a;安装 django-cors-headerspip install django-cors-headers步骤 2&#xff1a;修改 Django 配置 在 settings.py 中添加&#xff1a;INSTALLED_APPS [...,"corsheaders", # 新增 ]MIDDLEWARE [...,"corsheaders.middleware.CorsMiddleware…

20250706-10-Docker快速入门(下)-Harbor镜像仓库_笔记

一、Harbor镜像仓库搭建与使用1. Harbor概述&#xfeff;&#xfeff;定义: 由VMWare公司开源的容器镜像仓库系统技术基础: 在Docker Registry基础上进行企业级扩展核心特性:提供管理用户界面(GUI)基于角色的访问控制(RBAC)支持&#xfeff;AD/LDAP\mathrm{AD}/\mathrm{LDAP}AD…

JavaScript之数组方法详解

JavaScript之数组方法详解一、数组的创建与基础特性1.1 数组的创建方式1.2 数组的核心特性二、修改原数组的方法2.1 添加/删除元素2.1.1 push()&#xff1a;尾部添加元素2.1.2 pop()&#xff1a;尾部删除元素2.1.3 unshift()&#xff1a;头部添加元素2.1.4 shift()&#xff1a;…

品牌增长困局突围:大模型时代,AI 如何帮我的品牌少走弯路?

AI时代对企业战略的冲击与机遇 在当今瞬息万变的商业环境中&#xff0c;大模型的崛起正以前所未有的力量重塑着各行各业的竞争格局。传统的市场营销、品牌传播模式正在被颠覆&#xff0c;消费者获取信息、认知品牌的方式发生了根本性变化。如果说过去十年是“互联网”的时代&am…

从单体到微服务:Spring Cloud 开篇与微服务设计

一、单体架构的核心痛点与微服务化目标 1. 单体架构的致命缺陷问题表现后果可维护性差百万行代码耦合&#xff0c;修改一处需全量测试迭代周期长&#xff0c;创新停滞扩展性受限无法按模块独立扩缩容&#xff08;如订单模块需扩容时&#xff0c;用户模块被迫一起扩容&#xff0…

篇二 OSI七层模型,TCP/IP四层模型,路由器与交换机原理

一 前言 本章节主要介绍OSI七层模型&#xff0c;TCP/IP四层模型划分&#xff0c;以及日常使用的路由器&#xff0c;交换机的一些基础知识 二 OSI 七层 OSI&#xff08;Open Systems Interconnection Model&#xff09;即开放式系统互联模型&#xff0c;是国际标准化组织提出的&…

【JavaSE面试篇】Java集合部分高频八股汇总

目录 概念 1. 说说Java中的集合&#xff1f; 2. Java中的线程安全的集合有什么&#xff1f; 3. Collections和Collection的区别&#xff1f; 4. 集合遍历的方法有哪些&#xff1f; List 5. 讲一下java里面list的几种实现&#xff0c;几种实现有什么不同&#xff1f; 6.…

利用低空无人机影像进行树种实例分割

在本项先导研究中,我们开发了一个基于低空无人机影像的本地树种机器学习实例分割模型,用于生态调查。该实例分割包括单株树冠的描绘和树种的分类。我们利用无人机影像对20个树种及其对应的学名进行了训练,并收集了这些树种的学名用于机器学习。为了评估该机器学习模型的准确…

二、Flutter基础

目录1. 什么是Widget&#xff1f;Flutter中的Widget分为哪几类&#xff1f;2. StatelessWidget和StatefulWidget的区别3. StatefulWidget生命周期4. 什么是BuildContext&#xff1f;5. 如何优化Widget重建&#xff1f;6. Flutter布局机制7. Row/Column的主轴和交叉轴8. Expande…

设计模式笔记_创建型_建造者模式

1. 建造者模式介绍 建造者模式是一种创建型设计模式&#xff0c;旨在通过将复杂对象的构建过程与其表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。它通常用于构造步骤固定但具体实现可能变化的对象。 1.1 功能&#xff1a; 封装复杂对象的创建过程&#xff1a;适…

【ROS2 自动驾驶学习】03-ROS2常用命令

目录 1. ros2 pkg list 2. ros2 node list 3. ros2 node info 节点名称 4. ros2 topic list 5. ros2 topic info 话题名 6. ros2 topic type 话题名 7. ros2 topic find 消息类型 8. ros2 service list 9. ros2 service type 服务名称 10. ros2 service find 服…

MyBatis-Plus:提升数据库操作效率的利器

在Java开发中&#xff0c;MyBatis是一个非常流行的持久层框架&#xff0c;它简化了数据库操作&#xff0c;提供了灵活的SQL映射功能。然而&#xff0c;随着项目规模的扩大和业务复杂度的增加&#xff0c;开发者需要更高效、更便捷的方式来处理数据库操作。MyBatis-Plus应运而生…

App爬虫实战篇-以华为真机手机爬取集换社的app为例

前言 在开始学习这篇文章之前,建议你先按照之前2篇文章(App爬虫工具篇-Appium安装和App爬虫工具篇-appium配置),配置必要的环境,才可以继续完成本章节内容。 电脑连接手机 可以通过usb连接电脑。如果通过adb devices命令,发现没有连接上,就需要手动配置一些信息 华为…

Vue3组合式API应用:状态共享与逻辑复用最佳实践

Vue3组合式API应用&#xff1a;状态共享与逻辑复用最佳实践 在Vue3中&#xff0c;组合式API的引入为我们提供了一种全新的方式来编写Vue组件&#xff0c;并有效地解决了混入和繁琐逻辑复用的问题。本文将为您介绍如何在Vue3中使用组合式API来实现状态共享与逻辑复用的最佳实践&…

在linux 上使用tcpdump监听http 端口的报文并分析

这里写目录标题 1. 使用 tcpdump(原始报文捕获)观察:报文翻译与分析(按行解释)第一段:客户端请求报文HTTP 请求头JSON 请求体第二段:服务器响应报文HTTP 响应头响应体关键问题分析在 Linux 上监听 HTTP 端口的报文,有多种工具可以实现。以下是几种常用方法的详细说明:…

XSStrike 进行 XSS 漏洞测试

XSStrike 是一个功能强大的 XSS 漏洞测试工具&#xff0c;专为检测、验证和利用反射型、存储型、DOM型 XSS 漏洞而设计&#xff0c;适合配合手工测试&#xff0c;也可用于自动化发现。 &#x1f6e0;️ 1. 安装 XSStrike 确保系统中有 Python3 和 git&#xff1a; git clone ht…

any实现(基于LLVM中libcxx实现分析)

本文根据LLVM中libcxx的实现&#xff0c;分析了std::any和std::variant的具体实现。 1 简介 在 C17 标准中&#xff0c;std::any提供了一种类型安全的方式来存储任意类型的值。它使用类型擦除&#xff08;type erasure&#xff09;技术实现&#xff0c;使得一个对象可以包含任…