消息中间件 - RabbitMQ - Python pika 库

    xiaoxiao2022-07-03  143

    目录

    零、RabbitMQ 总结

    0-1 channel

    0-2 Exchange、Queue、Route 三者的关系和交互

    一、轮询消费模式

    1-1 生产者

    1-2 消费者

    二、队列持久化

    三、广播模式 - ECHANGE 

    3-1 fanout - 转发速度最快

    3-1-1 消费者(订阅者)

    3-1-2 生产者(发布者)

    3-2 direct - 路由键的完全匹配分发

    3-2-1 生产者

    3-2-2 消费者

    3-3 topic - 路由关键词分发

    3-3-1 生产者

    3-3-2 消费者

    四、 channel.basic_qos - 保证资源,提高效率

    五、错误总结


     

    零、RabbitMQ 总结

    参考学习 - python RabbitMQ队列使用

    参考学习 - RabbitMQ的应用场景以及基本原理介绍

    python pika 库 - 官方文档

    0-1 channel

    channel = connect.channel() - 提供与 RabbitMQ 交互的通道,通过通道对 exchange、queue 进行设置。

    0-2 Exchange、Queue、Route 三者的关系和交互

    执行流程: 生产者生产msg,交付 exchange,exchange 通过 routingkey 匹配对应的 queue(绑定关系),queue 积累 msg,消费者监听对应的 queue,一旦监听的 queue 内存在msg,则自动获取执行(默认不拒绝)。

    !注意!:通常情况下,exchange、queue的创建和绑定优先初始化,之后生产者和消费者对现有的,进行调用处理业务。

    一、轮询消费模式

    生产者将消息存入 rabbit 指定队列,若有消费者接入,立即获取消息,并消费删除队列里的消息。

    1-1 生产者

    import pika ​ # 证书创建 credentials = pika.PlainCredentials('test', '123') # 本地连接 conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials)) # 远程连接 # connection = pika.BlockingConnection(pika.ConnectionParameters( # '192.168.56.19',5672,'/',credentials)) ​ # 声明队列 channel = conn.channel() channel.queue_declare(queue='test') # exchange 交换机; # routing_key 路由,指定消息发送队列; # body,消息内容 channel.basic_publish(exchange='', routing_key='test', body='hello pika') '''!!! 注意 basic_publish方法只要给予存在的队列和交换机,就可以进行数据推送,无需前后channel进行关联''' print('[x] sent "hello pika"') ​ # 关闭连接 conn.close() ​

    1-2 消费者

    import pika ​ # 证书创建 credentials = pika.PlainCredentials('test', '123') # 本地连接 conn = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials)) # 远程连接 # connection = pika.BlockingConnection(pika.ConnectionParameters( # '192.168.56.19',5672,'/',credentials)) channel = conn.channel() ​ # 再次创建队列,防止队列不存在的情况出现,和生产者队列同名 channel.queue_declare(queue='test') ​ ​ # 回调函数,用于队列任务结束后的回调,用于将数据返回发送端,表明是否执行完毕 def callback(ch, method, properties, body):   print("[x] Received ")   print(ch)   print(method)   print(properties)   print(body) ​ ​ # no_ack 表名是否告知发送端是否接受消息,True - 接收 channel.basic_consume(callback, queue="test", no_ack=True) ​ # 开启监听,进程夯住,等待消息 print('waiting for message To exit   press CTRL+C') channel.start_consuming() ​

    二、队列持久化

    防止 RabbitMQ 的意外宕机,保存队列宕机时的状态。

    # 在队列声明时使用 durable=Ture 启用状态保存 channel.queue_declare(queue='durable',durable=True)

     

    三、广播模式 - ECHANGE 

    生产者发送消息到队列,所有的消费者都会收到消息,若广播后有消费者没有接收到消息,那消息即丢失。

    Echange 分发消息,具有四种分发策略:direct、fanout、topic、headers(几乎不用)

    3-1 fanout - 转发速度最快

    消息被分发到所有的绑定队列上,fanout 将队列绑定到交换器上,交换器上的消息都会被转发到与该交换器绑定的所有队列上。

    3-1-1 消费者(订阅者)

    import pika ​ credentials = pika.PlainCredentials('test', '123') conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials)) ​ channel = conn.channel() ​ channel.exchange_declare(exchange='Clogs',                         type='fanout') ​ # 不指定queue名字,rabbit会随机分配一个名字 # exclusive=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare(exclusive=True)   queue_name = result.method.queue ​ channel.queue_bind(exchange='Clogs',                   queue=queue_name) ​ print(' [*] Waiting for logs. To exit press CTRL+C') ​ ​ def callback(ch, method, properties, body):   print("[x] Received ")   print(ch)   print(method)   print(properties)   print(body) ​ ​ channel.basic_consume(callback,                     queue=queue_name,                     no_ack=True) ​ channel.start_consuming()

    3-1-2 生产者(发布者)

    import pika import sys ​ credentials = pika.PlainCredentials('test', '123') conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials)) channel = conn.channel() ​ channel.exchange_declare(exchange='Clogs',                         type='fanout') ​ message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='Clogs',                     routing_key='',                     body=message)                       print(" [x] Sent %r" % message) ​ connection.close()

    3-2 direct - 路由键的完全匹配分发

    消息中的 routing-key 和 binding 中的 bingding-key 一致,交换器才将消息发送至相应队列。

    完全匹配、单播

    3-2-1 生产者

    import pika,sys credentials = pika.PlainCredentials('lisi','123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials)) channel = connection.channel() # 开始连接exchange channel.exchange_declare(exchange='mydirect',type='direct') log_level = sys.argv[1] if len(sys.argv) > 1 else "info" message = ' '.join(sys.argv[1:]) or "info:helloworld!" channel.basic_publish(exchange='mydirect', routing_key=log_level, body=message) print("publish %s to %s" % (message,log_level)) connection.close()

    3-2-2 消费者

    import pika,sys credentials = pika.PlainCredentials('lisi','123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='mydirect', type='direct') queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = queue_obj.method.queue print('queue name',queue_name,queue_obj) log_levels = sys.argv[1:] if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for level in log_levels: # 绑定队列到Exchange channel.queue_bind(exchange='mydirect',queue=queue_name,routing_key=level) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()

    3-3 topic - 路由关键词分发

    官方文档 - topic

    topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。

    它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。

    3-3-1 生产者

    import pika import sys credentials = pika.PlainCredentials('用户名', '密码') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #队列连接通道 channel.exchange_declare(exchange='mytopic',type='topic') log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info' message = ' '.join(sys.argv[1:]) or "all.info: Hello World!" channel.basic_publish(exchange='topic_log', routing_key=log_level, body=message) print(" [x] Sent %r" % message) connection.close()

    3-3-2 消费者

    import pika,sys credentials = pika.PlainCredentials('用户名', '密码') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() queue_obj = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字 # exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = queue_obj.method.queue log_levels = sys.argv[1:] # info warning errr if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for level in log_levels: channel.queue_bind(exchange='topic_log', queue=queue_name, routing_key=level) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()

    3-3-3 topic 注意点

    错误原因:routing key 设置理解错误

    正确写法如下

    四、 channel.basic_qos - 保证资源,提高效率

    参考博文 - RabbitMQ BasicQos消费者并行处理限制

    假设,单个消费者单位时间处理60条msg,生产者单位时间发送300条msg。

    假设,单个消费者单位时间接收300条msg(默认情况下,消费者不拒绝),超过了最大承受力。

    结果导致,服务器资源耗尽,消费者卡死。

    RabbitMQ - qos(服务质量保证) - 若超过负载的消息推送,在非自动确认消息的前提下,消费者不进行新的消费。即托送消息仍位于queue中,而非交于消费者。

     

    prefetchSize:0 prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer 将 block 掉,直到有消息ackglobal:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别备注:据说 prefetchSize 和 global这两项,rabbitmq没有实现,暂且不研究prefetch_count 在 no_ask=false 的情况下生效即在自动应答的情况下这两个值是不生效注意,这种方法可能会导致 queue 满。当然,这种情况下你可能需要添加更多的 Consumer,或者创建更多的virtualHost来细化你的设计。

    五、错误总结

    credentials = pika.PlainCredentials('test', '123') # 确认证书的创建为服务端存在的用户名和密码

     

     

     

    最新回复(0)