一、获取文件或目录

1. 获取某个目录下的文件


// 必须的依赖
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}// 获取某个目录下的文件路径
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {// 获取文件系统val fs = FileSystem.get(new java.net.URI(dir_path), conf) // 注意这里用 URI 让 Hadoop 根据 scheme 找对应 FS// 递归获取该目录下所有文件val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)// 获取文件路径val buffer = scala.collection.mutable.ArrayBuffer[String]()while (it.hasNext) {val fileStatus = it.next()buffer += fileStatus.getPath.toString}// 关闭文件系统fs.close()// 返回结果buffer.toArray
}// 设定配置文件
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem") // 读取oss的路径// 需要指定的路径
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)

2.  获取某个目录下的子目录

import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}/**
* 获取某个目录下所有子目录的路径, 以字符串数组的形式返回
*/
def getOnlineFirstDir: Array[String] = {// 获取路径val path = s"s3://aa/bb/"val filePath = new org.apache.hadoop.fs.Path( path )// 获取文件系统val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 获取所有子目录的路径val allFiles = FileUtil.stat2Paths( fileSystem.listStatus( filePath ) )val res = allFiles.filter( fileSystem.getFileStatus( _ ).isDirectory() ).map( _.toString)// 返回结果res
}

二、删除文件或目录

/*** 删除目录*/
def deletePath(spark: SparkSession, path: String): Unit = {// 1 获取文件系统val file_path = new org.apache.hadoop.fs.Path( path )val file_system = file_path.getFileSystem( spark.sparkContext.hadoopConfiguration )// 2 判断路径存在时, 则删除if (file_system.exists( file_path )) {file_system.delete( file_path, true )}
}

三、获取文件或目录大小

/*** 获取某个目录的大小(单位b字节),注意:只能在driver端使用,可以多线程来提速。*/
def get_path_size(spark: SparkSession, path: String): Long = {//取文件系统val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 获取该目录的大小,单位是字节if (fileSystem.exists( filePath )) {fileSystem.getContentSummary( filePath ).getLength} else {0}
}

四、判读文件或目录是否存在

方式一
/*** 判断目录是否存在,注意:只能在driver端使用,可以多线程来提速。问题: 对删除过的目录可能会误判*/
def pathIsExist(spark: SparkSession, path: String): Boolean = {//取文件系统val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 判断路径是否存在fileSystem.exists( filePath )
}方式二
/*** 通过目录是否大于0来判断目录是否存在(消除对删除过的目录的误判),注意:只能在driver端使用,可以多线程来提速。*/
def def pathIsExist(spark: SparkSession, path: String): Boolean = //取文件系统val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 获取该目录的大小,单位是字节val size = if (fileSystem.exists( filePath )) {fileSystem.getContentSummary( filePath ).getLength} else {0}// 返回结果size > 0}

