目录
Python实例题
题目
要求:
解题思路:
代码实现:
Python实例题
题目
基于 Flask 的在线聊天系统
要求:
- 使用 Flask 框架构建一个实时在线聊天系统,支持以下功能:
  - 用户注册、登录和个人资料管理
  - 一对一实时聊天功能
  - 群聊功能
  - 消息通知和未读消息提示
  - 在线用户状态显示
- 使用 Flask-SocketIO 实现实时通信。
- 使用 SQLite 数据库存储用户、聊天记录等信息。
- 添加美观的前端界面,支持响应式设计。
解题思路:
- 使用 Flask 蓝图组织代码结构。
- 通过 Flask-Login 处理用户认证。
- 使用 Flask-SocketIO 实现实时消息推送。
- 使用 Flask-SQLAlchemy 管理数据库操作。
- 结合 Bootstrap 实现美观的前端界面。
代码实现:
from flask import Flask, render_template, request, redirect, url_for, flash, session, Blueprint
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from flask_socketio import SocketIO, emit, join_room, leave_room, send
import os
import time
from datetime import datetime# 初始化 Flask 应用
# Initialise the Flask application.
app = Flask(__name__)
# The secret key signs the session cookie, so prefer a value supplied through
# the environment. The placeholder is kept only as a development fallback so
# existing setups keep working unchanged.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key-here')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(app.root_path, 'chat.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Initialise the database.
db = SQLAlchemy(app)

# Initialise Flask-Login; anonymous visitors are redirected to the login view.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'auth.login'

# Initialise SocketIO.
socketio = SocketIO(app, async_mode='eventlet')

# Data models
class User(UserMixin, db.Model):
    """Account that can log in, send private messages and join chat rooms."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # Werkzeug stores "method$salt$digest"; with the scrypt method that string
    # is longer than the previous 120-character limit, so backends that
    # enforce VARCHAR length would reject or truncate it.
    password_hash = db.Column(db.String(256), nullable=False)
    avatar = db.Column(db.String(100), default='default.png')
    online = db.Column(db.Boolean, default=False)
    last_seen = db.Column(db.DateTime)

    # Message has two foreign keys to user.id, so each relationship names the
    # column it follows explicitly.
    messages_sent = db.relationship(
        'Message',
        foreign_keys='Message.sender_id',
        backref='sender',
        lazy='dynamic',
    )
    messages_received = db.relationship(
        'Message',
        foreign_keys='Message.recipient_id',
        backref='recipient',
        lazy='dynamic',
    )

    def set_password(self, password):
        """Store a salted hash of ``password`` instead of the plain text."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)

    def get_unread_messages_count(self):
        """Return how many private messages addressed to this user are unread."""
        return Message.query.filter_by(recipient_id=self.id, read=False).count()


class Message(db.Model):
    """One-to-one message between two users."""

    id = db.Column(db.Integer, primary_key=True)
    sender_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    recipient_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    content = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    read = db.Column(db.Boolean, default=False)


class ChatRoom(db.Model):
    """Named chat room; ``is_group`` marks rooms created as group chats."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    is_group = db.Column(db.Boolean, default=False)
    creator_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # Many-to-many link to User through the chat_room_member table.
    members = db.relationship('User', secondary='chat_room_member', backref='chat_rooms')


class ChatRoomMember(db.Model):
    """Membership row linking a user to a chat room."""

    id = db.Column(db.Integer, primary_key=True)
    chat_room_id = db.Column(db.Integer, db.ForeignKey('chat_room.id'))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    joined_at = db.Column(db.DateTime, default=datetime.utcnow)


class GroupMessage(db.Model):
    """Message posted by a user into a chat room."""

    id = db.Column(db.Integer, primary_key=True)
    chat_room_id = db.Column(db.Integer, db.ForeignKey('chat_room.id'))
    sender_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    content = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)


# User loader callback
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the login session, or None.

    Flask-Login passes the id as the string it stored in the session. A
    malformed value is reported as "no such user" by returning None instead
    of letting the conversion error propagate out of the loader.
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None


# Authentication blueprint
auth = Blueprint('auth', __name__)


@auth.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and create the account on POST."""
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        # Reject blank credentials up front; otherwise an account with an
        # empty name or an empty password would be created.
        if not username.strip() or not password:
            flash('用户名和密码不能为空', 'error')
            return redirect(url_for('auth.register'))

        if User.query.filter_by(username=username).first():
            flash('用户名已存在', 'error')
            return redirect(url_for('auth.register'))

        user = User(username=username)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()

        flash('注册成功,请登录', 'success')
        return redirect(url_for('auth.login'))

    return render_template('register.html')


