5

基于Redis配置Celery

 3 years ago
source link: https://note.qidong.name/2020/08/celery-with-redis/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

基于Redis配置Celery

2020-08-27 20:54:47 +08  字数:1517  标签: Python

作为一个分布式异步计算框架,Celery虽然常用于Web框架中,但也可以单独使用。 虽然常规搭配的消息队列是RabbitMQ,但是由于某些情况下系统已经包含了Redis,那就可以复用。

以下撇开Web框架,介绍基于Redis配置Celery任务的方法。

pip install celery[redis]

项目结构

$ tree your_project
your_project
├── __init__.py
├── main.py
├── celery.py
└── tasks.py

0 directories, 4 files

其中,main.py是触发Task的业务代码。 当然,文件名可以随意改。 celery.pyCelery的app定义的位置,tasks.py是Task定义的位置,文件名不建议修改。

配置Celery

celery.py中写入如下代码:

from celery import Celery

from .settings import REDIS_URL

APP = Celery(
    main=__package__,
    broker=REDIS_URL,
    backend=REDIS_URL,
    include=[f'{__package__}.tasks'],
)

APP.conf.update(task_track_started=True)

其中,REDIS_URL从同一的配置settings.py中引入, 形式大概是redis://localhost:6379/0。 这里既用Redis来当broker,又用来当backend。 即,既当消息队列,又当结果反馈的数据库(默认仅保存1天)。

include=,需要填一个下游worker的包名列表。 这里选择了同一个包的tasks.py文件。

额外设置的task_track_started,是命令Worker反馈STARTED状态。 默认情况下,是无法知道任务什么时候开始执行的。

编写任务并调用

tasks.py文件中,添加异步任务的实现。

from .celery import APP

@APP.task
def do_sth():
    pass

在需要发起任务的地方,用.apply_async可以触发异步调用。 即,实际只是向消息队列发送消息,真正的执行操作在远程。

from celery.result import AsyncResult

from .tasks imprt do_sth

result = do_sth.apply_async()
assert isinstance(result, AsyncResult)

运行Worker:

celery -A your_project worker

运行原理

一次Task从触发到完成,序列图如下:

mainRedisworkerdo_sth.apply_async()AsyncResult(task_id)AsyncResult(task_id).statePENDINGKeep quering new messagesA task of do_sthAsyncResult(task_id).stateSTARTEDdo_sth()Write the resultAsyncResult(task_id).stateSUCCESSKeep quering new messagesNonemainRedisworker

其中,main代表业务代码主进程。 它可能是Django、Flask这类Web服务,也可能是一个其它类型的进程。 worker就是指Celery的Worker。

main发送消息后,会得到一个AsyncResult,其中包含task_id。 仅通过task_id,也可以自己构造一个AsyncResult,查询相关信息。 其中,代表运行过程的,主要是state

worker会持续保持对Redis(或其它消息队列,如RabbitMQ)的关注,查询新的消息。 如果获得新消息,将其消费后,开始运行do_sth。 运行完成会把返回值对应的结果,以及一些运行信息,回写到Redis(或其它backend,如Django数据库等)上。 在系统的任何地方,通过对应的AsyncResult(task_id)就可以查询到结果。

Celery Task的状态

以下是状态图:

PENDINGSTARTEDSUCCESSFAILURERETRYREVOKED

其中,除SUCCESS外,还有失败(FAILURE)、取消(REVOKED)两个结束状态。 而RETRY则是在设置了重试机制后,进入的临时等待状态。

另外,如果保存在Redis的结果信息被清理(默认仅保存1天),那么任务状态又会变成PENDING。 这在设计上是个巨大的问题,使用时要做对应容错。

常见控制操作

result = AsyncResult(task_id)
# 阻塞等待返回
result.wait()
# 取消任务
result.revoke()
# 删除任务记录
result.forget()

有时,在业务主进程中需要等待异步运行的结果,这时需要使用wait。 如果要取消一个排队中、或已执行的任务,则可以使用revoke。 即使任务已经执行完成,也可以使用revoke,但不会有任何变化。 如果需要提前删除任务记录,可以使用forget

参考

基本上都是参考官方文档。 也有看过一些StackOverFlow,但很多都是早期版本的方案,过时了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK