024、异步方案RabbitMQ和Celery

    xiaoxiao2023-11-23  172

    一、生产者消费者设计模式

    问题:

    我们的代码是自上而下同步执行的。

    发送短信是耗时的操作。如果短信被阻塞住,用户响应将会延迟。

    响应延迟会造成用户界面的倒计时延迟。

    解决:

    异步发送短信

    发送短信和响应分开执行,将发送短信从主业务中解耦出来。

    思考:

    如何将发送短信从主业务中解耦出来。

    1、生产者消费者设计模式介绍

    为了将发送短信从主业务中解耦出来,我们引入生产者消费者设计模式。

    它是最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。

    总结:

    生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。

    由美多商城生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。

    二、RabbitMQ介绍和使用

    1. RabbitMQ介绍

    消息队列是消息在传输的过程中保存消息的容器。现在主流消息队列有:RabbitMQ、ActiveMQ、Kafka等等。 RabbitMQ和ActiveMQ比较 系统吞吐量:RabbitMQ好于ActiveMQ持久化消息:RabbitMQ和ActiveMQ都支持高并发和可靠性:RabbitMQ好于ActiveMQ RabbitMQ和Kafka: 系统吞吐量:RabbitMQ弱于Kafka可靠性和稳定性:RabbitMQ好于Kafka比较设计初衷:Kafka是处理日志的,是日志系统,所以并没有具备一个成熟MQ应该具备的特性。 综合考虑,本项目选择RabbitMQ作为消息队列。

    2. 安装RabbitMQ(ubuntu 16.04)

    1.安装Erlang

    由于 RabbitMQ 是采用 Erlang 编写的,所以需要安装 Erlang 语言库。Erlang(['ə:læŋ])是一种通用的面向并发的编程语言, # 1. 在系统中加入 erlang apt 仓库 $ wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb $ sudo dpkg -i erlang-solutions_1.0_all.deb # 2. 修改 Erlang 镜像地址,默认的下载速度特别慢 $ sudo vim /etc/apt/sources.list.d/erlang-solutions.list # 替换默认值 $ deb http://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial contrib # 3. 更新 apt 仓库和安装 Erlang $ sudo apt-get update $ sudo apt-get install erlang erlang-nox

    2.安装RabbitMQ

    安装成功后,默认就是启动状态。 # 1. 先在系统中加入 rabbitmq apt 仓库,再加入 rabbitmq signing key $ echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list $ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - # 2. 更新 apt 仓库和安装 RabbitMQ $ sudo apt-get update $ sudo apt-get install rabbitmq-server

     

    # 重启 $ sudo systemctl restart rabbitmq-server # 启动 $ sudo systemctl start rabbitmq-server # 关闭 $ sudo systemctl stop rabbitmq-server

    3.Python访问RabbitMQ

    RabbitMQ提供默认的administrator账户。用户名和密码:guest、guest协议:amqp地址:localhost端口:5672查看队列中的消息:sudo rabbitctl list_queues

    # Python3虚拟环境下,安装pika

    $ pip install pika

     

    # 生产者代码:rabbitmq_producer.py import pika # 链接到RabbitMQ服务器 credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials)) #创建频道 channel = connection.channel() # 声明消息队列 channel.queue_declare(queue='zxc') # routing_key是队列名 body是要插入的内容 channel.basic_publish(exchange='', routing_key='zxc', body='Hello RabbitMQ!') print("开始向 'zxc' 队列中发布消息 'Hello RabbitMQ!'") # 关闭链接 connection.close()

     

    # 消费者代码:rabbitmq_customer.py import pika # 链接到rabbitmq服务器 credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials)) # 创建频道,声明消息队列 channel = connection.channel() channel.queue_declare(queue='zxc') # 定义接受消息的回调函数 def callback(ch, method, properties, body): print(body) # 告诉RabbitMQ使用callback来接收信息 channel.basic_consume(queue='zxc', on_message_callback=callback, auto_ack=True) # 开始接收信息 channel.start_consuming()

    3. 新建administrator用户

    # 新建用户,并设置密码 $ sudo rabbitmqctl add_user admin your_password # 设置标签为administrator $ sudo rabbitmqctl set_user_tags admin administrator # 设置所有权限 $ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" # 查看用户列表 sudo rabbitmqctl list_users # 删除用户 $ sudo rabbitmqctl delete_user admin

    4. RabbitMQ配置远程访问

    1.准备配置文件

    安装好 RabbitMQ 之后,在 /etc/rabbitmq 目录下面默认没有配置文件,需要单独下载。 $ cd /etc/rabbitmq/ $ wget https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/master/docs/rabbitmq.config.example $ sudo cp rabbitmq.config.example rabbitmq.config

    2.设置配置文件

    $ sudo vim rabbitmq.config

    # 设置配置文件结束后,重启RabbitMQ服务端

    $ sudo systemctl restart rabbitmq-server

    配置完成后,使用rabbitmq_producer.py、rabbitmq_customer.py测试。

    三、Celery介绍和使用

    思考:

    消费者取到消息之后,要消费掉(执行任务),需要我们去实现。任务可能出现高并发的情况,需要补充多任务的方式执行。耗时任务很多种,每种耗时任务编写的生产者和消费者代码有重复。取到的消息什么时候执行,以什么样的方式执行。

    结论:

    实际开发中,我们可以借助成熟的工具Celery来完成。有了Celery,我们在使用生产者消费者模式时,只需要关注任务本身,极大的简化了程序员的开发流程。

    1. Celery介绍

    Celery介绍: 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。单个 Celery 进程每分钟可处理数以百万计的任务。通过消息进行通信,使用消息队列(broker)在客户端和消费者之间进行协调。 安装Celery:

    $ pip install -U Celery

    Celery官方文档

    2. 创建Celery实例并加载配置

    1.定义Celery包

    makedir celery_tasks

    2.创建Celery实例

    celery_tasks.main.py

    # celery启动文件

    from celery import Celery

    # 创建celery实例 c

    elery_app = Celery('meiduo')

    3.加载Celery配置

    celery_tasks.config.py

    # 指定消息队列的位置 broker_url= 'amqp://guest:guest@192.168.103.158:5672'

    celery_tasks.main.py

    # celery启动文件 from celery import Celery # 创建celery实例 celery_app = Celery('meiduo') # 加载celery配置 celery_app.config_from_object('celery_tasks.config')

    3. 定义发送短信任务

    1.注册任务:celery_tasks.main.py

    # celery启动文件 from celery import Celery # 创建celery实例 celery_app = Celery('meiduo') # 加载celery配置 celery_app.config_from_object('celery_tasks.config') # 自动注册celery任务 celery_app.autodiscover_tasks(['celery_tasks.sms'])

    2.定义任务:celery_tasks.sms.tasks.py

    # bind:保证task对象会作为第一个参数自动传入 # name:异步任务别名 # retry_backoff:异常自动重试的时间间隔 第n次(retry_backoff×2^(n-1))s # max_retries:异常自动重试次数的上限 @celery_app.task(bind=True, name='ccp_send_sms_code', retry_backoff=3) def ccp_send_sms_code(self, mobile, sms_code): """ 发送短信异步任务 :param mobile: 手机号 :param sms_code: 短信验证码 :return: 成功0 或 失败-1 """ try: send_ret = CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID) except Exception as e: logger.error(e) # 有异常自动重试三次 raise self.retry(exc=e, max_retries=3) if send_ret != 0: # 有异常自动重试三次 raise self.retry(exc=Exception('发送短信失败'), max_retries=3) return send_ret

    4. 启动Celery服务

    $ cd ~/projects/meiduo_project/meiduo_mall $ celery -A celery_tasks.main worker -l info

    -A指对应的应用程序, 其参数是项目中 Celery实例的位置。worker指这里要启动的worker。-l指日志等级,比如info等级。

    5. 调用发送短信任务

     

    # 发送短信验证码 # CCP().send_template_sms(mobile,[sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID) # Celery异步发送短信验证码 ccp_send_sms_code.delay(mobile, sms_code)

    6. 补充celery worker的工作模式

    默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。如何自己指定进程数:celery worker -A proj --concurrency=4如何改变进程池方式为协程方式:celery worker -A proj --concurrency=1000 -P eventlet -c 1000

    # 安装eventlet模块

    $ pip install eventlet

    # 启用 Eventlet 池

    $ celery -A celery_tasks.main worker -l info -P eventlet -c 1000

    celery模板下载:

    链接:https://pan.baidu.com/s/1twFGXOxH9MJb7GPrLCPCKw

    提取码:fodj

    最新回复(0)