@auth.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate the user on POST."""
    if request.method == 'POST':
        # Missing fields fall through to the normal "wrong credentials" path.
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        user = User.query.filter_by(username=username).first()

        if user and user.check_password(password):
            login_user(user)
            user.online = True
            user.last_seen = datetime.utcnow()
            db.session.commit()

            # The SocketIO handlers identify the caller through this key.
            session['user_id'] = user.id

            flash('登录成功', 'success')
            return redirect(url_for('main.index'))

        flash('用户名或密码错误', 'error')

    return render_template('login.html')


@auth.route('/logout')
@login_required
def logout():
    """Mark the user offline, end the login session and go to the login page."""
    user = User.query.get(current_user.id)
    user.online = False
    user.last_seen = datetime.utcnow()
    db.session.commit()

    # logout_user() does not know about this key. Left in place, the socket
    # handlers would keep treating this browser as the logged-out user.
    session.pop('user_id', None)
    logout_user()

    flash('已注销', 'success')
    return redirect(url_for('auth.login'))


# Main blueprint
main = Blueprint('main', __name__)


def _sidebar_context():
    """Build the template variables shared by every page with the sidebar.

    Returns a dict with:
      users         -- every user except the current one, ordered by username
      chat_rooms    -- chat rooms the current user is a member of
      unread_counts -- {user.id: unread private messages from that user};
                       every listed user has an entry, 0 when nothing is unread
    """
    users = (
        User.query
        .filter(User.id != current_user.id)
        .order_by(User.username)
        .all()
    )
    chat_rooms = (
        ChatRoom.query
        .join(ChatRoomMember)
        .filter(ChatRoomMember.user_id == current_user.id)
        .all()
    )

    # One grouped query instead of one COUNT query per user.
    unread_rows = (
        db.session.query(Message.sender_id, db.func.count(Message.id))
        .filter(
            Message.recipient_id == current_user.id,
            Message.read.is_(False),
        )
        .group_by(Message.sender_id)
        .all()
    )
    unread_by_sender = {sender_id: count for sender_id, count in unread_rows}
    unread_counts = {user.id: unread_by_sender.get(user.id, 0) for user in users}

    return {
        'users': users,
        'chat_rooms': chat_rooms,
        'unread_counts': unread_counts,
    }


@main.route('/')
@login_required
def index():
    """Render the landing page with the user list, rooms and unread counts."""
    return render_template('index.html', **_sidebar_context())


@main.route('/user/<int:user_id>/chat')
@login_required
def user_chat(user_id):
    """Render the private chat with ``user_id`` and mark its messages read."""
    recipient = User.query.get_or_404(user_id)

    # Mark the conversation as read before counting, so the sidebar does not
    # show a badge for the chat that is being opened.
    Message.query.filter_by(
        sender_id=recipient.id,
        recipient_id=current_user.id,
        read=False,
    ).update({Message.read: True})
    db.session.commit()

    return render_template('user_chat.html', recipient=recipient, **_sidebar_context())


@main.route('/group/<int:group_id>/chat')
@login_required
def group_chat(group_id):
    """Render a group chat page; non-members are sent back to the index."""
    chat_room = ChatRoom.query.get_or_404(group_id)

    membership = ChatRoomMember.query.filter_by(
        chat_room_id=group_id,
        user_id=current_user.id,
    ).first()
    if membership is None:
        flash('你不是该组的成员', 'error')
        return redirect(url_for('main.index'))

    return render_template('group_chat.html', chat_room=chat_room, **_sidebar_context())


@main.route('/create-group', methods=['GET', 'POST'])
@login_required
def create_group():
    """Show the group creation form and create the group on POST."""
    if request.method == 'POST':
        group_name = request.form.get('group_name', '').strip()
        if not group_name:
            flash('组名不能为空', 'error')
            return redirect(url_for('main.create_group'))

        # Form values are untrusted strings: drop anything that is not an
        # integer, collapse duplicates, and leave out the creator, who is
        # added separately below.
        requested_ids = set()
        for raw_id in request.form.getlist('members'):
            try:
                requested_ids.add(int(raw_id))
            except ValueError:
                continue
        requested_ids.discard(current_user.id)

        # Keep only ids that belong to existing users.
        member_ids = []
        if requested_ids:
            found = db.session.query(User.id).filter(User.id.in_(sorted(requested_ids)))
            member_ids = sorted(found_id for (found_id,) in found)

        group = ChatRoom(name=group_name, is_group=True, creator_id=current_user.id)
        db.session.add(group)
        # flush() assigns group.id without committing, so the room and its
        # members are saved in a single transaction.
        db.session.flush()

        db.session.add(ChatRoomMember(chat_room_id=group.id, user_id=current_user.id))
        db.session.add_all([
            ChatRoomMember(chat_room_id=group.id, user_id=member_id)
            for member_id in member_ids
        ])
        db.session.commit()

        flash('组创建成功', 'success')
        return redirect(url_for('main.group_chat', group_id=group.id))

    users = (
        User.query
        .filter(User.id != current_user.id)
        .order_by(User.username)
        .all()
    )
    return render_template('create_group.html', users=users)


@main.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
    """Show the profile form and save the username/avatar on POST."""
    if request.method == 'POST':
        username = request.form.get('username', '')
        # A blank avatar would replace the stored file name with an empty
        # string, so keep the current one in that case.
        avatar = request.form.get('avatar') or current_user.avatar

        if not username.strip():
            flash('用户名不能为空', 'error')
            return redirect(url_for('main.profile'))

        name_taken = (
            username != current_user.username
            and User.query.filter_by(username=username).first() is not None
        )
        if name_taken:
            flash('用户名已存在', 'error')
            return redirect(url_for('main.profile'))

        current_user.username = username
        current_user.avatar = avatar
        db.session.commit()

        flash('个人资料已更新', 'success')
        return redirect(url_for('main.profile'))

    return render_template('profile.html')


# SocketIO event handlers
def _user_room(user_id):
    """Return the SocketIO room name that carries a user's private messages."""
    # User and group rooms get distinct prefixes so that user N and chat room
    # N never share a room.
    return 'user_{}'.format(user_id)


