目录
零、RabbitMQ 总结
0-1 channel
0-2 Exchange、Queue、Route 三者的关系和交互
一、轮询消费模式
1-1 生产者
1-2 消费者
二、队列持久化
三、广播模式 - ECHANGE
3-1 fanout - 转发速度最快
3-1-1 消费者(订阅者)
3-1-2 生产者(发布者)
3-2 direct - 路由键的完全匹配分发
3-2-1 生产者
3-2-2 消费者
3-3 topic - 路由关键词分发
3-3-1 生产者
3-3-2 消费者
四、 channel.basic_qos - 保证资源,提高效率
五、错误总结
零、RabbitMQ 总结
参考学习 - python RabbitMQ队列使用
参考学习 - RabbitMQ的应用场景以及基本原理介绍
python pika 库 - 官方文档
0-1 channel
channel = connect.channel() - 提供与 RabbitMQ 交互的通道,通过通道对 exchange、queue 进行设置。
0-2 Exchange、Queue、Route 三者的关系和交互
执行流程: 生产者生产msg,交付 exchange,exchange 通过 routingkey 匹配对应的 queue(绑定关系),queue 积累 msg,消费者监听对应的 queue,一旦监听的 queue 内存在msg,则自动获取执行(默认不拒绝)。
!注意!:通常情况下,exchange、queue的创建和绑定优先初始化,之后生产者和消费者对现有的,进行调用处理业务。
一、轮询消费模式
生产者将消息存入 rabbit 指定队列,若有消费者接入,立即获取消息,并消费删除队列里的消息。
1-1 生产者
import pika
# 证书创建
credentials = pika.PlainCredentials('test', '123')
# 本地连接
conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
# 远程连接
# connection = pika.BlockingConnection(pika.ConnectionParameters(
# '192.168.56.19',5672,'/',credentials))
# 声明队列
channel = conn.channel()
channel.queue_declare(queue='test')
# exchange 交换机;
# routing_key 路由,指定消息发送队列;
# body,消息内容
channel.basic_publish(exchange='', routing_key='test', body='hello pika')
'''!!! 注意 basic_publish方法只要给予存在的队列和交换机,就可以进行数据推送,无需前后channel进行关联'''
print('[x] sent "hello pika"')
# 关闭连接
conn.close()
1-2 消费者
import pika
# 证书创建
credentials = pika.PlainCredentials('test', '123')
# 本地连接
conn = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials))
# 远程连接
# connection = pika.BlockingConnection(pika.ConnectionParameters(
# '192.168.56.19',5672,'/',credentials))
channel = conn.channel()
# 再次创建队列,防止队列不存在的情况出现,和生产者队列同名
channel.queue_declare(queue='test')
# 回调函数,用于队列任务结束后的回调,用于将数据返回发送端,表明是否执行完毕
def callback(ch, method, properties, body):
print("[x] Received ")
print(ch)
print(method)
print(properties)
print(body)
# no_ack 表名是否告知发送端是否接受消息,True - 接收
channel.basic_consume(callback, queue="test", no_ack=True)
# 开启监听,进程夯住,等待消息
print('waiting for message To exit press CTRL+C')
channel.start_consuming()
二、队列持久化
防止 RabbitMQ 的意外宕机,保存队列宕机时的状态。
# 在队列声明时使用 durable=Ture 启用状态保存
channel.queue_declare(queue='durable',durable=True)
三、广播模式 - ECHANGE
生产者发送消息到队列,所有的消费者都会收到消息,若广播后有消费者没有接收到消息,那消息即丢失。
Echange 分发消息,具有四种分发策略:direct、fanout、topic、headers(几乎不用)
3-1 fanout - 转发速度最快
消息被分发到所有的绑定队列上,fanout 将队列绑定到交换器上,交换器上的消息都会被转发到与该交换器绑定的所有队列上。
3-1-1 消费者(订阅者)
import pika
credentials = pika.PlainCredentials('test', '123')
conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = conn.channel()
channel.exchange_declare(exchange='Clogs',
type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字
# exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='Clogs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("[x] Received ")
print(ch)
print(method)
print(properties)
print(body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
3-1-2 生产者(发布者)
import pika
import sys
credentials = pika.PlainCredentials('test', '123')
conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = conn.channel()
channel.exchange_declare(exchange='Clogs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='Clogs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
3-2 direct - 路由键的完全匹配分发
消息中的 routing-key 和 binding 中的 bingding-key 一致,交换器才将消息发送至相应队列。
完全匹配、单播
3-2-1 生产者
import pika,sys
credentials = pika.PlainCredentials('lisi','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials))
channel = connection.channel()
# 开始连接exchange
channel.exchange_declare(exchange='mydirect',type='direct')
log_level = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[1:]) or "info:helloworld!"
channel.basic_publish(exchange='mydirect',
routing_key=log_level,
body=message)
print("publish %s to %s" % (message,log_level))
connection.close()
3-2-2 消费者
import pika,sys
credentials = pika.PlainCredentials('lisi','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='mydirect', type='direct')
queue_obj = channel.queue_declare(exclusive=True)
#不指定queue名字,rabbit会随机分配一个名字
#exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)
log_levels = sys.argv[1:]
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for level in log_levels:
# 绑定队列到Exchange
channel.queue_bind(exchange='mydirect',queue=queue_name,routing_key=level)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name, no_ack=True)
channel.start_consuming()
3-3 topic - 路由关键词分发
官方文档 - topic
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。
3-3-1 生产者
import pika
import sys
credentials = pika.PlainCredentials('用户名', '密码')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #队列连接通道
channel.exchange_declare(exchange='mytopic',type='topic')
log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info'
message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"
channel.basic_publish(exchange='topic_log',
routing_key=log_level,
body=message)
print(" [x] Sent %r" % message)
connection.close()
3-3-2 消费者
import pika,sys
credentials = pika.PlainCredentials('用户名', '密码')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
queue_obj = channel.queue_declare(exclusive=True)
# 不指定queue名字,rabbit会随机分配一个名字
# exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = queue_obj.method.queue
log_levels = sys.argv[1:] # info warning errr
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for level in log_levels:
channel.queue_bind(exchange='topic_log',
queue=queue_name,
routing_key=level)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name, no_ack=True)
channel.start_consuming()
3-3-3 topic 注意点
错误原因:routing key 设置理解错误
正确写法如下
四、 channel.basic_qos - 保证资源,提高效率
参考博文 - RabbitMQ BasicQos消费者并行处理限制
假设,单个消费者单位时间处理60条msg,生产者单位时间发送300条msg。
假设,单个消费者单位时间接收300条msg(默认情况下,消费者不拒绝),超过了最大承受力。
结果导致,服务器资源耗尽,消费者卡死。
RabbitMQ - qos(服务质量保证) - 若超过负载的消息推送,在非自动确认消息的前提下,消费者不进行新的消费。即托送消息仍位于queue中,而非交于消费者。
prefetchSize:0 prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer 将 block 掉,直到有消息ackglobal:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别备注:据说 prefetchSize 和 global这两项,rabbitmq没有实现,暂且不研究prefetch_count 在 no_ask=false 的情况下生效,即在自动应答的情况下这两个值是不生效的注意,这种方法可能会导致 queue 满。当然,这种情况下你可能需要添加更多的 Consumer,或者创建更多的virtualHost来细化你的设计。
五、错误总结
credentials = pika.PlainCredentials('test', '123')
# 确认证书的创建为服务端存在的用户名和密码