在这里插入图片描述

Celery在Django中的应用

  • 一、项目配置
  • 二、异步任务
    • 2.1 普通用法
      • 2.1.1 通过delay
      • 2.1.2 通过apply_async
    • 2.2 高级用法
      • 2.2.1 任务回调(Callback)
      • 2.2.2 任务链(Chaining)
      • 2.2.3 任务组(Group)
      • 2.2.4 任务和弦(Chord)
  • 三、定时任务
  • 四、启动celery
    • 4.1 命令行方式
    • 4.2 脚本方式(容易出问题,建议用命令行方式,很多默认配置内置好)
      • 4.2.1 启动worker(脚本方式博主暂时没找到好方法能捕获任务结果)
      • 4.2.2 启动beat
      • 4.2.3 合并启动worker和beat
  • 五、监控管理
    • 5.1 celery inspect
    • 5.2 celery control
    • 5.3 celery event
    • 5.4 celery multi
    • 5.5 celery purge
    • 5.6 celery flower

一、项目配置

1.1 确认celery及django版本相对应,本文使用django3.2celery5.5
1.2 创建一个名为CeleryStudydjango项目,以及一个名为test1_app,目录结构如下:
在这里插入图片描述
1.3 配置celery的setting参数(大部分不需要全局配置,可以针对tasks单独配置)

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # django-db(使用 Django 数据库存储结果)
CELERY_ACCEPT_CONTENT = ['json'] # 指定 Celery 接受的任务序列化格式(避免反序列化安全问题)。
CELERY_TASK_SERIALIZER = 'json' # 指定任务的序列化方式
CELERY_RESULT_SERIALIZER = 'json' # 指定结果的序列化方式
CELERY_TIMEZONE = TIME_ZONE # 设置 Celery 的时区(影响定时任务的调度时间)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 指定定时任务调度器的后端
# 默认内存调度:celery.beat:PersistentScheduler(需配合 beat_schedule_filename)
# Redis 调度:celery.beat:RedisScheduler(需安装 celery-redis-scheduler)
CELERYD_CONCURRENCY = 4 # Worker 并发数(默认 CPU 核心数)
CELERY_BEAT_SCHEDULE = {  # 一般在celery.py文件配置'every-10-seconds': {'task': 'myapp.tasks.debug','schedule': 10.0,},
}
CELERY_BEAT_MAX_LOOP_INTERVAL = 300  # 秒, Beat 调度器的最大循环间隔(默认 5 分钟)
CELERY_TASK_TIME_LIMIT = 300  # 硬超时 5 分钟(任务被强制终止)
CELERY_TASK_SOFT_TIME_LIMIT = 240  # 软超时 4 分钟(触发 `SoftTimeLimitExceeded`)
CELERY_TASK_DEFAULT_RETRY_DELAY = 60  # 任务重试间隔 1 分钟
CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(levelname)s] %(message)s' # 自定义 Worker 日志格式
...... # 等等等等等等, 还有一大堆配置

1.4 celery全局配置

# celery.pyimport os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryStudy.settings')  # 其作用是为 Django 提供配置文件的定位信息,确保框架能正确加载项目的各项设置app = Celery('CeleryStudy')		# celery实例,一般命名为项目名称
app.config_from_object('django.conf:settings', namespace='CELERY')	# celery实例从setting中CELERY开头的配置获取app.autodiscover_tasks()  # 自动发现并注册项目中定义的tasks,会发现 @shared_task 和 @app.task

1.5 修改项目init文件,通过给外部导入

# __init__.pyfrom .celery import app as celery_app__all__ = ("celery_app",)

1.6 在app中新建tasks写入任务逻辑

# tasks.py
"""
@app.task 是“专属任务”,绑定到具体应用,适合简单场景。
@shared_task 是“共享任务”,解耦于应用,适合复杂架构。
"""写法一:
from celery import Celery
app = Celery('proj')@app.task  # 绑定到当前 `app` 实例
def add(x, y):return x + y写法二: 
import time
from celery import shared_task@shared_task
def test_add(x, y):time.sleep(2)return x + y@shared_task
def pre_task_test(x):# 定时任务return x

