目录

    场景一:公司准备给全部员工推送一个邮件公告。采用封装的邮件库API,使用一次循环遍历全部员工推送邮件,会使主程序卡在循环处无法响应,这是我们不愿意看到的,我们希望点击执行之后,可以正常其他操作。场景二,量化程序的本地回测数据库每天都要更新。写个程序每天执行一次,是可以的,但不是有效率的,同时也会因为人而引入不确定因素。我们希望能每天定时执行一个操作,或者每小时执行一个操作,有效、可靠。这里归纳起来就是两类需求:异步任务和定时任务。

    1. 什么是Celery

    Celery是一个基于Python编写的分布式任务调度模块,同时也支持实时处理的任务队列。

    2. Celery工作原理

    Celery的原理是利用了一个任务队列,将任务写入队列中,然后一个一个取出来执行并将结果写入指定数据库。理解起来就是生产者-消费者模型。在django中,执行异步任务代码产生一个task消息,传递给broker,而后celery的worker会获取到task消息,对task执行,最后将执行返回的消息和结果存储在backend中。这里的broker和backend通常使用的是rabbitMq或redis。

    2.1 Pruducer

    负责产生任务,然后交给任务队列去处理。有两种生产者,一种是上面提到的django,也就是主程序,这种任务放入队列之后马上会被执行;另外一种是任务调度器–celery beat,它读取配置文件的内容,周期性的将任务请求发送给任务队列,这样就满足了定时任务的需求。

    2.2 Celery worker

    执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率

    2.3 Celery序列化

    在客户端和消费者之间传输数据需要序列化和反序列化,主要有pickle、json、yaml、msgpack方案,其中的pickle由于安全问题从3.2的版本开始不被支持。

    3. 构建一个Celery任务

    3.1 基本安装

    先安装Erlang

    再安装rabbitmq-server

    http://localhost:15672/ 可以查看rabbitmq中的消息队列

    安装celery

    pip install "celery[librabbitmq,redis,msgpack]"
    

    tasks.py

    from celery import Celery
    app = Celery('hello', broker='amqp://guest@localhost/5672')
    @app.task
    def hello():
        return 'hello world'
    
    celery -A tasks worker
    

    4. django 中使用 celery

    4.1 基本安装和配置

    • 需要pip安装django-celery、celery,这里需要注意 django 和其兼容性问题,选择django – 1.8.3,dajngo-celery–3.1.17,celery–3.1是个不错的选择。
    • 需要在INSTALLED_APPS中加入’djcelery’应用,执行 python manage migrate 将djcelery的数据库表写入数据库。celery 的后台任务和定时任务在相关表中都能找到对应,如果想写入或修改任务,我们可以直接操作这些表。

    4.2 应用

    在使用时,建议将配置信息放在单独的 config文件中,比如 broker,backend,超时,序列化信息等。将任务函数注册在一个单独的celery文件中。这样文件下就会有四个文件,tasks.py,config.py,celery.py, __init__.py。其中tasks.py主要写异步任务,如果直接是view中的函数,那么直接在view相关函数前加装饰器就可以了。

    4.3 最后,如何启动celery任务

    • task任务使用
    python  manage.py  celery  worker  --settings=settings
    
    • 周期性任务
    python manage.py celery beat
    python manage.py celery worker --settings=settings
    

    5. 参考资料