HDFS (Hadoop Distributed File System) 作为大数据生态的核心存储系统,提供了分布式、高容错、高吞吐量的数据存储能力。通过 Java API 操作 HDFS 是开发大数据应用的基础技能。本文将基于你的笔记,详细解析 HDFS Java API 的使用方法,并提供完整的代码示例和最佳实践。
一、前置准备与环境配置
1. 添加 Maven 依赖
在 pom.xml
中添加 Hadoop 客户端依赖:
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version>
</dependency>
推荐使用与生产环境一致的 Hadoop 版本(如 3.2.4)。
2. 初始化 FileSystem 对象
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.After;
import java.net.URI;public class HdfsTest {private FileSystem fileSystem;private Configuration configuration;public static final String HDFS_ENPOINT = "hdfs://ip:8020";public static final String basePath = "/test/java_api";@Beforepublic void preEnv() throws Exception {// 设置 Hadoop 用户和路径System.setProperty("HADOOP_USER_NAME", "hadoop");System.setProperty("HADOOP_HOME", "D:\\Program Files\\hadoop-3.0.0");System.setProperty("hadoop.home.dir", "D:\\document");// 配置 HDFS 连接configuration = new Configuration();fileSystem = FileSystem.get(new URI(HDFS_ENPOINT), configuration);}@Afterpublic void afterEnv() throws Exception {if (fileSystem != null) {fileSystem.close();}}
}
二、核心文件操作
1. 创建目录
@Test
public void mkdir() throws Exception {boolean result = fileSystem.mkdirs(new Path(basePath));System.out.println("创建目录: " + (result ? "成功" : "失败"));
}
2. 创建文件并写入内容
@Test
public void create() throws Exception {Path filePath = new Path(basePath + "/test.txt");try (FSDataOutputStream outputStream = fileSystem.create(filePath)) {outputStream.write("Java operation hdfs!".getBytes());System.out.println("文件创建成功: " + filePath);}
}
3. 查看文件内容
@Test
public void cat() throws Exception {Path filePath = new Path(basePath + "/test.txt");try (FSDataInputStream inputStream = fileSystem.open(filePath)) {IOUtils.copyBytes(inputStream, System.out, 1024, false);}
}
4. 文件重命名
@Test
public void rename() throws Exception {Path srcPath = new Path(basePath + "/test.txt");Path dstPath = new Path(basePath + "/renameTest.txt");boolean result = fileSystem.rename(srcPath, dstPath);System.out.println("重命名文件: " + (result ? "成功" : "失败"));
}
三、文件上传与下载
1. 本地上传至 HDFS
@Test
public void copyFromLocalFile() throws Exception {Path localPath = new Path("D:\\document\\slow_query.log");Path hdfsPath = new Path(basePath);fileSystem.copyFromLocalFile(localPath, hdfsPath);System.out.println("文件上传成功: " + localPath);
}
2. 带进度条的大文件上传
@Test
public void copyFromLocalFileWithProgress() throws Exception {Path hdfsPath = new Path(basePath + "/slow_query.log");File localFile = new File("D:\\document\\slow_query.log");try (InputStream is = new BufferedInputStream(Files.newInputStream(localFile.toPath()));FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() {long totalBytes = localFile.length();long bytesWritten = 0;int lastProgress = 0;@Overridepublic void progress() {bytesWritten += 4096;int progress = (int) ((bytesWritten * 100) / totalBytes);if (progress > lastProgress) {System.out.print("\r上传进度: " + progress + "%");lastProgress = progress;}}})) {IOUtils.copyBytes(is, outputStream, 4096, false);System.out.println("\n文件上传完成: " + hdfsPath);}
}
3. 从 HDFS 下载到本地
@Test
public void copyToLocalFile() throws Exception {Path srcPath = new Path(basePath + "/renameTest.txt");Path dstPath = new Path("E://test.txt");// 使用原始本地文件系统,避免 Windows 兼容性问题fileSystem.copyToLocalFile(false, srcPath, dstPath, true);System.out.println("文件下载成功: " + dstPath);
}
四、文件管理与元数据操作
1. 查看文件列表
@Test
public void listFiles() throws Exception {Path dirPath = new Path(basePath);RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(dirPath, true);System.out.println("文件列表:");while (iterator.hasNext()) {LocatedFileStatus status = iterator.next();System.out.printf("%s\t%s\t%d\t%d\t%s%n",status.isDirectory() ? "文件夹" : "文件",status.getReplication(),status.getBlockSize(),status.getLen(),status.getPath());}
}
2. 删除文件/目录
@Test
public void delete() throws Exception {Path path = new Path(basePath + "/slow_query.log");boolean result = fileSystem.delete(path, false); // 非递归删除System.out.println("删除文件: " + (result ? "成功" : "失败"));
}
五、高级特性与最佳实践
1. 设置文件副本系数
@Test
public void setReplication() throws Exception {Path filePath = new Path(basePath + "/test.txt");short replication = 2; // 设置副本数为2boolean result = fileSystem.setReplication(filePath, replication);System.out.println("设置副本系数: " + (result ? "成功" : "失败"));
}
2. 检查文件是否存在
@Test
public void checkFileExists() throws Exception {Path filePath = new Path(basePath + "/test.txt");boolean exists = fileSystem.exists(filePath);System.out.println("文件是否存在: " + exists);
}
3. 获取文件块位置信息
@Test
public void getFileBlockLocations() throws Exception {Path filePath = new Path(basePath + "/test.txt");FileStatus fileStatus = fileSystem.getFileStatus(filePath);BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());System.out.println("文件块位置信息:");for (BlockLocation location : locations) {System.out.printf("块偏移量: %d, 长度: %d, 所在节点: %s%n",location.getOffset(),location.getLength(),Arrays.toString(location.getHosts()));}
}
六、常见问题与解决方案
1. Windows 环境兼容性问题
- 错误信息:
java.io.IOException: Could not locate executable
- 解决方案:
- 设置
HADOOP_HOME
和hadoop.home.dir
系统变量 - 下载 Windows 版 Hadoop 二进制文件(如 winutils.exe)
- 设置
2. 权限不足问题
- 错误信息:
Permission denied: user=xxx, access=WRITE
- 解决方案:
// 在创建 FileSystem 时指定用户 fileSystem = FileSystem.get(new URI(HDFS_ENPOINT), configuration, "hdfs");
3. 大文件上传性能优化
- 使用带进度反馈的上传方法
- 调整缓冲区大小:
FSDataOutputStream outputStream = fileSystem.create(filePath, true, 8192); // 8KB 缓冲区
七、完整示例代码
下面是一个整合所有操作的完整示例:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import java.io.*;
import java.net.URI;
import java.nio.file.Files;public class HdfsOperationExample {private FileSystem fileSystem;private Configuration configuration;public static final String HDFS_ENPOINT = "hdfs://ip:8020";public static final String basePath = "/test/java_api";@Beforepublic void preEnv() throws Exception {System.setProperty("HADOOP_USER_NAME", "hadoop");configuration = new Configuration();fileSystem = FileSystem.get(new URI(HDFS_ENPOINT), configuration);}@Afterpublic void afterEnv() throws Exception {if (fileSystem != null) {fileSystem.close();}}@Testpublic void testAllOperations() throws Exception {// 创建目录mkdir();// 创建文件并写入内容create();// 查看文件内容System.out.println("\n文件内容:");cat();// 重命名文件rename();// 上传本地文件copyFromLocalFile();// 带进度条的上传copyFromLocalFileWithProgress();// 查看文件列表System.out.println("\n文件列表:");listFiles();// 获取文件块位置System.out.println("\n文件块位置:");getFileBlockLocations();// 删除文件delete();}// 其他方法实现...
}
八、性能调优建议
- 批量操作:避免频繁创建和关闭 FileSystem 对象
- 缓冲区设置:
// 增大缓冲区提高写入性能 FSDataOutputStream outputStream = fileSystem.create(filePath, true, 65536); // 64KB
- 异步操作:对于大文件上传,考虑使用异步回调机制
- 连接池:在生产环境中使用连接池管理 FileSystem 实例
通过本文的示例,你可以全面掌握 HDFS Java API 的使用方法。在实际开发中,建议根据业务需求选择合适的 API,并注意处理异常和资源释放,以确保程序的健壮性和性能。