二、异步任务

2.1 普通用法

2.1.1 通过delay

# views.pyfrom django.http import HttpResponse
from .tasks import test_add# Create your views here.def test_celery(request):result = test_add.delay(1, 5)return HttpResponse(result.task_id + ' : ' + result.status)

2.1.2 通过apply_async

# countdown: 延迟执行(秒)。
# eta: 指定具体执行时间(datetime)。
# queue: 指定任务队列。
# expires: 任务过期时间。
# retry: 是否启用重试from datetime import datetime, timedelta# 延迟 10 秒执行
test_add.apply_async(args=(1, 5), countdown=10)
# 指定具体执行时间
test_add.apply_async(args=(1, 5), eta=datetime.now() + timedelta(minutes=1))
# 指定队列和过期时间
test_add.apply_async(args=(1, 5), queue='priority', expires=3600)

2.2 高级用法

通过 signature 对象调用,预生成任务签名(task.s()),用于创建一个可序列化的任务调用对象。它允许你预定义任务及其参数,而无需立即执行,从而支持更灵活的任务组合(如链式调用、组调用等),签名对象是可序列化的,可以存储到数据库通过网络传递

sig = test_add.s(1, 5)  # 创建签名对象
sig.apply_async()       # 异步执行
sig.delay()             # 等价于 apply_async

2.2.1 任务回调(Callback)

在任务成功后触发另一个任务(通过 link 参数)

test_add.apply_async(args=(1, 5), link=send_notification.s("Task completed!"))

2.2.2 任务链(Chaining)

通过 | 符号或 chain() 将多个任务串联,前一个任务的结果作为后一个任务的输入

from celery import chain# 方法1:使用 | 符号
result = (task1.s(1, 2) | task2.s() | task3.s())()
# 方法2:使用 chain()
result = chain(task1.s(1, 2), task2.s(), task3.s())()

2.2.3 任务组(Group)

并行执行多个任务,等待所有任务完成

from celery import groupresult = group(task1.s(i) for i in range(10))()  # 并发执行 10 个 task1

2.2.4 任务和弦(Chord)

先并行执行一组任务(group),全部完成后执行一个汇总任务

from celery import chordresult = chord((task1.s(i) for i in range(10)), task2.s())()  # 10 个 task1 完成后执行 task2

三、定时任务

在celery文件中添加定时任务路由表

# celery.pyapp.conf.beat_schedule = {'task-name': {  # 任务名称(自定义)'task': 'myapp.tasks.my_task',  # 任务函数路径(需可导入)'schedule': 30,  # 执行时间规则(固定间隔)# 或 'schedule': crontab(minute='*/5'),  # Cron 表达式'args': (16, 16),  # 传递给任务的参数(可选)'options': {'queue': 'priority'},  # 其他选项(如指定队列)},# 可定义多个任务
}

通过安装pip install django-celery-beat可以实现在admin后台动态修改定时任务配置

INSTALLED_APPS = [...,'django_celery_beat',
]# 替换 Celery 的调度器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

配置完记得迁移数据库 python manage.py migrate

在这里插入图片描述


四、启动celery

worker = 干活的(执行任务)。
beat = 发任务的(定时生成任务)。
协作关系:beat 是“计划部门”,worker是“执行部门”,两者通过 Broker(消息队列)解耦。
生产建议:分开启动,Worker 可横向扩展,Beat 保持单例
在这里插入图片描述
在这里插入图片描述

4.1 命令行方式

# 启动 Worker(处理任务)
celery -A CeleryStudy worker -l infocelery -A CeleryStudy worker -l info -P eventlet # windows环境下命令
"""
prefork 是 Celery 在 Linux 上的默认并发模型,它使用多进程(Multiprocessing)处理任务,
适合 CPU 密集型场景。但 Windows 系统不支持 fork() 系统调用,因此无法使用 prefork 池。
在 Windows 上尝试使用 prefork 会直接报错,导致 Worker 无法启动。eventlet 是一个基于协程(Coroutine)的并发库,通过绿色线程(Green Thread)实现高并发,
适合 I/O 密集型任务(如 Celery 的异步任务场景)。
在 Windows 上,eventlet 是少数可用的高性能并发池之一。
它通过非阻塞 I/O 和协程调度,避免了线程切换的开销,同时绕过了 GIL 的限制,
能显著提升任务处理效率
"""# 启动 Beat(调度任务)
celery -A CeleryStudy beat -l info# 合并启动
celery -A CeleryStudy worker --beat -l info

