一、依赖环境

1、python解释器版本:python3.7.5

2、稳定依赖包

# Celery 核心
celery==5.2.7
kombu==5.2.4
billiard==3.6.4.0
vine==5.0.0# Redis broker backend
redis==4.3.6# eventlet (如果用 -P eventlet)【windows系统可以使用】
eventlet==0.33.3
greenlet==1.1.3# 避免 importlib_metadata API 冲突
importlib_metadata<5# Django相关
Django==3.2.25
django-celery-beat==2.3.0
django-celery-results==2.4.0# 数据库客户端
PyMySQL==1.1.1
# 日志包
concurrent-log-handler==0.9.28
# 数据库连接池包
django-db-connection-pool==1.0.7
# 启动django项目
uwsgi==2.0.22

二、项目结构

  • simple_system
    • settings.py  # django 配置文件
  • celery_app 
    • config   
      • config.py   # 主要配置
      • logger_config.py   # 日志配置
      • scheduler_config.py  # 定时任务配置
    • runs
      • celery_beat.service   # 使用systemctl启动定时进程
      • celery_worker.service  # 使用systemctl启动异步进程
    • tasks
      • task_async.py  # 存放异步任务
      • task_scheduler.py # 存放定时任务
    • celery.py  # celery 项目启动入口

gitee上项目源码

https://gitee.com/liuhaizhang/simple_systemhttps://gitee.com/liuhaizhang/simple_system

三、django相关配置

settings.py

# 使用pymysql代替mysqlclient
import pymysql
pymysql.install_as_MySQLdb()# 禁用debug模式
DEBUG = False
ALLOWED_HOSTS = ["*"]# mysql配置连接池
DATABASES = {"default": {"ENGINE": "dj_db_conn_pool.backends.mysql","NAME": 'simple_system',"USER": 'root',"PASSWORD": 'gs-root',"HOST": '127.0.0.1',"PORT": 3306,"ATOMIC_REQUESTS": True,"CHARSET": "utf8","COLLATION": "utf8_bin",# django-db-connection-pool 配置参数"POOL_OPTIONS": {"POOL_SIZE": 10,  # 基础连接量"MAX_OVERFLOW": 15,  # 最大允许溢出"RECYCLE": 1 * 60 * 60,  # 闲置的连接回收时间"TIMEOUT": 30,  # 获取连接超时时间"PRE_PING": True,  # 启用连接前检查(关键配置)},}
}# 时间本地化
LANGUAGE_CODE = "zh-hans"  # 改为简体中文(可选)
TIME_ZONE = "Asia/Shanghai"  # 关键修改:设置为上海时区(北京时间)
USE_I18N = True  # 禁用国际化,不然时间就是utc时间
USE_TZ = False  # 设置True的化,存到数据库的时间都是utc时间,False就是存本地时间# Redis broker,密码是gs-Admin1,去掉 :gs-Admin1@  就是没有配置密码
REDIS_BROKER_URL = 'redis://:gs-Admin1@127.0.0.1:6379/0'
REDIS_RESULT_BACKEND = 'redis://:gs-Admin1@127.0.0.1:6379/1'

四、celery配置

4.1、配置文件

celery_app/config/config.py

# 导入 Django 的环境
import os
import django
import sys# 导入django项目的根目录,可以随意导入模块
django_root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, django_root_path)
# django配置文件
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "simple_system.settings")
# 导入django环境,异步/定时任务中,就可以随意调用django中的orm,cache等服务了
django.setup()from django.conf import settingsfrom celery_app.config.scheduler_config import beat_schedule# 显示指明异步/定时任务所在的py文件路径
tasks_module = ["celery_app.tasks.task_scheduler", "celery_app.tasks.task_async"]# celery的配置
config_dict = {"broker_url": settings.REDIS_BROKER_URL,  # 'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend": settings.REDIS_RESULT_BACKEND,"task_serializer": "json","result_serializer": "json","accept_content": ["json"],"timezone": "Asia/Shanghai","enable_utc": False,"result_expires": 1 * 60 * 60,"beat_schedule": beat_schedule,
}"""
参数解析:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
result_expires: 异步任务结果存活时长
beat_schedule:设置定时任务
"""

celery_app/config/config_logger.py

