python之消息队列

    xiaoxiao2022-07-04  153

    rabbitmq安装

    通过命令行直接安装rabbitmq:

    sudo apt-get install rabbitmq-server

    安装完成后自动开启服务,也可通过命令行进行服务开启/关闭操作:

    service rabbitmq-server start # 启动 service rabbitmq-server stop # 停止 service rabbitmq-server restart # 重启

    web可视化

    可以使用web界面进行消息队列的后台管理。需要配置插件:

    rabbitmq-plugins enable rabbitmq_management # 启用插件 service rabbitmq-server restart # 重启

    此时,应该可以通过 http://localhost:15672 查看,使用默认账户guest/guest 登录。

    注意:RabbitMQ 3.3 及后续版本,guest 只能在服务本机登录。 要想连接远程服务器,建议创建其他新用户。

    python中的使用

    python通过pika负责与消息队列的连接。在项目的依赖清单requirements.txt中添加pika依赖,并将其安装到项目中。 简单demo如下:

    # coding=utf-8 import pika def pub(): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') # 队列声明 # 发送消息,exchange交换器指定消息发送的队列,routing_key设置队列名称,body为消息体 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print "Send 'Hello World!'" connection.close() def rev(): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') # 队列声明 channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming() # 进入阻塞状态,当队列里有消息时则通过callback进行处理 def callback(ch, method, properties, body): print "Received %r" % (body) if __name__ == '__main__': pub() # 发送消息 rev() # 接收消息

    输出结果如下:

    Send 'Hello World!' Received 'Hello World!'

    交换器类型

    直连交换器(direct):路由键与绑定键相同时投递。 扇形交换器(fanout):发送到交换机的消息会被转发到与该交换机绑定的所有队列上。Fanout交换机转发消息是最快的。 主题交换器(topic):路由规则由绑定键决定,只有消息的路由键满足绑定键的规则,消息才可以路由到对应的队列上;* 用来表示一个单词,# 用来表示任意数量(零个或多个)单词。 头交换器(headers):类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

    心跳检测

    服务端和客户端通过heartbeat参数协商心跳检测超时时间,当超过这个时间无数据通信时,则认为客户端失效,断开链接。 heartbeat参数将取服务端和客户端中的较小的那一个,如果客户端未设置,则服务端默认为60s(旧版本是580s)。当客户端设置为0时,表明禁用心跳检测,此时服务器将不会主动断开链接。通常不建议禁用心跳检测,因为当链接数量较大时,会给服务器带来较大的压力。

    在新版中,服务器heartbeat默认时间为60s,如果消费回调函数耗时较长(超过60s),此时服务器将断开与客户端的链接。如果消费者需要回传ack确认,此时将无法通信,导致的一个问题就是:该消息无法被确认消费,下次将继续重复消费。

    解决办法是调整服务器heartbeat参数(但无法调太大,一旦超过某个值如20h,其实际值会变小,原因未知。测试发现15h是没有问题的。难道是有上限?)

    常用命令

    rabbitmqctl list_queues:列出所有消息队列及其消息数 rabbitmqctl purge_queue name:清空指定队列 rabbitmqctl list_connections timeout:列出所有心跳检测超时时间 rabbitmqctl status:查看服务器状态 rabbitmqctl cluster_status:查看集群状态

    创建集群

    假设现有rabbitmq@node1, rabbitmq@node2两个消息服务器节点。建立集群方式如下: 1.登录node1,先关闭rabbitmq: ./rabbitmqctl stop_app

    2.重置rabbitmq: ./rabbitmqctl reset

    3.加入集群: ./rabbitmqctl join_cluster rabbit@node2

    4.修改节点类型为ram,在集群中,只要有一个节点是Disc Node则可将集群元数据写到磁盘,其他均为Ram Node: ./rabbitmqctl change_cluster_node_type ram

    5.开启rabbitmq: ./rabbitmqctl start_app

    6.修改集群名称: ./rabbitmqctl set_cluster_name rabbit_demo

    参考:

    [1]http://www.rabbitmq.com [2]https://blog.csdn.net/zhuiqiuk/article/details/78957349

    最新回复(0)