4.2 脚本方式(容易出问题,建议用命令行方式,很多默认配置内置好)

注:涉及django自定义管理命令,自己创建一个commands_app

4.2.1 启动worker(脚本方式博主暂时没找到好方法能捕获任务结果)

# CeleryStudy/commands_app/management/commands/run_celery_worker.py
from django.core.management.base import BaseCommand
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):worker = celery_app.Worker(hostname='worker1@%h',  # Worker 名称pool='eventlet',        # 进程池类型(prefork/solo/gevent)concurrency=4,         # 并发数loglevel='INFO',       # 日志级别logfile='/var/log/celery/worker.log',  # 日志文件(可选))worker.start()
python manage.py run_celery_worker

4.2.2 启动beat

# CeleryStudy/commands_app/management/commands/run_celery_beat.py
from django.core.management.base import BaseCommand
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):beat = celery_app.Beat(loglevel='INFO',logfile='/var/log/celery/beat.log',  # 日志文件(可选)scheduler='django_celery_beat.schedulers:DatabaseScheduler',  # 使用数据库调度)beat.run()
python manage.py run_celery_beat

4.2.3 合并启动worker和beat

# CeleryStudy/commands_app/management/commands/run_celery.py
from django.core.management.base import BaseCommand
from threading import Thread
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):# 配置定时任务(可选)celery_app.conf.beat_schedule = {'add-every-10-seconds': {'task': 'proj.celery.debug_task','schedule': 10.0,'args': (16, 16),},}# 启动 Workerworker = celery_app.Worker(hostname='worker1@%h',pool='prefork',concurrency=4,loglevel='INFO',)# 启动 Beatbeat = celery_app.Beat(loglevel='INFO',scheduler='django_celery_beat.schedulers:DatabaseScheduler',)"""在后台线程中运行 Beat如果直接调用 beat.run(),它会阻塞主线程,导致 Worker 无法启动因此,需要通过线程(Thread)将 Beat 放在后台运行,避免阻塞主线程"""beat_thread = Thread(target=beat.run)beat_thread.daemon = Truebeat_thread.start()# 启动 Workerworker.start()
python manage.py run_celery

五、监控管理

5.1 celery inspect

作用:检查 Worker 状态、任务信息等(无需停止服务)。

celery -A proj inspect active          # 查看正在执行的任务
celery -A proj inspect registered      # 查看已注册的任务列表
celery -A proj inspect scheduled       # 查看待执行的定时任务(需 Beat 运行)
celery -A proj inspect reserved        # 查看 Worker 已获取但未执行的任务
celery -A proj inspect stats           # 查看 Worker 统计信息(如任务处理数)

5.2 celery control

作用:动态控制 Worker 行为(如关闭、重启、调整并发数)。

celery -A proj control shutdown        # 优雅关闭所有 Worker
celery -A proj control add_consumer Q1 # 动态添加监听队列 Q1
celery -A proj control cancel_consumer Q1 # 动态移除监听队列 Q1
celery -A proj control pool_grow 10    # 增加 Worker 并发数到 10
celery -A proj control pool_shrink 5   # 减少 Worker 并发数到 5

5.3 celery event

作用:监控 Celery 事件(如任务开始、成功、失败),可用于自定义仪表盘。

celery -A proj events                  # 启动事件监控(输出到终端)
celery -A proj events -d dump          # 以 JSON 格式输出事件
celery -A proj events -f events.log    # 将事件记录到文件

5.4 celery multi

作用:同时启动多个 Worker 或 Beat 实例(适用于分布式部署)。

celery multi start w1 w2 -A proj -l info -Q high,low  # 启动两个 Worker,分别监听不同队列
celery multi stop w1 w2                               # 停止指定 Worker

5.5 celery purge

作用:清空消息队列中的所有任务