import os
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler# 项目根目录的绝对路径
BASE_DIR = Path(__file__).resolve().parent.parent.parent
# celery日志的绝对路径
LOGS_PATH = os.path.join(BASE_DIR, "logs")
if not os.path.exists(LOGS_PATH):os.makedirs(LOGS_PATH)# 日志配置(集成ConcurrentRotatingFileHandler)
LOGGING_CONFIG = {"version": 1,"disable_existing_loggers": False,"formatters": {"verbose": {"format": "[{asctime}][{levelname}][{filename}:{module}:{lineno:d}:{funcName}]:{message}","datefmt": "%Y-%m-%d %H:%M:%S","style": "{",},},"handlers": {# 使用第三方的ConcurrentRotatingFileHandler"concurrent_file": {"level": "INFO",# 第三方处理器的类路径(固定写法)"class": "concurrent_log_handler.ConcurrentRotatingFileHandler",# 日志文件路径"filename": os.path.join(LOGS_PATH, "celery.log"),# 单个文件最大大小(5MB)"maxBytes": 10 * 1024 * 1024,# 保留备份文件数量"backupCount": 15,# 编码格式"encoding": "utf-8",# 日志格式"formatter": "verbose",# 可选:多进程安全的额外参数(如锁定超时) 版本≥ 0.9.19才能使用(linux/unix系统才能使用)# "lockTimeout": 5,  # 等待日志锁的超时时间(秒)},},"loggers": {# Celery相关日志会使用上述处理器"celery.worker": {"handlers": ["concurrent_file"],"level": "INFO","propagate": True,},# 任务模块的日志也会被捕获"celery_app.tasks": {"handlers": ["concurrent_file"],"level": "INFO","propagate": False,  # 避免重复日志},},
}

celery_app/config/config_scheduler.py

from celery.schedules import crontab
from datetime import timedelta# 定时任务
beat_schedule = {  # 定时任务配置# 名字随意命名"add-func-3-seconds": {# 执行add_task下的addy函数"task": "celery_app.tasks.task_scheduler.add_func",  # 任务函数的导入路径# 每3秒执行一次"schedule": timedelta(minutes=30),# add函数传递的参数"args": (10, 21),},# 名字随意起"add-func-5-minutes": {"task": "celery_app.tasks.task_scheduler.add_func",  # 任务函数的导入路径# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务"schedule": crontab(minute="5"),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行"args": (19, 22),  # 定时任务需要的参数},# 查询orm表"task-ope_model-3-seconds":{# 执行add_task下的addy函数"task": "celery_app.tasks.task_scheduler.ope_model",  # 任务函数的导入路径# 每3秒执行一次"schedule": timedelta(minutes=30),},"task-rename_uwsgi_backup_log":{# 执行add_task下的addy函数"task": "celery_app.tasks.task_scheduler.rename_uwsgi_backup_log",  # 任务函数的导入路径# 每天凌晨零点执行一次"schedule": crontab(hour=0,minute=0),}
}

4.2、systemctl文件

celery_app/runs/celery_beat.service

[Unit]
Description=Celery Beat Service[Service]
# 运行用户(替换为你的用户名和组)
User=root
Group=root# 项目根目录ls,看到celery_app包
WorkingDirectory=/home/lhz/simple_system# 启动命令(使用虚拟环境中 celery 的绝对路径)
ExecStart=/home/lhz/venv/simple_system/bin/celery -A celery_app.celery beat -l info# 进程退出后自动重启
Restart=always
RestartSec=5# 环境变量(确保虚拟环境和项目依赖正常加载)
Environment="PATH=/home/lhz/venv/simple_system/bin"
Environment="PYTHONPATH=/home/lhz/simple_system"  # 确保项目能被导入[Install]
WantedBy=multi-user.target

celery_app/runs/celery_worker.service

[Unit]
Description=Celery Worker Service[Service]
# 运行用户和组
User=root
Group=root# 项目根目录,里面能看到 celery_app 包
WorkingDirectory=/home/lhz/simple_system# 启动命令(使用虚拟环境中的 celery)
ExecStart=/home/lhz/venv/simple_system/bin/celery -A celery_app.celery worker --loglevel=INFO# 进程退出后自动重启
Restart=always
RestartSec=5# 环境变量(确保虚拟环境和项目依赖正常加载)
Environment="PATH=/home/lhz/venv/simple_system/bin"
Environment="PYTHONPATH=/home/lhz/simple_system"[Install]
WantedBy=multi-user.target

4.3、任务模块

celery/tasks/task_async.py

from celery import shared_task
from celery_app.celery import logger@shared_task(bind=True)
def push_template_message(self):for i in range(10):logger.info(f"定时任务开始执行(任务ID:{self.request.id}),i={i}")

celery/tasks/task_scheduler.py

import os.pathfrom celery import shared_task
from home.models import UserModel
from celery_app.celery import logger
from celery_app.config.logger_config import LOGS_PATH
from datetime import datetime@shared_task(bind=True)
def add_func(self,a,b):for i in range(10):logger.info(f"定时任务开始执行(任务ID:{self.request.id}),a+b+i={a+b+i}")@shared_task(bind=True)
def ope_model(self):objs = UserModel.objects.all()for obj in objs:logger.info(f"异步任务开始执行(任务ID:{self.request.id}),{obj.account}")@shared_task(bind=True)
def rename_uwsgi_backup_log(self):# 将 根目录/logs中uwsgi.log.172384中的时间戳转成日期name_list = os.listdir(LOGS_PATH)name_pref = 'uwsgi.log.'task_id = f"任务ID={self.request.id}"for name in name_list:if name.startswith(name_pref):_,time_str = name.rsplit('.',1)try:time_int = int(time_str)date_str = datetime.fromtimestamp(int(time_int)).strftime("%Y-%m-%d_%H-%M-%S")new_name = f'{name_pref}{date_str}'old_path = os.path.join(LOGS_PATH,name)new_path = os.path.join(LOGS_PATH,new_name)if not os.path.exists(new_path):os.rename(old_path,new_path)logger.info(f'{task_id},{name} 重命名成{new_name}')else:for i in range(10):new_name += f'-{i+1}'new_path = os.path.join(LOGS_PATH,new_name)if not os.path.exists(new_path):os.rename(old_path, new_path)logger.info(f'{task_id},{name} 重命名成{new_name}')breakelse:logger.info(f"{task_id},{name}进行10次重命名都失败了")except Exception as e:logger.error(f"{task_id},{name}重命名失败")

4.4、celery主模块

celery_app/celery.py

from datetime import datetime
import logging.config
from celery import Celery
from celery_app.config.config import config_dict,tasks_module
from celery_app.config.logger_config import LOGGING_CONFIG# 实例化celery对象,传递一个名字
celery_app = Celery("celery_app")# 显式指定 任务函数所在的模块
celery_app.autodiscover_tasks(tasks_module)# 通过dictConfig加载日志配置
logging.config.dictConfig(LOGGING_CONFIG)# 导入配置消息
celery_app.conf.update(**config_dict)# 日志记录器,给任务使用
logger = logging.getLogger("celery_app.tasks")

五、测试使用

5.1、基本准备

1、安装好mysql、redis等

2、给user表创建3条记录

        get请求:/home/user/  

3、在视图中调用异步任务

        get请求:/home/test/

5.2、启动命令

异步任务启动

1、windows系统 (在django项目根目录下执行)

celery -A celery_app.celery worker --loglevel=info  -P  eventlet

2、linux系统(在django项目根目录下执行)

celery -A celery_app.celery worker --loglevel=info 

启动成功:

定时任务启动

windows/linux系统(在项目根目录下执行)

 celery -A celery_app.celery beat -l info

启动成功:

推送了定时任务:

六、迁移到自己项目

只需要将celery_app整个包都迁移即可

1、修改celery_app/config/config.py

# simple_system 改成 自己的项目名
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "simple_system.settings")

