RabbitMQ获取消息源码解析

    xiaoxiao2021-04-16  209

    一、问题场景描述 利用springboot+rabbitmq结合,在消费端消费时,发现直接接受String类型的消息,发现消息是字节数组

    @RabbitListener(queues = "testQueue") public void get(String message ){ System.out.println(message); System.out.println("消费者1"); }

    发送消息“123”,执行结果:

    34,49,50,51,34 消费者1

    二、源码跟踪 利用idea的debug代码栈, 发现message为一个字节数组内容,那就依此倒序查找在哪里,这个字符串变成字节数组了。 如中间这步,还是字节数组 走到这一步,发现有消息内容对象 查看关键代码

    org.springframework.messaging.Message<?> message = this.toMessagingMessage(amqpMessage);

    一直进去到消息转化器接口实现:SimpleMessageConverter 发现消费消息时,消息转化器转化规则,跟消息内容类型是否是"text"或者是“application/x-java-serialized-object”有关。 如果类型为“text”开头,就表示是文本内容,直接获取消息内容,按照“utf-8”,转成字符串 如果是“application/x-java-serialized-object”,就需要获取内容输入流转成对象流,再解序列化。 如果没有指定,就直接返回生产者生产的消息内容(字节数组) 三、RabbitMQ消息转化器设置 在RabbitTemplate定义时设置

    //消息转化器 // rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setMessageConverter(new MessageConverter() { @Override public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { //按照上面源码流程,可以设置内容类型格式,保证springboot解析数据时,正常解析 //messageProperties.setContentType("text/xml"); //messageProperties.setContentEncoding("utf-8"); //当然也可以在发送时,利用序列化工具进行序列化 Message message = new Message(JSON.toJSONBytes(o),messageProperties); System.out.println("调用了消息解析器"); return message; } @Override public Object fromMessage(Message message) throws MessageConversionException { //相应的在接受消费时,再利用序列化工具解序列化 return null; } });

    项目地址:github


    最新回复(0)