以下是一个使用 FastAPI 实现的视频上传接口,用来记录工程化后端程序中业务逻辑的组织方式。
重点是代码如何抽离
项目结构优化
project/
├── .env # 环境变量配置
├── app/
│ ├── __init__.py
│ ├── main.py # 主应用入口
│ ├── core/ # 核心配置
│ │ ├── __init__.py
│ │ ├── config.py # 应用配置
│ │ ├── database.py # 数据库配置
│ │ └── middleware.py # 中间件配置
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ └── video.py # 视频模型
│ ├── schemas/ # Pydantic模型
│ │ ├── __init__.py
│ │ └── video.py # 视频响应模型
│ ├── services/ # 业务逻辑
│ │ ├── __init__.py
│ │ └── video_service.py # 视频服务
│ ├── utils/ # 工具函数
│ │ ├── __init__.py
│ │ └── file_utils.py # 文件处理工具
│ └── routers/ # 路由模块
│     ├── __init__.py
│     ├── video.py # 视频路由
│     └── train.py # 训练路由
├── static/ # 静态文件
│ └── videos/ # 视频存储
└── alembic/ # 数据库迁移(注意:Alembic 面向 SQLAlchemy;Tortoise ORM 对应的迁移工具是 Aerich)
1. 数据库配置 (app/core/database.py)
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise
from app.core.config import settings


async def init_db():
    """Open the Tortoise ORM connection configured by ``settings.DATABASE_URL``.

    ``Tortoise.init`` does not accept a ``generate_schemas`` argument; passing
    one raises ``TypeError`` at startup. Tables are only created when
    ``Tortoise.generate_schemas()`` is called explicitly, which this function
    never does, so the existing schema is left untouched.
    """
    await Tortoise.init(
        db_url=settings.DATABASE_URL,
        modules={'models': ['app.models']},
    )


def setup_database(app):
    """Register Tortoise ORM on a FastAPI application.

    Args:
        app: The FastAPI application the ORM is registered on.
    """
    register_tortoise(
        app,
        db_url=settings.DATABASE_URL,
        modules={'models': ['app.models']},
        generate_schemas=False,  # never create tables; the schema already exists
        add_exception_handlers=True,
    )
2. 应用配置 (app/core/config.py)
import os
from pydantic import BaseSettings


class Settings(BaseSettings):
    """Application settings, overridable through environment variables or ``.env``."""

    DATABASE_URL: str = "mysql://user:password@localhost:3306/video_service"
    UPLOAD_DIR: str = "static/videos"
    ALLOWED_EXTENSIONS: set = {".mp4", ".mov", ".avi", ".mkv", ".webm"}
    MAX_FILE_SIZE_MB: int = 2000

    @property
    def MAX_FILE_SIZE(self) -> int:
        """Return the upload size limit in bytes.

        Derived from ``MAX_FILE_SIZE_MB`` on every access so that a value
        loaded from the environment is honoured. A class-level expression
        would be evaluated once, against the default of 2000 MB.
        """
        return self.MAX_FILE_SIZE_MB * 1024 * 1024

    class Config:
        env_file = ".env"


settings = Settings()
3. 数据模型 (app/models/video.py)
from tortoise.models import Model
import uuid

from tortoise import fields


class VideoRecord(Model):
    """ORM mapping for one uploaded video stored in the ``video_records`` table."""

    id = fields.IntField(pk=True)
    original_filename = fields.CharField(max_length=255)
    saved_filename = fields.CharField(max_length=255)
    server_path = fields.CharField(max_length=512)
    file_size = fields.BigIntField()
    file_type = fields.CharField(max_length=10)
    upload_time = fields.DatetimeField(auto_now_add=True)
    # A non-primary-key UUIDField has no automatic value. Without a default,
    # creating a record that omits unique_id fails on the non-nullable column.
    unique_id = fields.UUIDField(default=uuid.uuid4)

    class Meta:
        table = "video_records"  # name of the pre-existing table
        # table_description is only the table comment emitted when DDL is
        # generated; it does not guard the schema against changes, so it is
        # not set here.
4. Pydantic模型 (app/schemas/video.py)
from datetime import datetime
from pydantic import BaseModel


class VideoUploadResponse(BaseModel):
    """Response body of the video upload endpoint."""

    # Human-readable status text.
    message: str
    # Name the file was stored under on the server.
    saved_filename: str
    # Path of the stored file as reported to the client.
    path: str
    # Absolute filesystem location of the stored file.
    absolute_path: str
    # File name as supplied by the uploader.
    original_filename: str
    # Size of the upload in bytes.
    size: int
    # Primary key of the database record created for the upload.
    record_id: int
    # Creation timestamp of the database record.
    upload_time: datetime
    # File extension of the upload.
    file_type: str
5. 文件工具 (app/utils/file_utils.py)
import os
import uuid
from datetime import datetime
from app.core.config import settings