2、修改自己项目的settings.py

# Redis broker,密码是gs-Admin1,去掉 :gs-Admin1@  就是没有配置密码
REDIS_BROKER_URL = 'redis://:gs-Admin1@127.0.0.1:6379/0'
REDIS_RESULT_BACKEND = 'redis://:gs-Admin1@127.0.0.1:6379/1'

3、celery_app/tasks/task_scheduler.py

# 如果没有UserModel ,就不引入和删除此方法@shared_task(bind=True)
def ope_model(self):objs = UserModel.objects.all()for obj in objs:logger.info(f"异步任务开始执行(任务ID:{self.request.id}),{obj.account}")

4、celery_app/runs 中两个配置,根据实际情况修改

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/93621.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/93621.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/93621.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu18.04 LTS +RTL 8125 出现安装完系统后没有网络问题

Ubuntu18.04 LTS RTL 8125 出现安装完系统后没有网络问题问题描述最终解决方案1.下载对应的Realtek网卡驱动&#xff0c;使用命令lspci查看网卡信息安装网卡3.重启电脑记录过程1.内核升级方式1&#xff09;下载新的内核驱动2&#xff09;安装内核驱动3&#xff09;重启电脑4&am…

集成电路学习:什么是ARM CortexM处理器核心

ARM Cortex-M是ARM公司专为微控制器( Microcontroller)设计的处理器核心系列,它以其高性能、低功耗和易于开发的特点,在嵌入式系统和微控制器领域得到了广泛应用。以下是关于ARM Cortex-M的详细介绍: 一、ARM Cortex-M的概述 ARM Cortex-M系列处理器是基于ARM架构的高能效…

