Celery基础及django-celery实现

    xiaoxiao2024-11-01  88

    Celery官网http://www.celeryproject.org/

    学习资料:http://docs.jinkan.org/docs/celery/

    生产者消费者模式

    在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。

    单单抽象出生产者和消费者,还够不上是生产者消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过消息队列(缓冲区)来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给消息队列,消费者不找生产者要数据,而是直接从消息队列里取,消息队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个消息队列就是用来给生产者和消费者解耦的。

    解耦:假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

    Celery介绍

    Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

    上图展示的是Celery的架构,它采用典型的生产者-消费者模式,主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果)。

    消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache。

    实际应用中,用户从Web前端发起一个请求,我们只需要将请求所要处理的任务放入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。

    任务队列

    任务队列是一种在线程或机器间分发任务的机制。

    消息队列

    消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。

    Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,职程对消息进行处理。如下图所示:

     

    Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。

    安装

    pip install django  安装django

    pip install celery==3.1.23 安装celery

    pip install django-celery  安装django-celery

    对Redis的支持需要额外的依赖。你可以用 celery[redis] 捆绑 同时安装 Celery 和这些依赖

    pip install -U celery[redis]

    pip install python-jenkins  安装jenkins 

    消息队列

    yum install -y rabbitmq-server  安装rabbitmq

    环境变量PATH加入/usr/lib/rabbitmq/bin

    rabbitmq-plugins enable rabbitmq_management  启用web管理插件

    chkconfig rabbitmq-server on  开机自启动

    /etc/init.d/rabbitmq-server start  启动RabbitMQ

    增加用户

    rabbitmqctl add_user shhnwangjian 123456

    rabbitmqctl set_user_tags shhnwangjian administrator

    命令执行成功后,rabbitmq-server就已经安装好并运行在后台了。

    WEB页面访问:http://ip地址:15672/

    另外也可以通过命令rabbitmq-server来启动rabbitmq server以及命令rabbitmqctl stop来停止server。

    更多的命令可以参考rabbitmq官网的用户手册:https://www.rabbitmq.com/manpages.html

    redis 安装 http://redis.io/download

    django中celery实现

    创建项目和app

    1

    2

    3

    django-admin.py startproject celery-wj

    cd celery-wj

    django-admin.py startapp app01

    配置settings.py

    当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task。 BROKER_URL和CELERY_RESULT_BACKEND分别指代你的Broker的代理地址以及Backend(result store)数据存储地址。 在Django中如果没有设置backend,会使用其默认的后台数据库用来存储数据。注意,此处backend的设置是通过关键字CELERY_RESULT_BACKEND来配置,与一般的.py文件中实现celery的backend设置方式有所不同。一般的.py中是直接通过设置backend关键字来配置,如下所示:

    1

    test = Celery('tasks', backend='redis://10.10.83.162:16379/0', broker='redis://10.10.83.162:16379/9')

    Django下要查看其他celery的命令,包括参数配置、启动多worker进程的方式都可以通过python manage.py celery --help来查看:

    创建一个task

    1

    2

    3

    4

    5

    6

    7

    # -*- coding: utf-8 -*-

     

    from celery import task

     

    @task

    def add(x, y):

        return x + y

    注意:与一般的.py中实现celery不同,tasks.py必须建在各app的根目录下,且不能随意命名。

    views.py

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    # -*- coding: utf-8 -*-

     

    from django.shortcuts import HttpResponse, render, redirect, HttpResponseRedirect

    from app01 import tasks

     

    def add_test(request):

        result = tasks.add.delay(2, 2)

        print result

        if result.ready():

            print "Task has run"

            if result.successful():

                print "Result was: %s" % result.result

            else:

                if isinstance(result.result, Exception):

                    print "Task failed due to raising an exception"

                    raise result.result

                else:

                    print "Task failed without raising exception"

        else:

            print "Task has not yet run"

        return HttpResponse(result)

    urls.py

    1

    2

    3

    4

    5

    6

    7

    8

    from django.conf.urls import url

    from django.contrib import admin

    from app01 import views

     

    urlpatterns = [

        url(r'^admin/', admin.site.urls),

        url(r'^add/$', views.add_test, name='add'),

    ]

    #先启动服务 python manage.py runserver 0.0.0.0:9008 #再启动worker python manage.py celery worker -c 4 --logievel=info

    通过terminal测试task

    python manage.py shell

    >>> from app01 import tasks >>> tasks.add.delay(3,5)

    通过页面访问测试task

    Celery的可视化监控工具flower

    1.安装flower 

    pip install flower 

    2.启动

    python manage.py celery flower --port=9008(默认是5555端口)

    3.帮助

    python manage.py celery flower --help

    4.可视化页面

    http://localhost:9008

    参考博客:http://www.cnblogs.com/znicy/p/5626040.html

    Django中如何使用django-celery完成异步任务: http://www.weiguda.com/blog/73/

    djcelery入门:实现运行定时任务: http://my.oschina.net/kinegratii/blog/292395

    最新回复(0)