def ensure_upload_dir():
    """Create the upload directory, including missing parents, if it is absent."""
    os.makedirs(settings.UPLOAD_DIR, exist_ok=True)


def is_valid_video(filename: str) -> bool:
    """Return whether ``filename`` ends with one of the allowed video extensions.

    The comparison ignores case on both sides. A missing or empty file name
    is rejected instead of raising ``AttributeError``.

    Args:
        filename: File name supplied by the client; may be ``None`` or empty.
    """
    if not filename:
        return False
    # str.endswith accepts a tuple, so one call covers every extension.
    allowed = tuple(ext.lower() for ext in settings.ALLOWED_EXTENSIONS)
    return filename.lower().endswith(allowed)


def generate_new_filename(original_filename: str) -> str:
    """Build a unique storage name of the form ``video_<timestamp>_<id><ext>``.

    Args:
        original_filename: Client-side file name; only its extension is kept.
            ``None`` is treated as a name without extension.

    Returns:
        The generated file name, carrying the original extension unchanged.
    """
    ext = os.path.splitext(original_filename or "")[1]
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # Six hex digits keep names distinct for uploads within the same second.
    unique_id = uuid.uuid4().hex[:6]
    return f"video_{timestamp}_{unique_id}{ext}"
6. 视频服务 (app/services/video_service.py)
import os
import logging
from fastapi import HTTPException
from app.models.video import VideoRecord
from app.utils.file_utils import generate_new_filename, is_valid_video
from app.core.config import settings

logger = logging.getLogger(__name__)

# Bytes moved per read/write cycle while streaming an upload to disk (10 MB).
_CHUNK_SIZE = 10 * 1024 * 1024


def _remove_quietly(path):
    """Delete ``path`` if it exists; a failing cleanup must not hide the real error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError:
        logger.warning("清理文件失败: %s", path, exc_info=True)


async def upload_video(file):
    """Validate an uploaded video, store it on disk and record it in the database.

    The upload is streamed to disk in a single pass. The size limit is
    enforced while writing, so the content is not read twice. A partially
    written file is removed whenever the upload is rejected or fails.

    Args:
        file: The uploaded file object (FastAPI ``UploadFile``).

    Returns:
        A dict with the keys ``saved_filename``, ``path``, ``absolute_path``,
        ``original_filename``, ``size``, ``record_id``, ``upload_time`` and
        ``file_type``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name, 413 when
            the upload exceeds ``settings.MAX_FILE_SIZE``, 500 when writing
            the file or creating the database record fails.
    """
    # Reject unsupported types; sorting keeps the error message deterministic.
    if not file.filename or not is_valid_video(file.filename):
        raise HTTPException(400, f"不支持的文件格式。仅支持: {', '.join(sorted(settings.ALLOWED_EXTENSIONS))}")

    new_filename = generate_new_filename(file.filename)
    save_path = os.path.join(settings.UPLOAD_DIR, new_filename)
    abs_save_path = os.path.abspath(save_path)
    file_ext = os.path.splitext(file.filename)[1].lower()

    file_size = 0
    try:
        # The target directory may not exist yet on a fresh deployment.
        os.makedirs(settings.UPLOAD_DIR, exist_ok=True)
        with open(save_path, "wb") as buffer:
            while chunk := await file.read(_CHUNK_SIZE):
                file_size += len(chunk)
                if file_size > settings.MAX_FILE_SIZE:
                    raise HTTPException(413, f"文件太大,最大允许 {settings.MAX_FILE_SIZE_MB}MB")
                buffer.write(chunk)
    except HTTPException:
        # Size limit hit mid-stream: drop the partial file, keep the 413.
        _remove_quietly(save_path)
        raise
    except Exception as e:
        logger.exception(f"文件保存失败: {abs_save_path}, 错误: {str(e)}")
        _remove_quietly(save_path)
        raise HTTPException(500, f"文件保存失败: {str(e)}") from e
    else:
        logger.info(f"视频保存成功: {abs_save_path}")
    finally:
        await file.close()

    try:
        video_record = await VideoRecord.create(
            original_filename=file.filename,
            saved_filename=new_filename,
            server_path=abs_save_path,
            file_size=file_size,
            file_type=file_ext,
        )
    except Exception as e:
        # Without a database record the stored file would be orphaned.
        _remove_quietly(save_path)
        logger.exception(f"数据库操作失败: {str(e)}")
        raise HTTPException(500, f"数据库操作失败: {str(e)}") from e
    logger.info(f"数据库记录创建成功,ID: {video_record.id}")

    return {
        "saved_filename": new_filename,
        "path": f"/{settings.UPLOAD_DIR}/{new_filename}",
        "absolute_path": abs_save_path,
        "original_filename": file.filename,
        "size": file_size,
        "record_id": video_record.id,
        "upload_time": video_record.upload_time,
        "file_type": file_ext,
    }
7. 视频路由 (app/routers/video.py)
from fastapi import APIRouter, UploadFile, File
from fastapi.responses import JSONResponse
from app.services.video_service import upload_video
from app.schemas.video import VideoUploadResponse

router = APIRouter()


@router.post("/upload-video", response_model=VideoUploadResponse)
async def api_upload_video(file: UploadFile = File(...)):
    """Handle a video upload and return the stored file's metadata.

    The plain dict is returned so FastAPI validates and serialises it through
    ``response_model``. Wrapping it in ``JSONResponse`` would pass the
    ``upload_time`` datetime straight to ``json.dumps``, which cannot encode
    it, and would skip the response model entirely.

    Args:
        file: The uploaded video, sent as multipart form data.
    """
    result = await upload_video(file)
    return {"message": "视频上传并记录成功", **result}
8. 主应用 (app/main.py)
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from contextlib import asynccontextmanager
from app.core import config, database, middleware
from app.routers import video, train
from app.utils.file_utils import ensure_upload_dir

settings = config.settings


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the database connection on startup and close it on shutdown."""
    from tortoise import Tortoise

    await database.init_db()
    try:
        yield
    finally:
        # Release the connections even when shutdown is triggered by an error.
        await Tortoise.close_connections()