celery -A proj purge -Q celery       # 清空默认队列
celery -A proj purge -Q high,low    # 清空多个队列

5.6 celery flower

作用:启动基于 Web 的监控仪表盘(需单独安装 flower 包)。

pip install flower
celery -A CeleryStudy flower --port=5555     # 访问 http://localhost:5555

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/93254.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/93254.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/93254.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DeepSeek生成的高精度大数计算器

# 高精度计算器(精确显示版)1. **精确显示优化**:- 新增print_mpfr()函数专门处理MPFR数值的打印- 自动移除多余的尾随零和小数点- 确保所有浮点结果都以完整十进制形式显示,不使用科学计数法2. **浮点精度修复**:- 所…

08--深入解析C++ list:高效操作与实现原理

1. list介绍1.1. list概述template < class T, class Alloc allocator<T> > class list;Lists are sequence containers that allow constant time insert and erase operations anywhere within the sequence, and iteration in both directions.概述&#xff1…

GraphQL从入门到精通完整指南

目录 什么是GraphQLGraphQL核心概念GraphQL Schema定义语言查询(Queries)变更(Mutations)订阅(Subscriptions)Schema设计最佳实践服务端实现客户端使用高级特性性能优化实战项目 什么是GraphQL GraphQL是由Facebook开发的一种API查询语言和运行时。它为API提供了完整且易于理…

使用 Dockerfile 与 Docker Compose 结合+Docker-compose.yml 文件详解

使用 Dockerfile 与 Docker Compose 结合的完整流程 Dockerfile 用于定义单个容器的构建过程&#xff0c;而 Docker Compose 则用于编排多个容器。以下是结合使用两者的完整方法&#xff1a; 1. 创建 Dockerfile 在项目目录中创建 Dockerfile 定义应用镜像的构建过程&#xff1…

15 ABP Framework 开发工具

ABP Framework 开发工具 概述 该页面详细介绍了 ABP Framework 提供的开发工具和命令行界面&#xff08;CLI&#xff09;&#xff0c;用于创建、管理和定制 ABP 项目。ABP CLI 是主要开发工具&#xff0c;支持项目脚手架、模块添加、数据库迁移管理及常见开发任务自动化。 ABP …

力扣top100(day02-01)--链表01

