左边的Client向右边的Client发送消息,流程: 第一:获取Conection 第二:获取Channel 第三:定义Exchange,Queue 第四:使用一个RoutingKey将Queue Binding到一个Exchange上 第五:通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上, 第六:Consumer在接收时也是获取connection,接着获取channel,然后指定一个Queue,到Queue上取消息,它对Exchange,RoutingKey及如何Binding都不关心,到对应的Queue上去取消息就行了。
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式 (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.
(2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.
(3)消息队列 :引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
这种做法有一个缺点:
当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱^ ^)订单系统和库存系统高耦合. 引入消息队列
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息,进行库操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失(马云这下高兴了).流量削峰一般在秒杀活动中应用广泛 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)、、、1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面. 2.秒杀业务根据消息队列中的请求信息,再做后续处理.
添加依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>生产者代码:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //建立到代理服务器到连接(TCP连接) Connection conn = factory.newConnection(); //获得信道(信道复用TCP连接) Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); // "direct"将消息保存到队列的方式 String routingKey = "hola"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } }消费者代码:
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //获得信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String bindingKey = "hola"; //绑定队列,通过键 hola 将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, bindingKey); while(true) { //消费消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); // 手动确认消息,防止消息丢失 System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } }application_context_rabbit.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <context:property-placeholder location="classpath:application.properties"/> <!-- RabbitMQ start --> <!-- 连接配置 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" /> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 队列声明 --> <!-- durable 是否持久化 exclusive 仅创建者可以使用的私有队列,断开后自动删除 auto-delete 当所有消费端连接断开后,是否自动删除队列 --> <rabbit:queue id="orderinfomation_queue" name="${mq.queue}_orderinfomation" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="payinfomation_queue" name="${mq.queue}_payinfomation" durable="true" auto-delete="false" exclusive="false" /> <!-- 交换机定义 --> <!-- 交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。 如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。 direct模式:消息与一个特定的路由器完全匹配,才会转发 topic模式:按模式匹配 --> <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false"> <rabbit:bindings> <!-- 设置消息Queue匹配的pattern (direct模式为key) --> <rabbit:binding queue="orderinfomation_queue" pattern="orderinfomation"/> <rabbit:binding queue="payinfomation_queue" pattern="payinfomation"/> </rabbit:bindings> </rabbit:topic-exchange> <bean name="orderinfoService" class="com.youfan.service.impl.OrderinfoService"></bean> <bean name="payinfoService" class="com.youfan.service.impl..PayinfoService"></bean> <!-- 配置监听 消费者 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <!-- queues 监听队列,多个用逗号分隔 ref 监听器 --> <rabbit:listener queues="orderinfomation_queue" ref="orderinfoService"/> <rabbit:listener queues="payinfomation_queue" ref="payinfoService"/> </rabbit:listener-container> </beans>application.properties:
mq.host=127.0.0.1 mq.username=guest mq.password=guest mq.port=5672 mq.queue=ms监听的消费者:
public class PayinfoService implements MessageListener { public void onMessage(Message message) { System.out.println("消息消费者 = " + message.toString()); } } public class OrderinfoService implements MessageListener { public void onMessage(Message message) { System.out.println("消息消费者 = " + message.toString()); } }在application.yml中添加RabbitMQ地址:
spring: rabbitmq: host: 192.168.56.101 username: leyou password: leyou virtual-host: /leyou在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。
@Component public class Listener { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"#.#"})) public void listen(String msg){ System.out.println("接收到消息:" + msg); } }@Componet:类上的注解,注册到Spring容器
@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
value:这个消费者关联的队列。值是@Queue,代表一个队列
exchange:队列所绑定的交换机,值是@Exchange类型
key:队列和交换机绑定的RoutingKey
类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
Spring最擅长的事情就是封装,把他人的框架进行封装和整合。Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:
红框圈起来的是比较常用的3个方法,分别是:
指定交换机、RoutingKey和消息体指定消息指定RoutingKey和消息,会向默认的交换机发送消息