app = FastAPI(lifespan=lifespan)

# Register middleware. setup_cors applies the configured CORS policy; adding
# the bare CORSMiddleware class would install it with empty defaults.
middleware.setup_cors(app)
app.middleware("http")(middleware.log_request_middleware)

# Mount the upload directory. StaticFiles requires the directory to exist, and
# the URL prefix mirrors UPLOAD_DIR so that the "path" returned by the upload
# service ("/<UPLOAD_DIR>/<file>") resolves to the stored file.
ensure_upload_dir()
app.mount(f"/{settings.UPLOAD_DIR.strip('/')}", StaticFiles(directory=settings.UPLOAD_DIR), name="static")

# Include routers.
app.include_router(video.router, prefix='/api/video')
app.include_router(train.router, prefix='/api/train')

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("app.main:app", host='0.0.0.0', port=4010, reload=True)
9. 中间件 (app/core/middleware.py)
import time
import logging
from fastapi.middleware.cors import CORSMiddleware
from fastapi import Request

# Configure logging: records are written to request_logs.log.
logging.basicConfig(filename='request_logs.log', format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)


def setup_cors(app):
    """Install the CORS middleware on ``app``.

    Args:
        app: The FastAPI application to configure.
    """
    # NOTE(review): FastAPI's CORS documentation says allow_origins must list
    # explicit origins when allow_credentials=True; replace the wildcard with
    # the real front-end origins before deploying.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["GET", "POST"],
        allow_headers=["*"],
    )


async def log_request_middleware(request: Request, call_next):
    """Log method, path, status, duration and URL of every request.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the response.

    Returns:
        The response produced by ``call_next``, unchanged.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    log_message = (
        f"Method: {request.method}, Path: {request.url.path}, "
        f"Status: {response.status_code}, Time: {process_time:.2f}s, "
        f"URL: {request.url}"
    )
    logging.info(log_message)
    return response
10. 环境文件 (.env)
# Database configuration
DATABASE_URL=mysql://user:password@localhost:3306/video_service

# File configuration
UPLOAD_DIR=static/videos
# Set-typed settings are parsed from the environment as JSON, so the value
# must be a JSON array, not a comma-separated list.
ALLOWED_EXTENSIONS='[".mp4", ".mov", ".avi", ".mkv", ".webm"]'
MAX_FILE_SIZE_MB=2000
适配已有数据库表的要点
1. 禁用自动建表:
# database.py
generate_schemas=False # 禁用自动创建表结构
2. 模型严格匹配表结构:
# models/video.py
class VideoRecord(Model):
    # Declare the fields so that they mirror the existing table exactly.

    class Meta:
        table = "video_records"  # physical name of the existing table
        # table_description is only the table comment used when DDL is
        # generated; it does not stop the ORM from changing the schema, so it
        # is not needed here.
3. 手动验证表结构:
在应用启动后,建议运行以下脚本验证模型与表结构一致性:
from tortoise import Tortoise, run_async

from app.core.config import settings
from app.models.video import VideoRecord


async def check_table():
    """Print the column names of the existing ``video_records`` table.

    The script runs outside the web application, so it opens its own ORM
    connection before querying.
    """
    await Tortoise.init(
        db_url=settings.DATABASE_URL,
        modules={'models': ['app.models']},
    )
    conn = Tortoise.get_connection("default")
    # execute_query returns a (row_count, rows) tuple; execute_query_dict
    # returns just the rows as dicts, which is what the loop below needs.
    columns = await conn.execute_query_dict("DESCRIBE video_records")
    print("现有表字段:", [col['Field'] for col in columns])


if __name__ == "__main__":
    # run_async closes the ORM connections once the coroutine has finished.
    run_async(check_table())