Flask - flask-pika 操作 Rabbit

    xiaoxiao2022-07-05  150

    目录

    一、flask-pika

    二、基于 Flask 框架的初步使用

    2-1 生产者的队列堵塞问题


     

    一、flask-pika

    官方 github

    安装

    二、基于 Flask 框架的初步使用

    生产者包装成API调用,消费者使用其他进程监听执行

    '''配置文件 dev.py''' FLASK_PIKA_PARAMS = { 'host': 'localhost', # amqp.server.com 'username': 'test', # convenience param for username 'password': '123', # convenience param for password 'port': 5672, # amqp server port #'virtual_host': 'vhost' # amqp vhost } # optional pooling params FLASK_PIKA_POOL_PARAMS = { 'pool_size': 8, 'pool_recycle': 600 } '''app.py''' import os from flask import Flask from extensions import fpika _default_instance_path = '%(instance_path)s/instance' % \ {'instance_path': os.path.dirname(os.path.realpath(__file__))} def create_app(): app = Flask(__name__, instance_relative_config=True, instance_path=_default_instance_path) configure_app(app) configure_blueprint(app) configure_redis(app) configure_rabbitmq(app) return app def configure_app(app): app.config.from_pyfile('dev.py') def configure_blueprint(app): app.register_blueprint(rabbit.rabbit_blueprint, url_prefix='/rabbit') def configure_rabbitmq(app): fpika.init_app(app) ''' extensions.py ''' from flask_pika import Pika fpika = Pika() '''视图函数 - rabbit.py''' from flask import Blueprint from flask_restful import Resource from extensions import fpika from tasks import task0 from . import Api rabbit_blueprint = Blueprint('rabbit', __name__) rabbit_api = Api(rabbit_blueprint) class Producer(Resource): def get(self): print('Producer') channel = fpika.channel() channel.queue_declare(queue='test') channel.basic_publish(exchange='', routing_key='test', body='hello pika') # 将通道还给池 # return_broken_channel 在该框架下使用解决队列堵塞问题,详见下方分析 fpika.return_broken_channel(channel) fpika.return_channel(channel) return 'Producer' rabbit_api.add_resource(Producer, '/Producer')

     

    '''消费者''' import pika # 证书创建 credentials = pika.PlainCredentials('test', '123') # 本地连接 conn = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials)) # 远程连接 # connection = pika.BlockingConnection(pika.ConnectionParameters( # '192.168.56.19',5672,'/',credentials)) channel = conn.channel() # 再次创建队列,防止队列不存在的情况出现,和生产者队列同名 channel.queue_declare(queue='test') # 回调函数,用于队列任务结束后的回调,用于将数据返回发送端,表明是否执行完毕 def callback(ch, method, properties, body): print("[x] Received ") print(ch) print(method) print(properties) print(body) # no_ack 表名是否告知发送端是否接受消息,True - 接收 channel.basic_consume(callback, queue="test", no_ack=True) # 开启监听,进程夯住,等待消息 print('waiting for message To exit press CTRL+C') channel.start_consuming()

    2-1 生产者的队列堵塞问题

    问题分析:设置'pool_size',若超过'pool_size'的设置数量,则无法将任务推入 RabbitMQ 中

    初步问题分析:框架兼容问题(不确定)

    暂时处理方式:

    FLASK_PIKA_POOL_PARAMS = None 不使用 pika 池添加 return_broken_channel ,问题暂时未出现 # 将通道还给池 fpika.return_broken_channel(channel) fpika.return_channel(channel)

    正常批量推送 

    最新回复(0)