Apache Ignite 的分布式原子类型(Atomic Types)

以下的内容是关于 Apache Ignite 的分布式原子类型&#xff08;Atomic Types&#xff09;&#xff0c;主要包括 IgniteAtomicLong 和 IgniteAtomicReference。它们是 跨集群节点的“全局共享变量”&#xff0c;支持线程安全、原子性操作&#xff0c;即使多个节点同时访问也能保…

Leetcode 08 java

283. 移动零 提示 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0] 示例 2: 输…

LeetCode 56 - 合并区间

思路 排序&#xff1a;将所有区间按起始点从小到大排序。贪心合并&#xff1a;初始化一个结果列表&#xff0c;放入第一个区间。然后遍历剩余区间&#xff0c;将当前区间与结果列表中的最后一个区间比较&#xff1a; 若重叠&#xff08;当前区间起点 ≤ 结果区间终点&#xff0…

DNS 正向查找与反向查找

DNS 区域是 DNS 中基本的组织单元&#xff0c;为域名定义了管理和权威边界。一个 DNS 区域通常包含一系列 DNS 资源记录&#xff0c;包括名称到地址的映射&#xff08;正向查找&#xff09;和地址到名称的映射&#xff08;反向查找&#xff09;。这些区域对于高效管理和解析网络…

Oracle ERP FORM开发 — 新增查询条件

1 根据值来查询具体流程步骤看第2节&#xff0c;这里提供核心的增加查询条件的触发器代码&#xff1a;1.1 可完全匹配的值比如“是”&#xff0c;“否”&#xff0c;“物料”&#xff0c;“”汽车 等等这些可以直接通过对应的值匹配&#xff0c;特点就是词语&#xff0c;短小。…

Flutter实现列表功能

