5

python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务

 2 years ago
source link: https://v3u.cn/a_id_99
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务

首页 - Python/2019-07-31

    在之前的一篇文章中提到了用Django+Celery+Redis实现了异步任务队列,只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。

    首先安装rabbitmq

    Mac os直接运行brew命令安装

#安装服务
brew install rabbitmq
#启动服务
brew services start rabbitmq

    Win10系统就要下载安装包进行安装了,由于rabbitmq是基于erlang的,所以要首先安装erlang

    1、首先,下载并运行Erlang for Windows 安装程序 (地址:http://www.erlang.org/downloads)下载完毕并安装(注    意:安装目录请选择默认目录)

    

20190731092117_21738.png

    2、下载 RabbitMQ,(地址:http://www.rabbitmq.com/download.html )(注意:安装目录请选择默认目录)

20190731092131_65666.png

    安装成功后,启用web管理UI,进入RabbitMQ Serverrabbitmq_server-3.6.6sbin,输入命令rabbitmq-plugins enable rabbitmq_management

    在系统的开始菜单里找到RabbitMQ的启动菜单,启动服务

    浏览器输入,http://localhost:15672/,使用默认用户guest/guest进入网页端控制台:

    

20190731092415_65324.png

    代表没有问题了

    然后安装tornado和celery,注意指定版本号

pip3 install tornado==5.1.1
pip3 install celery ==3.1
pip3 install pika ==0.9.14
pip3 install tornado-celery
pip3 install flower

    需要注意一点,由于python3.7中async已经作为关键字存在,但是有的三方库还没有及时修正,导致它们自己声明的变量和系统关键字重名,所以我们要深入三方库的源码,帮他们修改async关键字为async_my,需要修改的文件夹和文件包含但不限于:

    /site-packages/pika/adapters/libev_connection.py

    /site-packages/celery下面的文件

    /site-packages/kombu下面的文件夹

    在tornado项目下新建一个任务队列文件task.py:

import time
from celery import Celery
from func_tool import mail

C_FORCE_ROOT=True

celery = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
celery.conf.CELERY_RESULT_BACKEND = "amqp"


@celery.task
def sleep(seconds):
    time.sleep(float(seconds))
    return seconds

@celery.task
def sendmail(title,text,tomail):
    mail(title,text,tomail)
    return '发送邮件成功'

if __name__ == "__main__":
    celery.start()

然后编写服务端代码:

from celery import Celery
from tornado import gen
import tcelery
sys.path.append("..")
import task

#异步任务
class CeleryHandler(BaseHandler):
    @gen.coroutine
    def get(self):
        response = yield gen.Task(task.sendmail.apply_async,args=['你好','非常好','[email protected]'])
        self.write('ok')
        self.finish()

路由器代码:

import tornado.web
from views import Index
import config


#路由 

class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/celery", Index.CeleryHandler)
        ]
        super(Application,self).__init__(handlers,**config.setting)

    

程序入口代码server.py:

import tornado.ioloop
import tornado.httpserver
import config
 
from application import Application
 
 
 
 
if __name__ == "__main__":
    print('启动...')
    app = Application()
    httpServer = tornado.httpserver.HTTPServer(app)
    # httpServer.listen(8888)
    #绑定端口
    httpServer.bind(config.options['port'])
    #开启5个子进程(默认1,若为None或者小于0,开启对应硬件的CPU核心数个子进程)
    httpServer.start(1)
    tornado.ioloop.IOLoop.current().start()

进入项目目录,分别启动tornado服务,celery服务,以及flower服务

python server.py
celery -A task worker --loglevel=info
celery flower -A task --broker=amqp://guest:guest@localhost:5672//

访问网址http://localhost:8000/celery 用来触发异步任务

后台服务显示任务返回值:

20190731093546_89099.png

进入flower在线任务监控网址:http://localhost:5555/

20190731093706_35481.png

    至此,整个流程就走完了。


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK