Django+redis+celery 实现异步任务

    xiaoxiao2025-07-21  17

    在后台中,可能会有一些耗时的任务,会影响到前端响应速度。为加快响应,可使用异步任务的方式在后台执行耗时的操作。

    一、Django中的异步请求

            Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 -- http handling(request解析) -- url mapping(url正则匹配找到对应的View) -- 在View中进行逻辑的处理、数据计算(包括调用Model类进行数据库的增删改查)--将数据推送到template,返回对应的template/response。

    同步请求:所有逻辑处理、数据计算任务在View中处理完毕后返回response。在View处理任务时用户处于等待状态,直到页面返回结果。

    异步请求:View中先返回response,再在后台处理任务。用户无需等待,可以继续浏览网站。当任务处理完成时,我们可以再告知用户。

    二、Celery

            Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

           上图展示的是Celery的架构,它采用典型的生产者-消费者模式,主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果)。实际应用中,用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。

    三、Django中Celery的实现

    环境:

    django:2.1.7celery:3.1.26.post2(Cipater)redis:2.10.6(注意:最新版本是3.2.1,但会报错Unrecoverable error: AttributeError("'str' object has no attribute 'items'"))redis-server:3.2.100

    配置:

    1、settings.py文件

    INSTALLED_APPS = [ ... # celery + redis 'djcelery', ] # Celery settings import djcelery djcelery.setup_loader() # celery中间人 redis://redis服务器所在的ip地址:地址/数据库号 BROKER_URL = 'redis://127.0.0.1:6379/0' # celery结果返回,可用于跟踪结果 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # celery内容等消息的格式设置 CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # celery时区设置,使用TIME_ZONE CELERY_TIMEZONE = TIME_ZONE

    2、在项目文件夹下增加celery.py文件:

    from __future__ import absolute_import import os from celery import Celery from django.conf import settings # 设置环境变量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings') # 实例化Celery,网上很多教程都是未设置broker导致启动失败 app = Celery('mysite', broker='redis://127.0.0.1:6379/0') # 使用django的settings文件配置celery app.config_from_object('django.conf:settings') # Celery加载所有注册的应用 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

    3、项目的__init__.py文件修改:

    from __future__ import absolute_import from .celery import app as celery_app __all__ = ['celery_app']

    4、测试代码

    在一个app中增加tasks.py文件(关于task,并不是一定要把所有的task放在tasks.py,可以放在其他类里面,只要在函数上加@task即可)

    import time from celery import task @task def test(a, b): print('这是任务开始') print(a+b) time.sleep(10) print('这是任务结束')

    app中view代码增加:

    from . import tasks def test_view(request, *args, **kwargs): tasks.test.delay(1, 2) result = {'code': 0, 'msg': '这是一个后台任务'} return HttpResponse(json.dumps(result, ensure_ascii=False), content_type='application/json')

    app中url增加:

    path('test/', views.test_view, name='test_view'),

    四、运行项目

    1、python manage.py runserver

    2、celery -A mysite worker --pool=solo -l info

    python manage.py celery -A celery worker --loglevel=info 是错误的

    3、访问test

     

    另外,Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现,如下图所示:

      Django下实现的方式如下: 

      1. 安装flower:pip install flowr

      2. 启动flower(默认会启动一个webserver,端口为5555):python manage.py celery flower

      3. 进入http://localhost:5555即可查看。

    参考:

    https://www.cnblogs.com/znicy/p/5626040.html

    https://www.jb51.net/article/159215.htm

    最新回复(0)