Springboot与消息

    xiaoxiao2022-07-03  117

    消息的优点: 1.使用异步处理的方式,提高系统的通信能力

    例如: 1.当我们用户注册时信息输入完成时,系统需要给用户发送一封邮件,然后发送注册短信.假如每一步需要50ms,如果是系统同步的情况下,那么整个过程需要150ms 2.使用多线程的方式,发送邮件和发送验证短信两个操作是并发执行的,这样总时长降低到了100ms 3.当用户把注册信息写入到数据库之后,只需要将后来要用的相关信息写入消息队列,写入消息队列的时间级短,假如花费5ms,只要写入消息队列就对用户进行响应,提示注册成功,然后发送邮件和发送短信都可以通过异步读取消息队列完成,整体耗时为55ms

    2. 应用解耦

    1.例如,如果我们系统的订单和库存系统都写在一个应用中耦合起来,每次下单都会去库存系统中减库存,这样系统的耦合度是非常高的,维护也是非常麻烦的 解决方法:我们可以使用微服务的方式把订单和库存系统都单独抽取出来,然后引入缓存队列,只要下订单,然后把下单的信息写入到消息队列,库存系统通过订阅消息队列,只要读取到消息队列中的订单信息,就会执行相应的库存系统中的相应的方法.这样通过消息队列传递数据就会降低系统饿耦合度

    3. 流量消峰 特别是在限时秒杀的情景中,使用的最多

    例如, 假如我们现在的秒杀系统中的库存只有10K,然后有100K的用户在剁手一样的抢商品,也就是不停的点击,然后会不停的提交SQL给后台的服务器,服务器的压力可想而知. 解决方法: 我们可以让用户的请求先进入消息队列,然后给消息队列设置长度,只能存储10K个请求,然后没有进入到消息队列的请求一致认为无效的.可以抛弃请求,然后给用户响应秒杀失败.然后我们的秒杀业务,只需要处理消息队列中的请求即可,这样可以极大的降低系统的压力.

    发送消息的流程:

    我们需要把消息发送给消息代理(也就是消息中间件),消息代理接管消息之后,会把消息发送到指定目的地.发送消息有以下两种模式:

    1.点对点: 消息发送者发送一条消息之后,消息会被放到消息代理中,每一个消息都可以有多个接收者,但是只有一个接受者,一单吗被接受,其他的接收者就拿不到这个消息了. 2.发布订阅模式 当发布者把消息发布到一个主题中,这个主题的所有的订阅者(接收者)都可以在消息到达时同时接收到这个消息

    无论使用JMS还是AMQP,Spring都是支持的.

    RabbitMQ

    优点:稳定性和可靠性非常的高 总结(消息发送到接受的具体流程):

    1.消息生产者生产的消息,会发送给消息队列 服务器(broker) 内的一个 虚拟主机(Virtual-Host) 的其中一个 交换器(Exchange),每一个服务器内部会有多个虚拟主机,和多个消息队列(Queue),它们之间互相隔离,互不干涉,虚拟主机和队列之间是多对多的关系,它们之间是通过 绑定(Binding) 进行连接的,交换器其实就是一个路由表,它会根据消息头中的 路由键(routing-key) 确定把消息路由到哪个队列中 2.消费者为了获取消息,需要和消息队列之间建立一条TCP连接(Connection),为了节省资源,引入了 信道(Channel),信道是建立在真实的TCP连接中的虚拟连接,多条信道复用一条TCP连接. 3.建立起连接之后,消费者就能够从消息队列中获取消息,消息一旦从队列中被消费者取出,就不复存在了.这就是整个的流程

    消息的产生和接收的过x程中,Exchange和Binding是核心部分,它们两个来决定消息最终会被路由到哪个队列.

    direct exchange:

    直连型的交换器,属于是点对点类型的,只有消息头中的路由键和队列绑定的键值相同时,才会把消息路由给匹配的队列.

    fanout exchange :

    fanout其实说白了就是广播类型的交换器,它会无视消息的路由键,只要消息到达交换器,他都会给和他绑定的队列发一份消息,在三种类型中,它是发送消息最快的.订阅和发布就是它的参考实现

    topic exchange:

    这种类型的交换器更为复杂, **我们可以使用通配符对消息的路由键进行模糊匹配,**只把消息路由到匹配的队列.

    #可以匹配0个或者多个单词 只匹配一个单词,他们俩都是以单词为分隔单位的 步骤: 1.通过docker给Linux上安装rabbitmq镜像

    docker pull rabbitmq:3-management

    2.镜像下载完成之后,启动rabbitmq

    docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq

    rabbitmq暴露两个端口,15672是访问web页面的端口

    3.测试rabbit目前的管理页面,用户名和密码初始值都是guest 登录之后的页面: 如何在项目中发送和接受消息的步骤

    /** * rabbitmq的自动配置原理(rabbitautoconfiguration) * 1. * 2.自动配置了连接工程ConnectionFactory * 3.所有的配置都通过spring.rabbitmq封装在了RabbitProperties类中. * 4.rabbitTemplate组件 : 就是用来给rabbitMQ发送和接受消息的 * 5.AmqpAdmin : rabbitMQ的系统管理功能组件,它不具备发送和接受消息的功能,但是 * 可以给我们 创建,声明一个队列,创建一个交换器等功能. * */

    1.连接上rabbitmq

    spring: rabbitmq: host: 192.168.124.178 username: guest password: guest port: 5672 # 默认的连接端口就是5672,不写也行. virtual-host: / # 虚拟主机的地址,默认的就是/, 不写也行.

    2.使用RabbitTemplate就可以发送和接收消息

    @RunWith(SpringRunner.class) @SpringBootTest public class SpringbootGaoji02AmqpApplicationTests { @Autowired RabbitTemplate rabbitTemplate; /* * 点对点式发送消息 */ @Test public void contextLoads() { //这样写的好处,就是message需要我们自己定义,可以定制消息体内容,和消息头 //rabbitTemplate.send(exchange,routingKey,message); //一般是用下面这种形式,object就是你要发送的数据对象,默认会被当成消息体, // 自动序列化发送给rabbitmq. //rabbitTemplate.convertAndSend(exchange,routingKey,object); //map用来保存我们消息体中需要携带的数据 Map<String,Object> map = new HashMap<String,Object>(); map.put("msg","这是第一个消息队列"); map.put("data", Arrays.asList("helloworld",123456,true)); //对象默认被序列化以后发送 rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",map); } /** * 接收消息 */ @Test public void receive(){ /* rabbitTemplate.receive(); 它会把消息对象接收,带有消息头和消息体 rabbitTemplate.receiveAndConvert(); 它会把序列化之后的消息转化为对象,也就是 它会反序列化消息 */ Object o = rabbitTemplate.receiveAndConvert("atguigu.news"); System.out.println(o.getClass()); System.out.println(o); } //如何以json的数据格式序列化消息并发送 /** * 广播模式 */ @Test public void sendMsg(){ //传入的Object是自定义类型的对象 rabbitTemplate.convertAndSend("exchange.fanout","",new Book("三国演义","罗贯中")); } }

    3.默认使用的java的序列化方式,如何切换为json格式的序列化器呢? 原因:

    rabbitMQ默认采用的是java的系列化的方式.使用的是SimpleMessageConverter

    如何以json的格式序列化并且发送消息呢?

    我们只需要给容器中添加Json序列化器的组件就行了

    @Configuration public class AMQPConfig { /* 配置消息转换器,使用json的格式序列化消息, 替换默认的java的序列化器 */ @Bean public MessageConverter messageConverter(){ //它会替换默认的java的系列化器 return new Jackson2JsonMessageConverter(); } }

    使用json格式的序列化器: 在实际中,我们经常需要监听消息队列,例如,当订单信息进入消息队列后,库存系统就要调用相应的方法,来更改库存.库存系统就需要来监听消息队列. 可以使用 @RabbitListener

    @Service public class BookService { /** * @RabbitListener(queues = "atguigu.news") * 该注解会监听指定的消息队列,只要该队列有消息进入,被标注饿方法就会被调用 * 该注解一定发挥作用的前提就是开启基于注解的rabbitMQ模式 * 也就是要和@EnableRabbit一起使用 * @param book */ //接受反序列化之后的消息对象 @RabbitListener(queues = "atguigu.news") public void receive(Book book){ System.out.println("收到消息 "+book); } //接受消息对象 @RabbitListener(queues = "atguigu") public void receive01(Message msg){ //获取消息体 System.out.println(msg.getBody()); //获取消息头信息 System.out.println(msg.getMessageProperties()); } }

    AmqpAdmin组件的使用 作用: 创建和删除 queue,exchange ,binding

    只要是declare开头的都是创建,delete开头的都是删除.

    我们可以创建多种类型的Exchange,包括用户自定义的Exchange

    @Autowired AmqpAdmin admin; @Test public void creatExchange(){ //创建一个direct类型的Exchange admin.declareExchange(new DirectExchange("amqp.exchange")); //创建一个queue admin.declareQueue(new Queue("amqpadmin.queue",true)); //创建绑定规则 admin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqp.exchange","amqp.hahah",null)); }

    注意: Binding 的创建

    public Binding(String destination, DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments) //destination : 队列的名字 //DestinationType : 要绑定的类型 //exchange : 交换机的名字 //routingKey : 路由键 //Map<String, Object> arguments : 需要携带的参数,没有写null
    最新回复(0)