在Flutter中,可以通过ListView和ListTile等组件来实现类似Android中RecyclerView和Adapter的功能。以下是一个通用的设计架构,用于设计列表数据: 1. 定义数据模型 首先,定义一个数据模型类,用于存储列表中每一项的数据。例如: class ItemModel {final String title;fi…

2.1、Redis的单线程本质和多线程的操作

Redis的单线程本质 1. 核心单线程部分 #mermaid-svg-iFErSltthPIEsuiP {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-iFErSltthPIEsuiP .error-icon{fill:#552222;}#mermaid-svg-iFErSltthPIEsuiP .error-text{fil…

文件权限值的表示方法

文章目录字符表示方法8 进制数值表示方法字符表示方法 Linux表示说明Linux表示说明r--只读-w-仅可写--x仅可执行rw-可读可写-wx可写可执行r-x可读可执行rwx可读可写可执行---无权限 8 进制数值表示方法 权限符号8进制2进制r4100w2010x1001rw6110rx5101wx3011rwx7111---0000

【38】WinForm入门到精通 ——WinForm平台为AnyCPU 无法切换为x64,也无法添加 x64及其他平台

WinForm 是 Windows Form 的简称&#xff0c;是基于 .NET Framework 平台的客户端&#xff08;PC软件&#xff09;开发技术&#xff0c;是 C# 语言中的一个重要应用。.NET 提供了大量 Windows 风格的控件和事件&#xff0c;可以直接拿来使用。本专栏内容是按照标题序号逐渐深入…

门控激活函数:GLU/GTU/Swish/HSwish/Mish/SwiGLU

10 门控激活函数 10.1 GLU&#xff1a;门控线性单元函数Gated Linear Unit10.2 GTU&#xff1a;门控Tanh单元函数Gated Tanh Unit自门控激活函数&#xff08;Self-gated activation function&#xff09;是一种通过自身机制动态调节信息流动的激活函数&#xff0c;其核心在于模…

Linux内核IPv4多播路由深度解析:从数据结构到高效转发

多播路由是网络通信的核心技术之一&#xff0c;Linux内核通过精密的多层设计实现了高性能的IPv4多播路由功能。本文将深入剖析其核心实现机制&#xff0c;揭示其高效运转的秘密。一、核心数据结构&#xff1a;路由系统的基石1. 多播路由表&#xff08;struct mr_table&#xff…

ffmpeg-7.1.1 下载安装 windows 版,MP4 转 m3u8 切片,遇到报错 Unrecognized option ‘vbsf‘的解决办法

工作中偶尔会需要造指定大小的文档文件&#xff0c;不要求内容&#xff0c;可以随意填充任意无毒内容&#xff0c;所以打算用ts文件填充&#xff0c;现记录下过程。一、下载 ffmpeg废话不多说&#xff0c;上链接&#xff0c;https://ffmpeg.org/会跳转新页面&#xff0c;向下拉…

Linux网络编程:网络基础概念(下)

目录 前言&#xff1a; 一、网络传输基本流程 1.1、认识MAC地址 1.2、认识IP地址 二、socket编程预备 2.1、端口号 2.2、传输层的代表 2.3、网络字节序 2.4、sockaddr 结构 总结&#xff1a; 前言&#xff1a; 大家好&#xff0c;上一篇文章&#xff0c;我们说到了…

亚马逊广告进阶指南:如何优化流量实现新品快速起量

“新品上架如何快速获取精准流量&#xff1f;”“如何降低ACOS同时提升转化率&#xff1f;”“竞品流量拦截有哪些高效方法&#xff1f;”“关键词广告和ASIN广告如何协同投放&#xff1f;”“人工投放效果不稳定&#xff0c;AI工具真的能解决问题吗&#xff1f;”如果你也在思…

路径平滑优化算法--Clothoid 路径平滑

路径平滑优化算法–Clothoid 路径平滑 文章目录路径平滑优化算法--Clothoid 路径平滑0 为什么选 Clothoid&#xff1f;1 数学基础&#xff1a;严谨推导&#xff08;Mathematical Foundation&#xff09;可视化解释1.1 曲率线性假设1.2 切向角&#xff08;Heading Angle&#…

PCB学习笔记(一)

文章目录一、PCB的制作过程了解1.1 覆铜板一、核心作用&#xff1a;制作印制电路板&#xff08;PCB&#xff09;二、不同类型覆铜板的针对性用途三、延伸应用1.2 覆铜板和信号线的关系一、覆铜板不是“全是铜”&#xff0c;原始结构是“绝缘基材铜箔”二、信号线就是铜&#xf…

【19】C# 窗体应用WinForm ——【列表框ListBox、复选列表框CheckedListBox】属性、方法、实例应用

文章目录9 复选列表框CheckedListBox10. 列表框ListBox10.1 实例&#xff1a;买菜10.2 实例&#xff1a;购菜 应用二WinForm 是 Windows Form 的简称&#xff0c;是基于 .NET Framework 平台的客户端&#xff08;PC软件&#xff09;开发技术&#xff0c;是 C# 语言中的一个重要…

新注册企业信息查询“数据大集网”:驱动企业增长的源头活水

商贸繁荣的齐鲁大地上&#xff0c;“赶大集”曾是千年传承的民间智慧。而今天&#xff0c;一场以新注册企业信息为核心的“数据大集”正悄然重塑商业生态——数据大集网&#xff0c;以“聚天下好数&#xff0c;促万企互联”为使命&#xff0c;将分散的企业信息转化为精准商机&a…