配置Djcelery
主要步骤
在settings配置相关参数定义任务执行任务,可以在程序中调用执行,也可交给后台周期性执行1) Django项目的settings模块配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
INSTALLED_APPS = [
'suit', # 添加suit支持
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'djcelery', # 添加djcelery
'frame',
'rest_framework',
'monitor_agent',
'monitor_master',
'webpage',
]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# celery
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS = ('monitor_agent.my_celery.tasks', ) # 任务定义所在的模块
CELERY_TIMEZONE = TIME_ZONE
BROKER_URL = 'amqp://guest:guest@10.10.83.162:5672/'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_TASK_RESULT_EXPIRES = 1200 # celery任务执行结果的超时时间,我的任务都不需要返回结果,只需要正确执行就行
CELERYD_CONCURRENCY = 10 # celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以
CELERYD_PREFETCH_MULTIPLIER = 4 # celery worker 每次去rabbitmq取任务的数量,我这里预取了4个慢慢执行,因为任务有长有短没有预取太多
CELERYD_MAX_TASKS_PER_CHILD = 200 # 每个worker执行了多少任务就会死掉
CELERY_DEFAULT_QUEUE = "default_wj" # 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_QUEUES = {
"default_wj": { # 这是上面指定的默认队列
"exchange": "default_wj",
"exchange_type": "direct",
"routing_key": "default_wj"
},
"topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
"routing_key": "topictest.#",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"test2": { # test和test2是2个fanout队列,注意他们的exchange相同
"exchange": "broadcast_tasks",
"exchange_type": "fanout",
"binding_key": "broadcast_tasks",
},
"test": {
"exchange": "broadcast_tasks",
"exchange_type": "fanout",
"binding_key": "broadcast_tasks2",
},
}
CELERY_IMPORTS这个引用自己定义的任务,本文是设置在app名为monitor_agent的my_celery下的tasks.py
官方配置项介绍 http://docs.celeryproject.org/en/latest/userguide/configuration.html
2)编写tasks.py文件中的函数,如下例
1
2
3
4
5
6
7
from __future__ import absolute_import
from celery import shared_task,task
@shared_task()
def ping_net_task():
print “ping_net_task”
3)同步数据库
python manage.py makemigrations
python manage.py migrate
在数据库中生成几张表
可以通过django的admin页面进行数据创建
启动 /usr/bin/python manage.py celerycam --loglevel=INFO
admin后台查看 woker是不是在线
当worker可以监控后,在admin后台tasks表中可以查看每次任务的执行状态
或者自己导入from djcelery import models as celery_models,通过它提供的Model Query API来操作,同平常的数据库查询一样。
4)启动
官网 http://docs.celeryproject.org/en/latest/userguide/workers.html
启动 python manage.py celery worker -l info
如果有定时任务的话,还需要启动心跳 python manage.py celery beat
celery worker --help 启动参数帮助命令
celery beat --help 启动参数帮助命令
1)以两台主机为例,通过django创建2个app,名称为net_celery,local_celery,它们的task不同
2)net_celery部署在一台主机,settings配置
worker程序启动指定hostname和queue
1
/usr/bin/python /monitor/demo_web/manage.py celery worker --hostname=192.168.137.11 --pidfile=/tmp/django_celeryworker.pid --loglevel=INFO --queues=net
admin页面配置,需要将queue、exchange指定
3)local_celery部署在一台主机,settings配置
worker程序启动指定hostname和queue,注意与上面一台不一样
1
/usr/bin/python /monitor/demo_web/manage.py celery worker --hostname=192.168.137.12 --pidfile=/tmp/django_celeryworker.pid --loglevel=INFO --queues=local
admin页面配置,需要将queue、exchange指定
4)启动celery beat和celerycam
备注:两个进程只需要在一台主机上启动,不要启动多,如果启动多个,会出现同一个task多次执行。
1
/usr/bin/python /monitor/demot_web/manage.py celery beat --pidfile=/tmp/django_celerybeat.pid --loglevel=INFO
1
/usr/bin/python /home/demo_web/manage.py celerycam --loglevel=INFO --pidfile=/tmp/django_celerycam.pid