160. 相交链表 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode(int x) {* val x;* next null;* }* }*/ public class Solution {/*** 查找两个链表的相交节点* param headA 第一个…

LLM 中 语音编码与文本embeding的本质区别

直接使用语音编码,是什么形式,和文本的区别 直接使用语音编码的形式 语音编码是将模拟语音信号转换为数字信号的技术,其核心是对语音的声学特征进行数字化表征,直接承载语音的物理声学信息。其形式可分为以下几类: 1. 基于波形的编码(保留原始波形特征) 脉冲编码调制…

模型选择与调优

一、模型选择与调优在机器学习中&#xff0c;模型的选择和调优是一个重要的步骤&#xff0c;它直接影响到最终模型的性能1、交叉验证在任何有监督机器学习项目的模型构建阶段&#xff0c;我们训练模型的目的是从标记的示例中学习所有权重和偏差的最佳值如果我们使用相同的标记示…

vue+Django农产品推荐与价格预测系统、双推荐+机器学习预测+知识图谱

vueflask农产品推荐与价格预测系统、双推荐机器学习价格预测知识图谱文章结尾部分有CSDN官方提供的学长 联系方式名片 文章结尾部分有CSDN官方提供的学长 联系方式名片 关注B站&#xff0c;有好处&#xff01;编号: D010 技术架构: vueflaskmysqlneo4j 核心技术&#xff1a; 基…

数据分析小白训练营:基于python编程语言的Numpy库介绍(第三方库)(下篇)

衔接上篇文章&#xff1a;数据分析小白训练营&#xff1a;基于python编程语言的Numpy库介绍&#xff08;第三方库&#xff09;&#xff08;上篇&#xff09;&#xff08;十一&#xff09;数组的组合核心功能&#xff1a;一、生成基数组np.arange().reshape() 基础运算功能&…

负载因子(Load Factor) :哈希表(Hash Table)中的一个关键性能指标

负载因子&#xff08;Load Factor&#xff09; 是哈希表&#xff08;Hash Table&#xff09;中的一个关键性能指标&#xff0c;用于衡量哈希表的空间利用率和发生哈希冲突的可能性。一&#xff1a;定义负载因子&#xff08;通常用希腊字母 λ 表示&#xff09;的计算公式为&…

监控插件SkyWalking(一)原理

一、介绍 1、简介 SkyWalking 是一个 开源的 APM&#xff08;Application Performance Monitoring&#xff0c;应用性能监控&#xff09;和分布式追踪系统&#xff0c;主要用于监控、追踪、分析分布式系统中的调用链路、性能指标和日志。 它由 Apache 基金会托管&#xff0c;…

【接口自动化测试】---自动化框架pytest

目录 1、用例运行规则 2、pytest命令参数 3、pytest配置文件 4、前后置 5、断言 6、参数化---对函数的参数&#xff08;重要&#xff09; 7、fixture 7.1、基本用法 7.2、fixture嵌套&#xff1a; 7.3、请求多个fixture&#xff1a; 7.4、yield fixture 7.5、带参数…

Flink Stream API 源码走读 - socketTextStream

概述 本文深入分析了 Flink 中 socketTextStream() 方法的源码实现&#xff0c;从用户API调用到最终返回 DataStream 的完整流程。 核心知识点 1. socketTextStream 方法重载链 // 用户调用入口 env.socketTextStream("hostname", 9999)↓ 补充分隔符参数 env.socket…

待办事项小程序开发

1. 项目规划功能需求&#xff1a;添加待办事项标记完成/未完成删除待办事项分类或标签管理&#xff08;可选&#xff09;数据持久化&#xff08;本地存储&#xff09;2. 实现功能添加待办事项&#xff1a;监听输入框和按钮事件&#xff0c;将输入内容添加到列表。 标记完成/未完…

【C#】Region、Exclude的用法

在 C# 中&#xff0c;Region 和 Exclude 是与图形编程相关的概念&#xff0c;通常在使用 System.Drawing 命名空间进行 GDI 绘图时出现。它们主要用于定义和操作二维空间中的区域&#xff08;几何区域&#xff09;&#xff0c;常用于窗体裁剪、控件重绘、图形绘制优化等场景。 …

机器学习 - Kaggle项目实践(3)Digit Recognizer 手写数字识别

Digit Recognizer | Kaggle 题面 Digit Recognizer-CNN | Kaggle 下面代码的kaggle版本 使用CNN进行手写数字识别 学习到了网络搭建手法学习率退火数据增广 提高训练效果。 使用混淆矩阵 以及对分类出错概率最大的例子单独拎出来分析。 最终以99.546%正确率 排在 86/1035 …

新手如何高效运营亚马逊跨境电商:从传统SP广告到DeepBI智能策略

"为什么我的广告点击量很高但订单转化率却很低&#xff1f;""如何避免新品期广告预算被大词消耗殆尽&#xff1f;""为什么手动调整关键词和出价总是慢市场半拍&#xff1f;""竞品ASIN投放到底该怎么做才有效&#xff1f;""有没有…

【论文阅读 | CVPR 2024 | UniRGB-IR:通过适配器调优实现可见光-红外语义任务的统一框架】

论文阅读 | CVPR 2024 | UniRGB-IR&#xff1a;通过适配器调优实现可见光-红外语义任务的统一框架​1&&2. 摘要&&引言3.方法3.1 整体架构3.2 多模态特征池3.3 补充特征注入器3.4 适配器调优范式4 实验4.1 RGB-IR 目标检测4.2 RGB-IR 语义分割4.3 RGB-IR 显著目…

Hyperf 百度翻译接口实现方案

保留 HTML/XML 标签结构&#xff0c;仅翻译文本内容&#xff0c;避免破坏富文本格式。采用「HTML 解析 → 文本提取 → 批量翻译 → 回填」的流程。百度翻译集成方案&#xff1a;富文本内容翻译系统 HTML 解析 百度翻译 API 集成 文件结构 app/ ├── Controller/ │ └──…