def _group_room(chat_room_id):
    """Return the SocketIO room name for a chat room."""
    return 'group_{}'.format(chat_room_id)


def _to_int(value):
    """Convert a client-supplied id to int; return None when it is not one."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _payload_field(data, key):
    """Read ``key`` from an event payload, tolerating non-dict payloads."""
    return data.get(key) if isinstance(data, dict) else None


def _is_group_member(chat_room_id, user_id):
    """Return True when ``user_id`` has a membership row for the chat room."""
    membership = ChatRoomMember.query.filter_by(
        chat_room_id=chat_room_id,
        user_id=user_id,
    ).first()
    return membership is not None


@socketio.on('connect')
def handle_connect():
    """Mark the logged-in user online and subscribe the socket to their room."""
    user_id = session.get('user_id')
    if not user_id:
        return
    user = User.query.get(user_id)
    if not user:
        return

    # Private messages are emitted to the per-user room, so the socket has to
    # be placed in it; nothing is delivered otherwise.
    join_room(_user_room(user.id))

    user.online = True
    user.last_seen = datetime.utcnow()
    db.session.commit()

    # Tell every connected client about the status change.
    update_user_status(user)


@socketio.on('disconnect')
def handle_disconnect():
    """Mark the logged-in user offline and broadcast the change."""
    user_id = session.get('user_id')
    if not user_id:
        return
    user = User.query.get(user_id)
    if not user:
        return

    user.online = False
    user.last_seen = datetime.utcnow()
    db.session.commit()

    # Tell every connected client about the status change.
    update_user_status(user)


@socketio.on('private_message')
def handle_private_message(data):
    """Store a private message and push it to the recipient and the sender.

    ``data`` is expected to carry ``recipient_id`` and ``content``. The event
    is ignored when the caller is not logged in, a field is missing, or the
    recipient does not exist.
    """
    sender_id = session.get('user_id')
    recipient_id = _to_int(_payload_field(data, 'recipient_id'))
    content = _payload_field(data, 'content')
    if not (sender_id and recipient_id and content):
        return

    # Check the recipient before saving, so no message is stored for a user
    # that does not exist.
    if User.query.get(recipient_id) is None:
        return

    message = Message(sender_id=sender_id, recipient_id=recipient_id, content=content)
    db.session.add(message)
    db.session.commit()

    payload = {
        'sender_id': sender_id,
        'recipient_id': recipient_id,
        'content': content,
        'timestamp': message.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
    }

    # Emit regardless of the stored online flag: sending to a room with no
    # sockets is a no-op, and the flag may be stale when several tabs are open.
    emit('new_message', payload, room=_user_room(recipient_id))
    # Echo to the sender's own sockets, but not twice for a self-addressed
    # message.
    if recipient_id != sender_id:
        emit('new_message', payload, room=_user_room(sender_id))


@socketio.on('join_room')
def handle_join_room(data):
    """Subscribe the socket to a chat room the logged-in user belongs to.

    ``data['room_id']`` is treated as a chat room id. The per-user private
    room is joined on connect and cannot be joined through this event.
    """
    user_id = session.get('user_id')
    room_id = _to_int(_payload_field(data, 'room_id'))
    if user_id and room_id and _is_group_member(room_id, user_id):
        join_room(_group_room(room_id))


@socketio.on('leave_room')
def handle_leave_room(data):
    """Unsubscribe the socket from a chat room."""
    room_id = _to_int(_payload_field(data, 'room_id'))
    if room_id:
        leave_room(_group_room(room_id))


@socketio.on('group_message')
def handle_group_message(data):
    """Store a group message and push it to the sockets in that chat room.

    ``data`` is expected to carry ``chat_room_id`` and ``content``. The event
    is ignored when the caller is not logged in, a field is missing, or the
    caller is not a member of the room.
    """
    sender_id = session.get('user_id')
    chat_room_id = _to_int(_payload_field(data, 'chat_room_id'))
    content = _payload_field(data, 'content')
    if not (sender_id and chat_room_id and content):
        return

    # Only members may post into a room.
    if not _is_group_member(chat_room_id, sender_id):
        return

    group_message = GroupMessage(
        chat_room_id=chat_room_id,
        sender_id=sender_id,
        content=content,
    )
    db.session.add(group_message)
    db.session.commit()

    emit('new_group_message', {
        'sender_id': sender_id,
        'chat_room_id': chat_room_id,
        'content': content,
        'timestamp': group_message.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
    }, room=_group_room(chat_room_id))


def update_user_status(user):
    """Broadcast ``user``'s online flag and last-seen time to all clients."""
    emit('user_status_update', {
        'user_id': user.id,
        'online': user.online,
        'last_seen': user.last_seen.strftime('%Y-%m-%d %H:%M:%S') if user.last_seen else '',
    }, broadcast=True)


# Register blueprints
# Attach the blueprints in the same order as before: main first, then auth.
for blueprint in (main, auth):
    app.register_blueprint(blueprint)

# Create the database tables; create_all() needs an application context.
with app.app_context():
    db.create_all()

if __name__ == '__main__':
    # Start the SocketIO server when the file is run as a script.
    socketio.run(app, debug=True)