五、parquet文的行组信息


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.parquet.column.statistics.Statistics
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.api.Binaryimport java.{lang, util}// 获取某个目录下的文件路径
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {// 获取文件系统val fs = FileSystem.get(new java.net.URI(dir_path), conf) // 注意这里用 URI 让 Hadoop 根据 scheme 找对应 FS// 递归获取该目录下所有文件val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)// 获取文件路径val buffer = scala.collection.mutable.ArrayBuffer[String]()while (it.hasNext) {val fileStatus = it.next()buffer += fileStatus.getPath.toString}// 关闭文件系统fs.close()// 返回结果buffer.toArray
}// 某个文件某列的行组信息
def print_row_groupp(conf: Configuration, file_name: String, col_name: String): Unit = {// 读取元数据val parquetFilePath = new Path(file_name)val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)val footer: ParquetMetadata = ParquetFileReader.open(inputFile).getFooter// 遍历每个行组,并手动添加索引val blocks: util.List[BlockMetaData] = footer.getBlocksfor (i <- 0 until blocks.size()) {val block = blocks.get(i)println(s"Row Group #${i}:")println(s"  - Total Rows: ${block.getRowCount}")println(s"  - Total Size: ${block.getTotalByteSize} bytes")// 遍历每个列块block.getColumns.forEach { columnChunkMetaData =>val columnPath = columnChunkMetaData.getPath.toDotString// 过滤目标列if (columnPath == col_name) {val statistics: Statistics[_] = columnChunkMetaData.getStatisticsprintln(s"  Column: $columnPath")if (statistics != null) {// 获取最小值和最大值并解码val minValue = statistics.genericGetMin match {case b: Binary => b.toStringUsingUTF8case l: lang.Long => l.toStringcase i: Integer => i.toStringcase other => other.toString}val maxValue = statistics.genericGetMax match {case b: Binary => b.toStringUsingUTF8case l: lang.Long => l.toStringcase i: Integer => i.toStringcase other => other.toString}println(s"    - Min Value: $minValue")println(s"    - Max Value: $maxValue")println(s"    - Null Count: ${statistics.getNumNulls}")} else {println("    - No statistics available for this column.")}println("    ------")}}println("======================")}}// 某个文件的行组数
def get_row_group_size(conf: Configuration, file_name: String): Int = {// 读取元数据val parquetFilePath = new Path(file_name)val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)val footer: ParquetMetadata = ParquetFileReader.open(inputFile).getFooter// 行组数footer.getBlocks.size()
}// 设定配置文件
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")// 需要指定的路径
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)// 获取第一个文件的行组信息
val first_file = file_paths(0)
print_row_groupp(conf, first_file, "odid")// 统计行组数
for (file_path <- file_paths) {val file_index = file_path.split("part-")(1).split("-")(0)println(file_index + " = " + get_row_group_size(conf, file_path))
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/96398.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/96398.shtml
英文地址,请注明出处:http://en.pswp.cn/web/96398.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《UE5_C++多人TPS完整教程》学习笔记52 ——《P53 FABRIK 算法(FABRIK IK)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P53 FABRIK 算法&#xff08;FABRIK IK&#xff09; 的学习笔记&#xff0c;该系列教学视频为计算机工程师、程序员、游戏开发者、作家&#xff08;Engineer, Programmer, Game Developer, Author&#xff09; Stephen …

HttpServletRequest vs ServletContext 全面解析

HttpServletRequest vs ServletContext 全面解析 一、 核心区别概览特性HttpServletRequest (请求对象)ServletContext (Servlet上下文/应用对象)作用域请求范围应用范围生命周期从客户端发出请求开始&#xff0c;到服务器返回响应结束。从Web应用启动&#xff08;部署&#xf…

Java后端工程师如何学AI

Java后端工程师如何学AI 目录 前言为什么Java后端工程师要学习AIAI学习路径规划基础知识体系实践项目建议学习资源推荐学习时间规划常见问题与解决方案职业发展建议总结 前言 随着人工智能技术的快速发展&#xff0c;AI已经不再是计算机科学专业的专属领域。作为Java后端工…

Django REST Framework 中 @action 装饰器详解

概述 action 装饰器是 Django REST Framework (DRF) 中 ViewSet 的一个核心功能&#xff0c;用于定义自定义路由方法。它允许开发者在标准的 CRUD 操作&#xff08;list、create、retrieve、update、destroy&#xff09;之外&#xff0c;创建符合特定业务需求的接口&#xff0c…

【重磅更新】RetroBoard 全面升级,让敏捷回顾更高效、更安全、更贴心!

​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​…

中州养老:华为云设备管理接口开发全流程

需求分析点击同步数据时,要把华为云的数据拉取到我们的系统中对于新增设备操作,实际上这些参数与华为云产品我们添加设备时的参数是一样的表结构设计E-R图数据库字段接口分析对于设备中的数据,我们既要再IOT平台存储,又要在数据库中存储.之所以保存两份数据的原因:IOT平台中只是…

Llama-Factory微调Qwen2.5-VL从数据集制作到部署记录

Llama-Factory微调Qwen2.5-VL从数据集制作到部署记录 电脑环境配置&#xff1a; 1.ubuntu24 2.3090(24G) 3.Cuda12.9 一、数据集制作 我的数据集主要是对图像内容进行描述 1.Label-studio制作数据集 这是最原始的从零开始制作数据集的方法&#xff0c;不建议这样做&#xff01;…

【蓝桥杯真题67】C++数位和为偶数的数 第十五届蓝桥杯青少年创意编程大赛 算法思维 C++编程选拔赛真题解

C++数位和为偶数的数 第十五届蓝桥杯青少年创意编程大赛C++选拔赛真题 博主推荐 所有考级比赛学习相关资料合集【推荐收藏】 1、C++专栏 电子学会C++一级历年真题解析 电子学会C++二级历年真题解析

【计算机网络 | 第11篇】宽带接入技术及其发展历程

文章目录宽带接入技术详解数字传输系统技术演进早期电话网的传输技术演变数字传输系统技术演进&#xff1a;从碎片到统一宽带接入技术 ADSLADSL的基本原理与非对称特性DMT调制技术&#xff1a;多子信道并行传输ADSL接入网组成电话分离器的设计原理与优势ADSL的升级&#xff1a;…

(论文速读)SCSegamba:用于结构裂纹分割的轻量级结构感知视觉曼巴

论文题目&#xff1a;SCSegamba: Lightweight Structure-Aware Vision Mamba for Crack Segmentation in Structures&#xff08;用于结构裂纹分割的轻量级结构感知视觉曼巴&#xff09;会议&#xff1a;CVPR2025摘要&#xff1a;不同场景下的结构裂缝像素级分割仍然是一个相当…

《苏超风云》亮相时尚大赏,成短剧行业发展新风向

当男频短剧凭借《一品布衣》五天横扫10亿播放的数据宣告逆袭&#xff0c;短剧市场格局正经历深刻洗牌。风口之下&#xff0c;头条视听、中皋文旅、国内时尚视觉与短视频创作领域的头部厂牌“大湾视频”携手下场&#xff0c;打造精品男频短剧《苏超风云》&#xff0c;剑指2025年…

HTML5新年元旦网站源码

新年主题网站开发概述 本项目基于HTML5、CSS3与JavaScript技术栈&#xff0c;打造了一个功能丰富、交互体验流畅的新年主题网站&#xff0c;涵盖文化展示、互动娱乐与社交分享三大核心模块&#xff0c;通过现代化前端技术实现沉浸式节日氛围营造。 1.1、核心功能架构 网站采…

CentOS 7 下iscsi存储服务配置验证

一、环境说明 centos7服务器*2服务器ip&#xff1a;服务端10.10.10.186 客户端10.10.10.184服务端存储卷sda1提前关闭防火墙&#xff0c;或开放默认 iSCSI 使用 3260 端口 二、服务端&#xff08;Target&#xff09;配置 安装 iSCSI target 服务 yum install -y targetcli syst…

立即数、栈、汇编与C函数的调用

一、立即数在 ARM 架构中&#xff0c;立即数是指在指令中直接编码的常量值&#xff0c;而不是通过寄存器或内存引用的值立即数的特点编码限制&#xff1a;ARM指令是固定长度的&#xff08;32位&#xff09;&#xff0c;因此立即数不能占用太多位数。典型的算术和逻辑指令通常只…

贪心算法与动态规划:数学原理、实现与优化

贪心算法与动态规划&#xff1a;数学原理、实现与优化 引言&#xff1a;算法选择的本质 在计算机科学领域&#xff0c;算法选择的本质是对问题特征的数学建模与求解策略的匹配。贪心算法与动态规划作为两种经典的优化算法&#xff0c;分别在不同问题域展现出独特优势。本文将从…

Leetcode 刷题记录 21 —— 技巧

Leetcode 刷题记录 21 —— 技巧 本系列为笔者的 Leetcode 刷题记录&#xff0c;顺序为 Hot 100 题官方顺序&#xff0c;根据标签命名&#xff0c;记录笔者总结的做题思路&#xff0c;附部分代码解释和疑问解答&#xff0c;01~07为C语言&#xff0c;08及以后为Java语言&#xf…

Android Studio Meerkat | 2024.3.1 Gradle Tasks不展示

把这两个开关打开&#xff0c;然后刷新gradle文件

Java中方法重写与重载的区别

目录 1. 方法重载 (Overload) 什么是方法重载&#xff1f; 重载的特点&#xff1a; 重载的示例&#xff1a; 重载的调用&#xff1a; 2. 方法重写 (Override) 什么是方法重写&#xff1f; 重写的特点&#xff1a; 重写的示例&#xff1a; 重写的调用&#xff1a; 3.…

微信小程序发送订阅消息-一次订阅,一直发送消息。

实现思路长期订阅要求太高&#xff0c;需要政府、公共交通等单位才有资格&#xff0c;所以只能使用一次性订阅。 就像是买奶茶&#xff0c;下单以后&#xff0c;会弹出让用户订阅消息那种。以买奶茶为例:用户第一次下单成功&#xff0c;点击了订阅消息。&#xff08;一般都有三…

408 Request Timeout:请求超时,服务器等待客户端发送请求的时间过长。

408 Request Timeout 是 HTTP 状态码之一&#xff0c;表示客户端在发送请求时&#xff0c;服务器等待的时间过长&#xff0c;最终放弃了处理该请求。此问题通常与网络延迟、客户端配置、服务器设置或者应用程序的性能有关。1. 常见原因1.1 客户端问题网络连接延迟或不稳定&…