springboot 分片上传文件 - postgres(BLOB存储)
- 方案一(推荐)
接收完整文件,后端自动分片并存储(多线程 大文件)
/*** 接收完整文件,后端自动分片并存储(多线程 大文件)* @param file* @return* @throws Exception*/public String uploadChunkFile(MultipartFile file) throws Exception {String uploadId = UUID.randomUUID().toString();long fileSize = file.getSize();long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);if (totalChunks <= 0) {return "文件大小异常,无法分片";}// 1. 创建临时目录存储分片(大文件避免内存溢出)File tempDir = Files.createTempDirectory("file-chunk-").toFile();//设置JVM退出时自动删除该目录tempDir.deleteOnExit();try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 2. 先将所有分片写入临时文件(流式处理,不占大量内存)while ((bytesRead = inputStream.read(buffer)) != -1) {File chunkFile = new File(tempDir, uploadId + "-" + chunkIndex);try (FileOutputStream fos = new FileOutputStream(chunkFile)) {fos.write(buffer, 0, bytesRead); // 只写实际读取的字节}chunkIndex++;}// 3. 一次性提交所有分片任务,使用工具类等待完成ThreadPoolUtils.getNewInstance().submitBatchTasks((int) totalChunks, taskIndex -> {try {// 读取临时分片文件(每个任务只加载自己的分片数据)File chunkFile = new File(tempDir, uploadId + "-" + taskIndex);byte[] chunkData = Files.readAllBytes(chunkFile.toPath());// 存储到数据库FileUploadEntity entity = new FileUploadEntity();entity.setId(IdGenerator.nextId());entity.setUploadId(uploadId);entity.setChunkSize((long) chunkData.length);entity.setChunkNum(totalChunks);entity.setChunkFile(chunkData);entity.setChunkIndex(taskIndex);fileUploadMapper.insertFile(entity);} catch (IOException e) {throw new RuntimeException("分片" + taskIndex + "存储失败", e);}});} catch (Exception e) {log.error("文件分片上传失败", e);throw new RuntimeException("文件分片上传失败");} finally {// 4. 清理临时文件deleteDir(tempDir);}return "文件已成功分片存储,uploadId: " + uploadId;}// 递归删除临时目录private boolean deleteDir(File dir) {if (dir.isDirectory()) {File[] children = dir.listFiles();if (children != null) {for (File child : children) {deleteDir(child);}}}return dir.delete();}
- 方案二
接收完整文件,后端自动分片并存储(多线程 小文件)。。。 大文件可能会内存溢出
/**
 * Receives a complete file, splits it into chunks and stores them via the shared
 * thread pool. Suited to SMALL files only: every chunk is held in memory at once,
 * so a large file may cause an OutOfMemoryError (use the temp-file variant instead).
 *
 * Fix over the original: removed the unused {@code fileName} local.
 *
 * @param file the uploaded multipart file
 * @return a human-readable result message containing the generated uploadId
 * @throws IOException if reading the upload stream fails
 * @throws InterruptedException if waiting for the chunk tasks is interrupted
 */
public String uploadChunkFile(MultipartFile file) throws IOException, InterruptedException {
    // Unique id shared by all chunks of this file.
    String uploadId = UUID.randomUUID().toString();
    long fileSize = file.getSize();
    long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);
    if (totalChunks <= 0) {
        return "文件大小异常,无法分片";
    }
    // Read every chunk into memory up front (small files only).
    List<byte[]> chunkDataList = new ArrayList<>();
    try (InputStream inputStream = file.getInputStream()) {
        byte[] buffer = new byte[(int) CHUNK_SIZE];
        int bytesRead;
        while ((bytesRead = inputStream.read(buffer)) != -1) {
            // Copy only the bytes actually read (final chunk may be short).
            byte[] chunkData = new byte[bytesRead];
            System.arraycopy(buffer, 0, chunkData, 0, bytesRead);
            chunkDataList.add(chunkData);
        }
    }
    // Submit one insert task per chunk and block until all have completed.
    ThreadPoolUtils threadPool = ThreadPoolUtils.getNewInstance();
    threadPool.submitBatchTasks((int) totalChunks, chunkIndex -> {
        byte[] currentChunkData = chunkDataList.get(chunkIndex);
        FileUploadEntity fileUpload = new FileUploadEntity();
        fileUpload.setId(IdGenerator.nextId());
        fileUpload.setUploadId(uploadId);
        fileUpload.setChunkSize((long) currentChunkData.length);
        fileUpload.setChunkNum(totalChunks);
        fileUpload.setChunkFile(currentChunkData);
        fileUpload.setChunkIndex(chunkIndex);
        fileUploadMapper.insertFile(fileUpload);
    });
    return "文件已成功分片存储,uploadId: " + uploadId;
}
- 方案三
接收完整文件,后端自动分片并存储 (单线程) 。。。。上传大文件时间太久
/*** 接收完整文件,后端自动分片并存储* @param file* @return* @throws IOException*/public String uploadChunkFileBackup(MultipartFile file) throws IOException {// 生成唯一上传ID,用于标识同一文件的所有分片String uploadId = UUID.randomUUID().toString();String fileName = file.getOriginalFilename();long fileSize = file.getSize();// 计算总分片数long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);List<FileUploadEntity> list = new ArrayList<>();try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 循环读取文件并分片while ((bytesRead = inputStream.read(buffer)) != -1) {// 处理最后一个可能小于标准分片大小的分片byte[] chunkData = new byte[bytesRead];System.arraycopy(buffer, 0, chunkData, 0, bytesRead);// 获取分片实际大小(字节数)long chunkActualSize = bytesRead; // 这就是当前分片的实际大小// 存储当前分片
// saveChunk(uploadId, chunkIndex, totalChunks, chunkData, fileSize, fileName);// 存储当前分片FileUploadEntity fileUpload = new FileUploadEntity();fileUpload.setId(IdGenerator.nextId());fileUpload.setUploadId(uploadId);fileUpload.setChunkSize(chunkActualSize);fileUpload.setChunkNum(totalChunks);fileUpload.setChunkFile(chunkData);fileUpload.setChunkIndex(chunkIndex);fileUploadMapper.insertFile(fileUpload);
// list.add(fileUpload);chunkIndex++;}}//批量添加
// int batchSize = 500;
// for (int i = 0; i < list.size(); i += batchSize) {
// int end = Math.min(i + batchSize, list.size());
// List<FileUploadEntity> subList = list.subList(i, end);
// fileUploadMapper.batchInsert(subList);
// }return "文件已成功分片存储,uploadId: " + uploadId;}
-
方案四
接收完整文件,后端自动分片并存储(多线程,线程池未封装)
/*** 接收完整文件,后端自动分片并使用 (多线程)线程池未封装* @param file* @return* @throws IOException, InterruptedException*/
// @Overridepublic String uploadChunkFile(MultipartFile file) throws IOException, InterruptedException {// 生成唯一上传ID,用于标识同一文件的所有分片String uploadId = UUID.randomUUID().toString();String fileName = file.getOriginalFilename();long fileSize = file.getSize();// 计算总分片数long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);// 创建线程池,核心线程数可根据服务器配置调整// 通常设置为CPU核心数 * 2 + 1int corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;ExecutorService executorService = Executors.newFixedThreadPool(corePoolSize);// 使用CountDownLatch等待所有线程完成CountDownLatch countDownLatch = new CountDownLatch((int) totalChunks);try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 循环读取文件并分片while ((bytesRead = inputStream.read(buffer)) != -1) {// 处理最后一个可能小于标准分片大小的分片byte[] chunkData = new byte[bytesRead];System.arraycopy(buffer, 0, chunkData, 0, bytesRead);long chunkActualSize = bytesRead;// 捕获当前变量的快照,避免线程安全问题final int currentChunkIndex = chunkIndex;final byte[] currentChunkData = chunkData;final long currentChunkSize = chunkActualSize;// 提交分片存储任务到线程池executorService.submit(() -> {try {FileUploadEntity fileUpload = new FileUploadEntity();fileUpload.setId(IdGenerator.nextId());fileUpload.setUploadId(uploadId);fileUpload.setChunkSize(currentChunkSize);fileUpload.setChunkNum(totalChunks);fileUpload.setChunkFile(currentChunkData);fileUpload.setChunkIndex(currentChunkIndex);fileUploadMapper.insertFile(fileUpload);} finally {// 无论是否发生异常,都减少计数器countDownLatch.countDown();}});chunkIndex++;}// 等待所有分片处理完成countDownLatch.await();} finally {// 关闭线程池executorService.shutdown();}return "文件已成功分片存储,uploadId: " + uploadId;}
-
方案五
大对象(Large Object)方案
/**
 * PostgreSQL Large Object upload.
 *
 * The Large Object mechanism works as follows:
 * - binary data is written through LargeObjectManager, which returns an OID
 *   (a numeric object id);
 * - the table stores only that OID, never the binary data itself;
 * - reads go back through the large-object manager using the OID.
 *
 * Fix over the original: the SLF4J calls used a "{}" placeholder with only a
 * Throwable argument, leaving the placeholder unfilled — the exception is now
 * logged with a full stack trace.
 *
 * @param file the uploaded multipart file
 * @return a human-readable result message
 */
@Override
public String uploadLargeObjectFile(MultipartFile file) {
    if (file.isEmpty()) {
        return "请选择文件";
    }
    try {
        long fileSize = file.getSize();
        String fileName = file.getOriginalFilename();
        // Write the stream as a large object; we persist only the returned OID.
        long largeObjectId = postgresLargeObjectUtil.createLargeObject(file.getInputStream());
        FileUploadEntity fileUpload = new FileUploadEntity();
        fileUpload.setId(IdGenerator.nextId());
        fileUpload.setUploadId(String.valueOf(largeObjectId));
        fileUpload.setChunkSize(fileSize);
        fileUpload.setChunkNum(fileSize);
        fileUpload.setChunkFile(null); // no inline bytes: the OID references the data
        fileUpload.setChunkIndex(2);
        fileUploadMapper.insertLargeObjectFile(fileUpload);
        return "大文件上传成功!文件名:" + fileName + ",大小:" + fileSize + "字节";
    } catch (Exception e) {
        log.error("上传大文件失败", e);
        return "上传失败:" + e.getMessage();
    }
}

/**
 * Streams a stored large object back to the HTTP client as an attachment.
 *
 * @param fileId primary key of the file record (its uploadId column holds the OID)
 * @param response target servlet response
 */
@Override
public void downloadFile(Long fileId, HttpServletResponse response) {
    FileUploadEntity fileEntity = fileUploadMapper.getFileById(fileId);
    long oid = Long.parseLong(fileEntity.getUploadId());
    try {
        response.reset();
        response.setContentType("application/octet-stream");
        // NOTE(review): hard-coded placeholder name — confirm where the real filename should come from.
        String filename = "fileName.zip";
        response.addHeader("Content-Disposition", "attachment; filename=" + URLEncoder.encode(filename, "UTF-8"));
        ServletOutputStream outputStream = response.getOutputStream();
        postgresLargeObjectUtil.readLargeObject(oid, outputStream);
        outputStream.flush(); // ensure all buffered bytes reach the client
    } catch (Exception e) {
        log.error("下载文件失败", e);
    }
}
-
方案六
文件字节上传
/*** 文件字节上传* @param file* @return*/@Overridepublic String uploadFileByte(MultipartFile file) {if (file.isEmpty()) {return "请选择文件";}try {// 获取文件信息String fileName = file.getOriginalFilename();long fileSize = file.getSize();byte[] fileData = file.getBytes(); // 小文件直接获取字节数组// 执行插入(大文件建议用流:file.getInputStream())String sql = "INSERT INTO system_upload_test (id, upload_id, chunk_size, chunk_num, chunk_file, chunk_index) VALUES (?, ?, ?, ?, ?, ?)";jdbcTemplate.update(sql,111L,"2222",222L,3L,fileData,33L);return "文件上传成功!";} catch (Exception e) {e.printStackTrace();return "文件上传失败:" + e.getMessage();}}// 大文件用流:file.getInputStream()public String uploadBigFile(MultipartFile file) throws Exception {// 1. 定义 SQL(注意:字段顺序和占位符对应)String sql = "INSERT INTO user_qgcgk_app.system_upload_test " +"(id, upload_id, chunk_size, chunk_num, chunk_file, chunk_index) " +"VALUES (?, ?, ?, ?, ?, ?)";// 2. 准备参数(确保 InputStream 未关闭)Long id = 1795166209435262976L;String uploadId = "3333";Long chunkSize = 7068L;Long chunkNum = 7068L;InputStream chunkInputStream = file.getInputStream(); // 你的 InputStream(如 FileInputStream、ServletInputStream)Integer chunkIndex = 2;try {// 3. 执行 SQL:通过 PreparedStatementSetter 手动绑定参数jdbcTemplate.update(sql, new PreparedStatementSetter() {@Overridepublic void setValues(PreparedStatement ps) throws SQLException {// 绑定非流参数(按顺序,类型匹配)ps.setLong(1, id); // 第1个参数:id(Long)ps.setString(2, uploadId); // 第2个参数:upload_id(String)ps.setLong(3, chunkSize); // 第3个参数:chunk_size(Long)ps.setLong(4, chunkNum); // 第4个参数:chunk_num(Long)// 关键:绑定 InputStream 到 bytea 字段(第5个参数)// 第三个参数传 -1 表示“未知流长度”,PostgreSQL 支持此模式ps.setBinaryStream(5, chunkInputStream, file.getSize());ps.setInt(6, chunkIndex); // 第6个参数:chunk_index(Int)}});} finally {// 4. 执行完成后关闭流,释放资源if (chunkInputStream != null) {chunkInputStream.close();}}return "上传成功!";}
- 方案七
无临时文件+多线程(减少IO操作)
/*** 无临时文件+多线程+批量插入的分片上传*/public String uploadChunkFile(MultipartFile file) throws Exception {// 生成唯一上传IDString uploadId = UUID.randomUUID().toString();long fileSize = file.getSize();long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);if (totalChunks <= 0) {return "文件大小异常,无法分片";}try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 批量插入缓冲区(每10个分片一批)List<FileUploadEntity> batchList = new ArrayList<>(10);// 计数器:等待所有批量任务完成CountDownLatch latch = new CountDownLatch((int) Math.ceil((double) totalChunks / 10));// 流式读取并处理分片while ((bytesRead = inputStream.read(buffer)) != -1) {// 复制当前分片数据(避免buffer被覆盖)byte[] chunkData = Arrays.copyOfRange(buffer, 0, bytesRead);// 创建分片实体FileUploadEntity entity = new FileUploadEntity();entity.setId(IdGenerator.nextId());entity.setUploadId(uploadId);entity.setChunkSize((long) chunkData.length);entity.setChunkNum(totalChunks);entity.setChunkFile(chunkData);entity.setChunkIndex(chunkIndex);batchList.add(entity);chunkIndex++;// 批量条件:满10个分片或最后一个分片if (batchList.size() >= 10 || chunkIndex == totalChunks) {// 复制当前批次(避免线程安全问题)List<FileUploadEntity> currentBatch = new ArrayList<>(batchList);// 提交批量插入任务ThreadPoolUtils.getNewInstance().executor(() -> {try {fileUploadMapper.batchInsert(currentBatch);} finally {latch.countDown(); // 任务完成,计数器减1}});batchList.clear(); // 清空缓冲区}}// 等待所有批量任务完成(最多等待5分钟)boolean allCompleted = latch.await(5, java.util.concurrent.TimeUnit.MINUTES);if (!allCompleted) {throw new BusinessException("文件分片上传超时,请重试");}} catch (Exception e) {log.error("文件分片上传失败,uploadId:{}", uploadId, e);// 失败时清理已上传的分片(可选)
// fileUploadMapper.deleteByUploadId(uploadId);throw new BusinessException("文件分片上传失败:" + e.getMessage());}return "文件已成功分片存储,uploadId: " + uploadId;}/*** 无临时文件+多线程+单条插入的分片上传*/public String uploadChunkFile(MultipartFile file) throws Exception {String uploadId = UUID.randomUUID().toString();long fileSize = file.getSize();long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);if (totalChunks <= 0) {return "文件大小异常,无法分片";}try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 用于等待所有分片完成CountDownLatch latch = new CountDownLatch((int) totalChunks);// 边读取边提交分片任务,无需临时文件while ((bytesRead = inputStream.read(buffer)) != -1) {// 复制当前分片数据(避免buffer被下一次read覆盖)byte[] chunkData = Arrays.copyOfRange(buffer, 0, bytesRead);final int currentIndex = chunkIndex;// 提交异步任务ThreadPoolUtils.getNewInstance().executor(() -> {try {// 直接用内存中的分片数据写入数据库FileUploadEntity entity = new FileUploadEntity();entity.setId(IdGenerator.nextId());entity.setUploadId(uploadId);entity.setChunkSize((long) chunkData.length);entity.setChunkNum(totalChunks);entity.setChunkFile(chunkData);entity.setChunkIndex(currentIndex);fileUploadMapper.insertFile(entity);} catch (Exception e) {throw new RuntimeException("分片" + currentIndex + "存储失败", e);} finally {latch.countDown();}});chunkIndex++;}// 等待所有分片完成latch.await();} catch (Exception e) {log.error("文件分片上传失败", e);throw new BusinessException("文件分片上传失败");}return "文件已成功分片存储,uploadId: " + uploadId;}
- 工具类:PostgreSQL大对象工具类
```java
import lombok.extern.slf4j.Slf4j;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.SQLException;/*** PostgreSQL大对象工具类* @author: zrf* @date: 2025/08/25 16:09*/
@Slf4j
@Component
public class PostgresLargeObjectUtil {private final JdbcTemplate jdbcTemplate;public PostgresLargeObjectUtil(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}/*** 从输入流创建大对象并返回OID*/@Transactionalpublic long createLargeObject(InputStream inputStream) throws SQLException {// 获取数据库连接并关闭自动提交Connection connection = jdbcTemplate.getDataSource().getConnection();connection.setAutoCommit(false);try {// 获取PostgreSQL大对象管理器LargeObjectManager lobjManager = connection.unwrap(org.postgresql.PGConnection.class).getLargeObjectAPI();// 创建大对象,返回OIDlong oid = lobjManager.createLO(LargeObjectManager.READ | LargeObjectManager.WRITE);// 打开大对象并写入数据try (LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.WRITE)) {OutputStream outputStream = largeObject.getOutputStream();byte[] buffer = new byte[8192];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {outputStream.write(buffer, 0, bytesRead);}}connection.commit();return oid;} catch (Exception e) {connection.rollback();throw new SQLException("创建大对象失败", e);} finally {connection.close();}}/*** 根据OID读取大对象内容到输出流*/public void readLargeObject(long oid, OutputStream outputStream) throws Exception {Connection connection = jdbcTemplate.getDataSource().getConnection();connection.setAutoCommit(false);try {LargeObjectManager lobjManager = connection.unwrap(org.postgresql.PGConnection.class).getLargeObjectAPI();try (LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ)) {InputStream inputStream = largeObject.getInputStream();byte[] buffer = new byte[8192];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {outputStream.write(buffer, 0, bytesRead);}}connection.commit();} catch (Exception e) {log.error("读取大对象失败", e);} finally {connection.close();}}/*** 删除大对象(释放磁盘空间)*/@Transactionalpublic void deleteLargeObject(long oid) throws SQLException {Connection connection = jdbcTemplate.getDataSource().getConnection();connection.setAutoCommit(false);try {LargeObjectManager lobjManager = 
connection.unwrap(org.postgresql.PGConnection.class).getLargeObjectAPI();lobjManager.delete(oid);connection.commit();} catch (Exception e) {connection.rollback();throw new SQLException("删除大对象失败", e);} finally {connection.close();}}
}
```
线程池工具类
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;/*** @Author:zrf* @Date:2023/8/14 10:05* @description:线程池工具类*/
public class ThreadPoolUtils {/*** 系统可用计算资源*/private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();/*** 核心线程数*/private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));/*** 最大线程数*/private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;/*** 空闲线程存活时间*/private static final int KEEP_ALIVE_SECONDS = 30;/*** 工作队列*/private static final BlockingQueue<Runnable> POOL_WORK_QUEUE = new LinkedBlockingQueue<>(128);/*** 工厂模式*/private static final MyThreadFactory MY_THREAD_FACTORY = new MyThreadFactory();/*** 饱和策略*/private static final ThreadRejectedExecutionHandler THREAD_REJECTED_EXECUTION_HANDLER = new ThreadRejectedExecutionHandler.CallerRunsPolicy();/*** 线程池对象*/private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;/*** 声明式定义线程池工具类对象静态变量,在所有线程中同步*/private static volatile ThreadPoolUtils threadPoolUtils = null;/*** 初始化线程池静态代码块*/static {THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(//核心线程数CORE_POOL_SIZE,//最大线程数MAXIMUM_POOL_SIZE,//空闲线程执行时间KEEP_ALIVE_SECONDS,//空闲线程执行时间单位TimeUnit.SECONDS,//工作队列(或阻塞队列)POOL_WORK_QUEUE,//工厂模式MY_THREAD_FACTORY,//饱和策略THREAD_REJECTED_EXECUTION_HANDLER);}/*** 线程池工具类空参构造方法*/private ThreadPoolUtils() {}/*** 获取线程池工具类实例* @return*/public static ThreadPoolUtils getNewInstance(){if (threadPoolUtils == null) {synchronized (ThreadPoolUtils.class) {if (threadPoolUtils == null) {threadPoolUtils = new ThreadPoolUtils();}}}return threadPoolUtils;}/*** 执行线程任务* @param runnable 任务线程*/public void executor(Runnable runnable) {THREAD_POOL_EXECUTOR.execute(runnable);}/*** 执行线程任务-有返回值* @param callable 任务线程*/public <T> Future<T> submit(Callable<T> callable) {return THREAD_POOL_EXECUTOR.submit(callable);}/*** 提交批量任务并等待所有任务完成* @param totalTasks 总任务数量* @param taskConsumer 任务消费者(接收任务索引,处理具体任务逻辑)* @throws InterruptedException 等待被中断时抛出*/public void submitBatchTasks(int totalTasks, Consumer<Integer> taskConsumer) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(totalTasks);for (int i = 0; 
i < totalTasks; i++) {final int taskIndex = i;// 使用现有线程池提交任务THREAD_POOL_EXECUTOR.submit(() -> {try {taskConsumer.accept(taskIndex); // 执行具体任务逻辑} finally {countDownLatch.countDown(); // 任务完成,计数器减1}});}countDownLatch.await(); // 等待所有任务完成}/*** 获取线程池状态* @return 返回线程池状态*/public boolean isShutDown(){return THREAD_POOL_EXECUTOR.isShutdown();}/*** 停止正在执行的线程任务* @return 返回等待执行的任务列表*/public List<Runnable> shutDownNow(){return THREAD_POOL_EXECUTOR.shutdownNow();}/*** 关闭线程池*/public void showDown(){THREAD_POOL_EXECUTOR.shutdown();}/*** 关闭线程池后判断所有任务是否都已完成* @return*/public boolean isTerminated(){return THREAD_POOL_EXECUTOR.isTerminated();}
}