下面提供一个基于Python的数字化管理系统框架,使用现代技术栈实现。这个系统将包含设备监控、故障管理、装配配套表生成、生产计划管理等功能。
系统架构
数字化生产管理系统
├── 设备监控模块
├── 故障管理模块
├── 产品装配管理模块
├── 生产计划管理模块
├── 数据集成接口
└── 报表导出模块
技术栈
后端: Python + FastAPI (RESTful API)
前端: Vue.js (可选) 或直接提供API供上层系统调用
数据库: PostgreSQL (关系型) + Redis (缓存)
任务队列: Celery (异步任务处理)
文件处理: openpyxl (Excel操作)
监控: Prometheus + Grafana (可选)
核心代码实现
1. 主应用框架 (main.py)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import devices, faults, products, productionapp = FastAPI(title="数字化生产管理系统", version="1.0.0")# 允许跨域
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 包含各模块路由
app.include_router(devices.router)
app.include_router(faults.router)
app.include_router(products.router)
app.include_router(production.router)@app.get("/")
async def root():return {"message": "数字化生产管理系统 API"}
2. 设备监控模块 (devices.py)
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
from datetime import datetime
import psycopg2
import redisrouter = APIRouter(prefix="/devices", tags=["设备管理"])# Redis连接
r = redis.Redis(host='localhost', port=6379, db=0)# 数据库连接配置
DB_CONFIG = {"host": "localhost","database": "production_db","user": "postgres","password": "password"
}class Device(BaseModel):id: strname: strstatus: strlast_heartbeat: datetimeip_address: str@router.get("/", response_model=List[Device])
async def get_online_devices():"""获取所有在线设备状态"""conn = psycopg2.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT * FROM devices WHERE status = 'online'")devices = cursor.fetchall()cursor.close()conn.close()return [Device(id=d[0], name=d[1], status=d[2], last_heartbeat=d[3], ip_address=d[4]) for d in devices]@router.get("/{device_id}/status")
async def get_device_status(device_id: str):"""获取特定设备状态"""# 先从Redis缓存获取status = r.get(f"device:{device_id}:status")if status:return {"device_id": device_id, "status": status.decode()}# 缓存没有则查数据库conn = psycopg2.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT status FROM devices WHERE id = %s", (device_id,))result = cursor.fetchone()cursor.close()conn.close()if not result:raise HTTPException(status_code=404, detail="Device not found")# 更新缓存r.setex(f"device:{device_id}:status", 30, result[0])return {"device_id": device_id, "status": result[0]}
3. 故障管理模块 (faults.py)
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
from datetime import datetime
import psycopg2router = APIRouter(prefix="/faults", tags=["故障管理"])class Fault(BaseModel):id: intdevice_id: strfault_code: strdescription: strtimestamp: datetimeresolved: boolclass FaultCreate(BaseModel):device_id: strfault_code: strdescription: str@router.post("/", response_model=Fault)
async def report_fault(fault: FaultCreate):"""报告设备故障"""conn = psycopg2.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("INSERT INTO faults (device_id, fault_code, description, timestamp, resolved) ""VALUES (%s, %s, %s, %s, %s) RETURNING id",(fault.device_id, fault.fault_code, fault.description, datetime.now(), False))fault_id = cursor.fetchone()[0]conn.commit()# 更新设备状态为故障cursor.execute("UPDATE devices SET status = 'fault' WHERE id = %s",(fault.device_id,))conn.commit()cursor.close()conn.close()return {"id": fault_id,**fault.dict(),"timestamp": datetime.now(),"resolved": False}@router.get("/unresolved", response_model=List[Fault])
async def get_unresolved_faults():"""获取未解决的故障列表"""conn = psycopg2.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT * FROM faults WHERE resolved = FALSE")faults = cursor.fetchall()cursor.close()conn.close()return [Fault(id=f[0], device_id=f[1], fault_code=f[2], description=f[3], timestamp=f[4], resolved=f[5]) for f in faults]
4. 产品装配管理模块 (products.py)
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
import psycopg2
from openpyxl import Workbook
from io import BytesIO
from fastapi.responses import StreamingResponserouter = APIRouter(prefix="/products", tags=["产品装配管理"])class Component(BaseModel):code: strname: strquantity: intclass ProductAssembly(BaseModel):product_id: strproduct_name: strcomponents: List[Component]@router.get("/{product_id}/assembly", response_model=ProductAssembly)
async def get_product_assembly(product_id: str):"""获取产品装配配套表"""conn = psycopg2.connect(**DB_CONFIG)cursor = conn.cursor()# 获取产品信息cursor.execute("SELECT name FROM products WHERE id = %s", (product_id,))product = cursor.fetchone()if not product:raise HTTPException(status_code=404, detail="Product not found")# 获取零部件列表cursor.execute("SELECT c.code, c.name, pc.quantity ""FROM product_components pc ""JOIN components c ON pc.component_id = c.id ""WHERE pc.product_id = %s", (product_id,))components = cursor.fetchall()cursor.close()conn.close()return ProductAssembly(product_id=product_id,product_name=product[0],components=[Component(code=c[0], name=c[1], quantity=c[2]) for c in components])@router.get("/{product_id}/assembly/excel")
async def export_product_assembly_excel(product_id: str):"""导出产品装配配套表为Excel"""assembly = await get_product_assembly(product_id)# 创建Excel文件wb = Workbook()ws = wb.activews.title = "装配配套表"# 写入标题ws.append(["产品编号", assembly.product_id])ws.append(["产品名称", assembly.product_name])ws.append([])ws.append(["零部件代号", "零部件名称", "数量"])# 写入零部件数据for comp in assembly.components:ws.append([comp.code, comp.name, comp.quantity])# 保存到内存excel_file = BytesIO()wb.save(excel_file)excel_file.seek(0)# 返回文件下载return StreamingResponse(excel_file,media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",headers={"Content-Disposition": f"attachment; filename=assembly_{product_id}.xlsx"})
5. 生产计划管理模块 (production.py)
from fastapi import APIRouter, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import List
from datetime import date
import psycopg2
import openpyxl
import csv
import iorouter = APIRouter(prefix="/production", tags=["生产计划管理"])class MasterPlan(BaseModel):id: strproduct_id: strquantity: intstart_date: dateend_date: datepriority: intclass WorkshopPlan(BaseModel):id: strmaster_plan_id: strworkshop_id: strproduct_id: strquantity: intplanned_start: dateplanned_end: dateactual_start: date = Noneactual_end: date = Nonestatus: str = "pending" # pending, in_progress, completed@router.post("/generate-workshop-plans")
async def generate_workshop_plans(master_plan_id: str):"""根据主生产计划生成车间级生产计划"""conn = psycopg2.connect(**DB_CONFIG)cursor = conn.cursor()# 获取主生产计划cursor.execute("SELECT * FROM master_production_plans WHERE id = %s", (master_plan_id,))master_plan = cursor.fetchone()if not master_plan:raise HTTPException(status_code=404, detail="Master plan not found")# 这里应该有更复杂的逻辑来分配车间任务# 简化为直接为每个车间创建相同的任务cursor.execute("SELECT id FROM workshops")workshops = cursor.fetchall()plans = []for workshop in workshops:cursor.execute("INSERT INTO workshop_plans ""(master_plan_id, workshop_id, product_id, quantity, planned_start, planned_end, status) ""VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id",(master_plan_id,workshop[0],master_plan[1], # product_idmaster_plan[2], # quantitymaster_plan[3], # start_datemaster_plan[4], # end_date"pending"))plan_id = cursor.fetchone()[0]plans.append(plan_id)conn.commit()cursor.close()conn.close()return {"message": f"Generated {len(plans)} workshop plans", "plan_ids": plans}@router.post("/import-workshop-plans/excel")
async def import_workshop_plans_excel(file: UploadFile = File(...)):"""通过Excel批量导入车间生产计划"""contents = await file.read()wb = openpyxl.load_workbook(io.BytesIO(contents))ws = wb.activeplans = []for row in ws.iter_rows(min_row=2, values_only=True): # 假设第一行是标题if not row[0]: # 跳过空行continueplan = {"master_plan_id": row[0],"workshop_id": row[1],"product_id": row[2],"quantity": row[3],"planned_start": row[4],"planned_end": row[5]}plans.append(plan)# 保存到数据库conn = psycopg2.connect(**DB_CONFIG)cursor = conn.cursor()for plan in plans:cursor.execute("INSERT INTO workshop_plans ""(master_plan_id, workshop_id, product_id, quantity, planned_start, planned_end, status) ""VALUES (%s, %s, %s, %s, %s, %s, %s)",(plan["master_plan_id"],plan["workshop_id"],plan["product_id"],plan["quantity"],plan["planned_start"],plan["planned_end"],"pending"))conn.commit()cursor.close()conn.close()return {"message": f"Imported {len(plans)} workshop plans successfully"}@router.get("/workshop-plans/{workshop_id}", response_model=List[WorkshopPlan])
async def get_workshop_plans(workshop_id: str):"""获取车间生产计划及执行情况"""conn = psycopg2.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT id, master_plan_id, workshop_id, product_id, quantity, ""planned_start, planned_end, actual_start, actual_end, status ""FROM workshop_plans WHERE workshop_id = %s",(workshop_id,))plans = cursor.fetchall()cursor.close()conn.close()return [WorkshopPlan(id=p[0],master_plan_id=p[1],workshop_id=p[2],product_id=p[3],quantity=p[4],planned_start=p[5],planned_end=p[6],actual_start=p[7],actual_end=p[8],status=p[9]) for p in plans]
6. 数据库模型 (SQL)
-- 设备表
CREATE TABLE devices (id VARCHAR(50) PRIMARY KEY,name VARCHAR(100) NOT NULL,status VARCHAR(20) NOT NULL CHECK (status IN ('online', 'offline', 'fault', 'maintenance')),last_heartbeat TIMESTAMP,ip_address VARCHAR(50)
);-- 故障表
CREATE TABLE faults (id SERIAL PRIMARY KEY,device_id VARCHAR(50) REFERENCES devices(id),fault_code VARCHAR(50) NOT NULL,description TEXT,timestamp TIMESTAMP NOT NULL,resolved BOOLEAN NOT NULL DEFAULT FALSE
);-- 产品表
CREATE TABLE products (id VARCHAR(50) PRIMARY KEY,name VARCHAR(100) NOT NULL
);-- 零部件表
CREATE TABLE components (id VARCHAR(50) PRIMARY KEY,code VARCHAR(50) NOT NULL,name VARCHAR(100) NOT NULL
);-- 产品-零部件关联表
CREATE TABLE product_components (product_id VARCHAR(50) REFERENCES products(id),component_id VARCHAR(50) REFERENCES components(id),quantity INTEGER NOT NULL,PRIMARY KEY (product_id, component_id)
);-- 主生产计划表
CREATE TABLE master_production_plans (id VARCHAR(50) PRIMARY KEY,product_id VARCHAR(50) REFERENCES products(id),quantity INTEGER NOT NULL,start_date DATE NOT NULL,end_date DATE NOT NULL,priority INTEGER NOT NULL
);-- 车间表
CREATE TABLE workshops (id VARCHAR(50) PRIMARY KEY,name VARCHAR(100) NOT NULL
);-- 车间生产计划表
CREATE TABLE workshop_plans (id VARCHAR(50) PRIMARY KEY,master_plan_id VARCHAR(50) REFERENCES master_production_plans(id),workshop_id VARCHAR(50) REFERENCES workshops(id),product_id VARCHAR(50) REFERENCES products(id),quantity INTEGER NOT NULL,planned_start DATE NOT NULL,planned_end DATE NOT NULL,actual_start DATE,actual_end DATE,status VARCHAR(20) NOT NULL CHECK (status IN ('pending', 'in_progress', 'completed', 'delayed'))
);