

python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
source link: https://v3u.cn/a_id_99
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
在之前的一篇文章中提到了用Django+Celery+Redis实现了异步任务队列,只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。
首先安装rabbitmq
Mac os直接运行brew命令安装
#安装服务
brew install rabbitmq
#启动服务
brew services start rabbitmq
Win10系统就要下载安装包进行安装了,由于rabbitmq是基于erlang的,所以要首先安装erlang
1、首先,下载并运行Erlang for Windows 安装程序 (地址:http://www.erlang.org/downloads)下载完毕并安装(注 意:安装目录请选择默认目录)

2、下载 RabbitMQ,(地址:http://www.rabbitmq.com/download.html )(注意:安装目录请选择默认目录)

安装成功后,启用web管理UI,进入RabbitMQ Serverrabbitmq_server-3.6.6sbin,输入命令rabbitmq-plugins enable rabbitmq_management
在系统的开始菜单里找到RabbitMQ的启动菜单,启动服务
浏览器输入,http://localhost:15672/,使用默认用户guest/guest进入网页端控制台:

代表没有问题了
然后安装tornado和celery,注意指定版本号
pip3 install tornado==5.1.1
pip3 install celery ==3.1
pip3 install pika ==0.9.14
pip3 install tornado-celery
pip3 install flower
需要注意一点,由于python3.7中async已经作为关键字存在,但是有的三方库还没有及时修正,导致它们自己声明的变量和系统关键字重名,所以我们要深入三方库的源码,帮他们修改async关键字为async_my,需要修改的文件夹和文件包含但不限于:
/site-packages/pika/adapters/libev_connection.py
/site-packages/celery下面的文件
/site-packages/kombu下面的文件夹
在tornado项目下新建一个任务队列文件task.py:
import time
from celery import Celery
from func_tool import mail
C_FORCE_ROOT=True
celery = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
celery.conf.CELERY_RESULT_BACKEND = "amqp"
@celery.task
def sleep(seconds):
time.sleep(float(seconds))
return seconds
@celery.task
def sendmail(title,text,tomail):
mail(title,text,tomail)
return '发送邮件成功'
if __name__ == "__main__":
celery.start()
然后编写服务端代码:
from celery import Celery
from tornado import gen
import tcelery
sys.path.append("..")
import task
#异步任务
class CeleryHandler(BaseHandler):
@gen.coroutine
def get(self):
response = yield gen.Task(task.sendmail.apply_async,args=['你好','非常好','[email protected]'])
self.write('ok')
self.finish()
路由器代码:
import tornado.web
from views import Index
import config
#路由
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/celery", Index.CeleryHandler)
]
super(Application,self).__init__(handlers,**config.setting)
程序入口代码server.py:
import tornado.ioloop
import tornado.httpserver
import config
from application import Application
if __name__ == "__main__":
print('启动...')
app = Application()
httpServer = tornado.httpserver.HTTPServer(app)
# httpServer.listen(8888)
#绑定端口
httpServer.bind(config.options['port'])
#开启5个子进程(默认1,若为None或者小于0,开启对应硬件的CPU核心数个子进程)
httpServer.start(1)
tornado.ioloop.IOLoop.current().start()
进入项目目录,分别启动tornado服务,celery服务,以及flower服务
python server.py
celery -A task worker --loglevel=info
celery flower -A task --broker=amqp://guest:guest@localhost:5672//
访问网址http://localhost:8000/celery 用来触发异步任务
后台服务显示任务返回值:

进入flower在线任务监控网址:http://localhost:5555/

至此,整个流程就走完了。
Recommend
-
220
引入 队列对于任何语言来说都是重要的,io 的串行,请求的并行等等。在 JavaScript 中,又由于单线程的原因,异步编程又是非常重要的。昨天由一道面试题的启发,我去实现 JS 中的异步队列的时候,借鉴了 express 中间件思想,并发散到 co 实现
-
3
王霸雄图荣华敝屣,谈笑间尽归尘土|基于Python3双队列数据结构搭建股票/外汇交易匹配撮合系统 首页 - Python /2021-04-22...
-
7
官方教程:Installing on Windows 安装Erlang 根据官网的版本要求文档:RabbitMQ Erlang Version Requiremen...
-
5
记录实现一个异步并发 worker 队列的过程 2022-04-01 约 2805 字 预计阅读 6 分钟 原文链接: 实现异步并发 worker 队列
-
7
虚拟机规划 hostname 控制台地址 192.168.247.150 ...
-
3
基于python3.7利用Motor来异步读写Mongodb提高效率首页 - Python/2019-09-06 如果使用Python做大型海量数据...
-
7
使用Docker-compose来封装celery4.1+rabbitmq3.7服务,实现微服务架构首页 - Mac & Linux/2019-09-28...
-
5
使用Python3.7+Tornado5.1集成新浪微博三方登录(无需企业资质)首页 - Python/2020-03-11 新浪微博:山...
-
3
python3.7+Django2.0.4配合vue.js2.0实现又拍云(upyun.cm)存储的异步拖拽文件上传功能首页 - Python/2020-03-21...
-
6
Python3的原生协程(Async/Await)和Tornado异步非阻塞首页 - Python/2019-09-20
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK