django连接minio实现文件上传下载(提供接口示例)
- 项目环境前提
- 1.模型创建
- 2. 在 settings.py 中添加 MINIO 配置
- 3.创建 MINIO 工具类
- 4.创建序列化器
- 5. 创建视图
- 6. 配置 URL 路由
- 7.接口测试
项目环境前提
- 已安装python3.8+以上环境
- 已安装djangorestframework环境
- 已部署mysql数据库
- 已部署minio
- 所需python依赖:django-storages、minio
1.模型创建
1.这里模型创建的前提是DRF的项目框架已搭建好。创建文件上传模型字段如下所示
注:这里只展示文件上传等字段,去除了其他字段,实际开发根据项目需求添加
# 这里仅介绍文件上传,所以只展示文件上传所需字段
class ApprovalProcess(models.Model): minio_url172_1 = models.TextField(null=True, blank=True, verbose_name='url172_1')minio_url10_1 = models.TextField(null=True, blank=True, verbose_name='url10_1')minio_source_name_1 = models.CharField(null=True, blank=True, max_length=200, verbose_name='源文件名1')minio_file_name_1 = models.TextField(null=True, blank=True, verbose_name='minio文件名1')create_time = models.DateTimeField(auto_now_add=True, null=True, blank=True, verbose_name='创建时间')update_time = models.DateTimeField(auto_now=True, null=True, blank=True, verbose_name='更新时间')is_delete = models.BooleanField(default=False, verbose_name='逻辑删除')def delete(self, using=None, keep_parents=False):# 逻辑删除# 把当前模型对象的is_delete字段改为True即可self.is_delete = Trueself.save()# 配置后台管理系统每个模型的名字显示class Meta:db_table = 'approval_process'verbose_name = '审批流程表' verbose_name_plural = verbose_nameindexes = [models.Index(fields=['minio_url172_1','minio_url10_1']),]
2. 在 settings.py 中添加 MINIO 配置
# MinIO 配置信息
# 这里有两个网段IP地址,所以配置了两个,只有一个IP的看情况配置
MINIO_STORAGE_ENDPOINT_172 = '172.xx.xx.xxx:5096' # MinIO 服务器地址1
MINIO_STORAGE_ENDPOINT_10 = '10.xx.xx.xxx:5096' # MinIO 服务器地址2
MINIO_STORAGE_ACCESS_KEY = 'minio账号' # 你的minio账号
MINIO_STORAGE_SECRET_KEY = 'minio账号密码' # 你的minio账号密码
MINIO_STORAGE_USE_HTTPS = False # 如果未启用 HTTPS,则为 False
MINIO_STORAGE_MEDIA_BUCKET_NAME = 'backstickerv3' # 用于存储文件的桶名称,前提是已在minio创建好这个文件桶
3.创建 MINIO 工具类
- 在ApprovalProcess模型下创建utils文件,在该文件下创建monio_utils.py的文件,用于处理minio的文件上传和下载
- 代码如下。共三个主要函数:
1)upload_file:处理文件上传
2)download_file:处理文件直接下载
3)get_presigned_url:生成预签名URL,处理文件下载
下载文件时可根据需求选择使用download_file或者get_presigned_url
# apps/ApprovalProcess/utils/minio_utils.py
from minio import Minio
from minio.error import S3Error
from django.conf import settings
import uuid
import os
import logging
from django.http import HttpResponse
from django.http import StreamingHttpResponse
import mimetypes # 用于根据文件名猜测 MIME 类型
import urllib.parse
from datetime import timedelta
# 配置日志
logger = logging.getLogger(__name__)class MinioClient:"""MinIO 操作工具类"""def __init__(self):# 从配置中获取端点,移除协议头#要确保这些值是字符串,可以打印查看endpoint_172 = settings.MINIO_STORAGE_ENDPOINT_172.replace('http://', '').replace('https://', '')endpoint_10 = settings.MINIO_STORAGE_ENDPOINT_10.replace('http://', '').replace('https://', '')access_key=settings.MINIO_STORAGE_ACCESS_KEYsecret_key=settings.MINIO_STORAGE_SECRET_KEYsecure=settings.MINIO_STORAGE_USE_HTTPS# 初始化 172 网段客户端self.client_172 = Minio(endpoint=endpoint_172, # 使用一个端点access_key=access_key,secret_key=secret_key,secure=secure)# 初始化 10 网段客户端self.client_10 = Minio(endpoint=endpoint_10,access_key=access_key,secret_key=secret_key,secure=secure)logger.info(f"MinIO clients initialized for both networks: 172 - {endpoint_172}, 10 - {endpoint_10}")# def upload_file(self, file_obj, file_name, bucket_name, content_type='application/octet-stream'):def upload_file(self, file_obj, file_name, bucket_name, content_type='message/rfc822'):"""上传文件到 MinIOArgs:file_obj: 文件对象(如 Django 的 UploadedFile)file_name: 希望在 MinIO 中存储的文件名bucket_name: 存储桶名称content_type: 文件类型Returns:dict: 包含文件访问 URL 等信息的字典"""# 生成唯一的对象名称,避免覆盖file_extension = os.path.splitext(file_name)[1]unique_filename = f"{uuid.uuid4().hex}{file_extension}"object_name = f"approval_uploads/{unique_filename}" # 可以添加前缀分类try:# 确保存储桶存在if not self.client_172.bucket_exists(bucket_name):self.client_172.make_bucket(bucket_name)logger.info(f"Bucket '{bucket_name}' created.")# 获取文件大小# 对于 Django 的 UploadedFile,可以使用 file_obj.sizefile_size = file_obj.size# 上传文件self.client_172.put_object(bucket_name,object_name,file_obj,file_size,content_type=content_type)# 构建文件的访问 URL(路径风格)# 添加协议头(http 或 https)protocol = "https" if settings.MINIO_STORAGE_USE_HTTPS else "http"url_172 = f"{protocol}://172.xx.xx.xxx:5096/{bucket_name}/{object_name}"url_10 = f"{protocol}://10.xx.xx.xxx:5096/{bucket_name}/{object_name}"logger.info(f"File uploaded successfully: {object_name}")return {"url_172_1": url_172,"url_10_1": url_10,"file_name": unique_filename,"original_name": file_name,"object_name": object_name}except S3Error as e:logger.error(f"MinIO S3Error occurred: {e}")raise eexcept Exception as e:logger.error(f"Unexpected error during MinIO upload: {e}")raise edef download_file(self, bucket_name, object_name, file_name):"""从 MinIO 下载文件Args:bucket_name: 存储桶名称object_name: 对象名称(在 MinIO 中的路径)file_name: 下载时显示的文件名Returns:HttpResponse: 包含文件数据的 HTTP 响应"""try:# print('file_name=', file_name)# 从 MinIO 获取文件数据response = self.client_172.get_object(bucket_name, object_name)file_data = response.read()response.close()response.release_conn()# # 确保文件名有正确的扩展名# if not file_name.lower().endswith('.eml'):# # 如果文件名没有 .eml 扩展名,添加它# file_name = f"{file_name}.eml"# 对文件名进行 URL 编码,确保特殊字符正确处理encoded_filename = urllib.parse.quote(file_name)# 创建 HTTP 响应 - 使用正确的 Content-Typecontent_type = 'message/rfc822' # .eml 文件的正确 MIME 类型http_response = HttpResponse(file_data, content_type=content_type)# 设置 Content-Disposition 头,确保浏览器正确下载文件# 使用 filename* 参数并指定 UTF-8 编码来处理可能包含非 ASCII 字符的文件名http_response['Content-Disposition'] = f'attachment; filename="{encoded_filename}"; filename*=UTF-8\'\'{encoded_filename}'# 设置 Content-Lengthhttp_response['Content-Length'] = len(file_data)logger.info(f"File downloaded successfully: {object_name}")return http_responseexcept S3Error as e:logger.error(f"MinIO S3Error occurred during download: {e}")raise eexcept Exception as e:logger.error(f"Unexpected error during MinIO download: {e}")raise edef get_presigned_url(self, bucket_name, object_name, filename=None, expiry=3600, network='both'):"""生成预签名 URL(支持双网段)Args:bucket_name: 存储桶名称object_name: 对象名称expiry: URL 有效期(秒),默认 1 小时network: 网络类型,'172'、'10' 或 'both'Returns:str 或 dict: 预签名 URL 或包含两个 URL 的字典"""try:# 将秒数转换为 timedelta 对象expires_td = timedelta(seconds=expiry)# 构建响应头参数(如果提供了自定义文件名)extra_query_params = {}# print('filename=',filename)if filename:# 对文件名进行 URL 编码encoded_filename = urllib.parse.quote(filename)# 添加响应内容处置参数,指定下载文件名extra_query_params['response-content-disposition'] = f'attachment; filename="{encoded_filename}"'# print('network=',network)if network == 'both':# 生成两个网段的 URLurl_172 = self.client_172.presigned_get_object(bucket_name, object_name, expires=expires_td,extra_query_params=extra_query_params)url_10 = self.client_10.presigned_get_object(bucket_name, object_name, expires=expires_td,extra_query_params=extra_query_params)result = {'url_172': url_172,'url_10': url_10}return resultelif network == '10':# 只生成 10 网段的 URLurl_10 = self.client_10.presigned_get_object(bucket_name, object_name, expires=expires_td,extra_query_params=extra_query_params)result = {'url_172': '','url_10': url_10}return resultelse:# 默认生成 172 网段的 URLurl_172 = self.client_172.presigned_get_object(bucket_name, object_name, expires=expires_td,extra_query_params=extra_query_params)result = {'url_172': url_172,'url_10': ''}return resultexcept S3Error as e:logger.error(f"MinIO S3Error occurred generating presigned URL: {e}")raise eexcept Exception as e:logger.error(f"Unexpected error generating presigned URL: {e}")raise e # 创建全局 MinIO 客户端实例
minio_client = MinioClient()
4.创建序列化器
- 创建处理文件上传的序列化器:ApprovalProcessCreateSerializer,重写 create 方法
from rest_framework import serializers
from rest_framework.serializers import ModelSerializer
from .models import *
from django.core.validators import FileExtensionValidatorclass ApprovalProcessCreateSerializer(serializers.ModelSerializer):# 注意:这个字段仅用于接收上传的文件,不会保存在模型中(write_only=True)# upload_file = serializers.FileField(write_only=True, required=False, label="上传文件")upload_file = serializers.FileField(write_only=True,required=False,validators=[FileExtensionValidator(allowed_extensions=['eml', 'doc', 'docx','xlsx']), # 允许的文件后缀# 还可以自定义验证函数限制文件大小],label="上传文件")class Meta:model = ApprovalProcess# 排除一些字段,这些字段将通过逻辑自动填充,而不是由用户输入exclude = ['is_delete', 'create_time', 'update_time', 'minio_url172_1', 'minio_url10_1', 'minio_source_name_1', 'minio_file_name_1']def create(self, validated_data):"""重写 create 方法,处理文件上传和模型创建"""# 1. 从验证后的数据中弹出文件数据(如果存在)uploaded_file = validated_data.pop('upload_file', None)# 2. 创建 ApprovalProcess 模型实例(先不保存文件相关信息)instance = ApprovalProcess.objects.create(**validated_data)# 3. 如果上传了文件,则处理 MinIO 上传if uploaded_file:try:from .utils.minio_utils import minio_client # 在函数内部导入,避免循环导入# 调用 MinIO 工具类上传文件upload_result = minio_client.upload_file(file_obj=uploaded_file,file_name=uploaded_file.name,bucket_name='backstickerv3' # 确保与 settings 中的桶名一致,或从设置中读取)# 4. 更新实例的 MinIO 相关字段instance.minio_url172_1 = upload_result['url_172_1']instance.minio_url10_1 = upload_result['url_10_1']instance.minio_source_name_1 = upload_result['original_name']instance.minio_file_name_1 = upload_result['file_name']instance.save() # 保存文件信息到数据库except Exception as e:# 处理文件上传失败的情况# 这里可以选择记录日志、删除刚创建的实例,或者保留实例但标记文件上传失败# 例如:instance.file_upload_error = str(e); instance.save()# 暂时打印错误,生产环境应使用日志系统print(f"File upload failed for instance {instance.id}: {str(e)}")# 即使文件上传失败,也返回实例,但可能缺少文件信息return instance
5. 创建视图
- 实现创建数据的接口:create_data
注:这里的create_data包含了其他字段的校验并创建了操作记录,根据实际情况来的,不只是处理文件上传,如果想验证文件上传的,把其他数据的校验去除即可。 - 实现文件下载的接口:download_file
- 实现获取文件下载链接(预签名 URL)的接口:get_download_url
from django.shortcuts import render
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework import status
import re,os,random
import datetime,time
from .serializers import *
from .models import *
from django.conf import settings
from django.shortcuts import get_object_or_404
from django.http import StreamingHttpResponse
# from django.utils.http import urlquote
from urllib.parse import quote
from django.db.models import Q
from datetime import timedelta,date
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from datetime import timedelta,date
from django.db import transaction
import requests
from django.utils import timezone
from rest_framework.pagination import PageNumberPagination
from django.db import transaction
import logging
import MySQLdb
from django.db.models import Subquery, OuterRef, Exists
# 配置日志
logger = logging.getLogger(__name__)class CustomPagination(PageNumberPagination):page_size = 20 # 设置每页数据量page_size_query_param = 'page_size' # 允许客户端传递页面大小参数max_page_size = 100 # 最大页面大小限制class ApprovalProcessViewSet(viewsets.ModelViewSet): # ctrl+点击ModelViewSet可以查看源代码queryset = ApprovalProcess.objects.filter(is_delete=False) # 定义视图集使用的查询集serializer_class = ApprovalProcessSerializer # 定义视图集使用的序列化器@action(methods=['post'], detail=False)def create_data(self, request):"""处理 POST 请求,创建审批流程数据(带字段校验和事务回滚)。Request Body (multipart/form-data 或 application/json):- 包含 ApprovalProcess 模型的字段(如 project, line, responser 等)- upload_file (可选): 要上传的文件Returns:- 201 Created: 成功创建,返回创建的数据(包括文件URL,如果上传了文件)- 400 Bad Request: 数据验证失败,返回错误信息- 500 Internal Server Error: 服务器内部错误(如MinIO连接失败)"""# 1. 使用序列化器验证和解析请求数据serializer = ApprovalProcessCreateSerializer(data=request.data)if not serializer.is_valid():return Response({"success": False,"message": "数据验证失败","errors": serializer.errors}, status=status.HTTP_400_BAD_REQUEST)# 2. 手动验证必填字段required_fields = ['project', 'line', 'by_class', 'task', 'pro_code', 'work_order', 'responser', 'lose_reason', 'improve_method', 'status', 'custom_name']missing_fields = []for field in required_fields:if field not in serializer.validated_data or not serializer.validated_data[field]:missing_fields.append(field)if missing_fields:return Response({"success": False,"message": "以下字段为必填项且不能为空","missing_fields": missing_fields}, status=status.HTTP_400_BAD_REQUEST)# 3. 开始事务with transaction.atomic():# 创建保存点sid = transaction.savepoint()try:# 4. 从验证数据中提取文件(如果存在)validated_data = serializer.validated_data.copy()uploaded_file = validated_data.pop('upload_file', None)# 5. 创建 ApprovalProcess 模型实例(先不包含文件信息)instance = ApprovalProcess.objects.create(**validated_data)# 6. 如果上传了文件,则处理 MinIO 上传if uploaded_file:try:from .utils.minio_utils import minio_client# 调用 MinIO 工具类上传文件upload_result = minio_client.upload_file(file_obj=uploaded_file,file_name=uploaded_file.name,bucket_name='backstickerv3')# 7. 更新实例的 MinIO 相关字段instance.minio_url172_1 = upload_result['url_172_1']instance.minio_url10_1 = upload_result['url_10_1']instance.minio_source_name_1 = upload_result['original_name']instance.minio_file_name_1 = upload_result['file_name']instance.save()except Exception as e:# 文件上传失败,回滚事务transaction.savepoint_rollback(sid)logger.error(f"文件上传失败: {str(e)}")return Response({"success": False,"message": f"文件上传失败: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)else:# 如果没有上传文件,回滚事务transaction.savepoint_rollback(sid)logger.error("文件未上传,数据创建失败")return Response({"success": False,"message": "必须上传文件才能创建数据"}, status=status.HTTP_400_BAD_REQUEST)# 创建 FlowData 操作记录try:flow_data = FlowData.objects.create(P_id=instance.id,point=1, # 默认节点,"生产创建"result=1, # 默认处理结果,"提交"user=request.data['user'], worknumber=request.data['worknumber'], remark=request.data['remark'] # 可以根据实际情况调整备注)flow_data.save()except Exception as e:# 文件上传失败,回滚事务transaction.savepoint_rollback(sid)logger.error(f"操作记录失败: {str(e)}")return Response({"success": False,"message": f"操作记录失败: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)# 8. 提交事务transaction.savepoint_commit(sid)# 9. 构建成功的响应数据response_data = {"success": True,"message": "审批流程创建成功","data": {"id": instance.id,"project": instance.project,"task": instance.task,"status": instance.status,"create_time": instance.create_time,}}# 如果上传了文件,在响应中包括文件信息if instance.minio_url172_1:response_data["data"]["file_info"] = {"original_name": instance.minio_source_name_1,"url_172": instance.minio_url172_1,"url_10": instance.minio_url10_1}return Response(response_data, status=status.HTTP_201_CREATED)except Exception as e:# 回滚事务transaction.savepoint_rollback(sid)logger.error(f"创建审批流程失败: {str(e)}")return Response({"success": False,"message": f"服务器内部错误: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)@action(methods=['get'], detail=True)def download_file(self, request, pk=None):"""下载文件接口参数:pk: 审批流程记录的主键 ID返回:- 200 OK: 文件下载- 404 Not Found: 记录或文件不存在- 500 Internal Server Error: 服务器内部错误"""try:# 获取审批流程记录approval_process = self.get_object()# print('approval_process.minio_source_name_1=',approval_process.minio_source_name_1)# 检查文件是否存在if not approval_process.minio_file_name_1:return Response({"success": False,"message": "文件不存在"}, status=status.HTTP_404_NOT_FOUND)# 从 MinIO 下载文件from .utils.minio_utils import minio_client# 构建对象名称(与上传时一致)object_name = f"approval_uploads/{approval_process.minio_file_name_1}"# 下载文件response = minio_client.download_file(bucket_name='backstickerv3',object_name=object_name,# file_name=approval_process.minio_source_name_1 or f"file_{approval_process.id}"file_name=approval_process.minio_source_name_1)return responseexcept ApprovalProcess.DoesNotExist:return Response({"success": False,"message": "审批流程记录不存在"}, status=status.HTTP_404_NOT_FOUND)except Exception as e:logger.error(f"文件下载失败: {str(e)}")return Response({"success": False,"message": f"文件下载失败: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)@action(methods=['get'], detail=True)def get_download_url(self, request, pk=None):"""获取文件下载链接(预签名 URL)参数:pk: 审批流程记录的主键 ID返回:- 200 OK: 包含下载链接的响应- 404 Not Found: 记录或文件不存在- 500 Internal Server Error: 服务器内部错误"""try:# 获取审批流程记录approval_process = self.get_object()# print('approval_process=',approval_process.id)# 检查文件是否存在if not approval_process.minio_file_name_1:return Response({"success": False,"message": "文件不存在"}, status=status.HTTP_404_NOT_FOUND)# 从 MinIO 获取预签名 URLfrom .utils.minio_utils import minio_client# 构建对象名称(与上传时一致)object_name = f"approval_uploads/{approval_process.minio_file_name_1}"filename = approval_process.minio_source_name_1network = 'both' # 三个值:10 、172 、both:生成两个网段的 URL# 生成预签名 URL(有效期 1 小时)# print('filename=',filename)presigned_url = minio_client.get_presigned_url(bucket_name='backstickerv3',object_name=object_name,expiry=3600, # 1 小时network=network,filename=filename)return Response({"success": True,"message": "获取下载链接成功","data": {"download_url_172": presigned_url['url_172'],"download_url_10": presigned_url['url_10'],"expires_in": 3600, # 有效期(秒)"file_name": approval_process.minio_source_name_1}}, status=status.HTTP_200_OK)except ApprovalProcess.DoesNotExist:return Response({"success": False,"message": "审批流程记录不存在"}, status=status.HTTP_404_NOT_FOUND)except Exception as e:logger.error(f"获取下载链接失败: {str(e)}")return Response({"success": False,"message": f"获取下载链接失败: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
6. 配置 URL 路由
- ApprovalProcess的APP下配置路由
from .views import *
from rest_framework.routers import DefaultRouter #导入默认路由器
from django.urls import path,includeurlpatterns = [
]
# 1.创建路由器
router = DefaultRouter() #有根路由
# 2.注册路由,有其他路由时,只需要注册进来即可
router.register('ApprovalProcess',ApprovalProcessViewSet)
# 3.得到生成的路由,只会自动生成标准的restful风格的增删改查功能接口路由
#查询单一:标准只会根据id来查询 写了id最后面要加/
urlpatterns += router.urls #添加到urlpatterns中即可
- 配置主路由
from django.contrib import admin
from django.urls import path,include
from rest_framework.documentation import include_docs_urlsurlpatterns = [path('admin/', admin.site.urls),path('docs/', include_docs_urls('接口文档')), #配置接口文档路由,文档标题path('api/', include('ApprovalProcess.urls')),]
7.接口测试
使用apifox进行接口测试
- 测试文件上传功能:调用create_data接口
- 测试文件下载功能:调用download_file接口
- 测试获取文件下载链接功能:调用get_download_url接口。浏览器访问链接可下载文件。