SpringBoot RabbitMQ 延时队列 (ttl)

    xiaoxiao2025-08-07  4

    流程:[ttl.exchange] --<ttl.routing-key>--> [ttl.queue(此 queue 指定消息到期后转发到的 exchange)] --> [普通exchange] --<普通routing-key>--> [普通queue]

    RabbitMQConfig

    package com.example.rabbitmqttl.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { public static final String NORMAL_QUEUE_NAME = "normal.queue"; public static final String NORMAL_EXCHANGE_NAME = "normal.exchange"; public static final String NORMAL_ROUTING_KEY = "normal.routing.key"; public static final String TTL_QUEUE_NAME = "ttl.queue"; public static final String TTL_EXCHANGE_NAME = "ttl.exchange"; public static final String TTL_ROUTING_KEY = "ttl.routing.key"; // 普通的 @Bean public Queue queue(){ return new Queue(NORMAL_QUEUE_NAME); } @Bean public DirectExchange directExchange(){ return new DirectExchange(NORMAL_EXCHANGE_NAME); } @Bean public Binding binding() { return BindingBuilder. bind(queue()). to(directExchange()) .with(NORMAL_ROUTING_KEY); } // ttl的 @Bean public Queue ttlQueue(){ return QueueBuilder.durable(TTL_QUEUE_NAME) .withArgument("x-dead-letter-exchange", NORMAL_EXCHANGE_NAME)// 到期后转发的交换机 .withArgument("x-dead-letter-routing-key", NORMAL_ROUTING_KEY)// 到期后转发的交换机对应的路由key .build(); } @Bean public DirectExchange ttlDirectExchange(){ return new DirectExchange(TTL_EXCHANGE_NAME); } @Bean public Binding ttlBinding() { return BindingBuilder. bind(ttlQueue()). to(ttlDirectExchange()) .with(TTL_ROUTING_KEY); } }

     


    ISendService

    package com.example.rabbitmqttl.service; import com.example.rabbitmqttl.config.RabbitMQConfig; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @Component @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_NAME) public class ReceiverService { @RabbitHandler public void handle(String msg){ System.out.println("receiver:" + msg + "_" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); } }

    SendServiceImpl

    package com.example.rabbitmqttl.service.impl; import com.example.rabbitmqttl.config.RabbitMQConfig; import com.example.rabbitmqttl.service.ISendService; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @Service public class SendServiceImpl implements ISendService { @Autowired private RabbitTemplate rabbitTemplate; @Override public void send() { String msg = "hello" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); System.out.println(" send:" + msg); rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME, RabbitMQConfig.TTL_ROUTING_KEY, msg, message -> { message.getMessageProperties().setExpiration(String.valueOf(5000));// 单位毫秒 return message; }); } }

    ReceiverService

    package com.example.rabbitmqttl.service;

    import com.example.rabbitmqttl.config.RabbitMQConfig;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    /**
     * Listener on {@link RabbitMQConfig#NORMAL_QUEUE_NAME} that prints each received
     * message together with the time it was received.
     */
    @Component
    @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_NAME)
    public class ReceiverService {

        /**
         * Prints {@code receiver:<msg>_<current ISO local date-time>} to stdout.
         *
         * @param msg the message payload as a string
         */
        @RabbitHandler
        public void handle(String msg) {
            final String receivedAt = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now());
            final String line = "receiver:" + msg + "_" + receivedAt;
            System.out.println(line);
        }
    }

    RabbitmqTtlApplication

    package com.example.rabbitmqttl;

    import com.example.rabbitmqttl.service.ISendService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * Application entry point that also serves as the REST controller exposing
     * {@code GET /send} to trigger one message send.
     */
    @RestController
    @SpringBootApplication
    public class RabbitmqTtlApplication {

        @Autowired
        private ISendService iSendService;

        /** Boots the Spring application with the given command-line arguments. */
        public static void main(String[] args) {
            SpringApplication.run(RabbitmqTtlApplication.class, args);
        }

        /**
         * Handles {@code GET /send}: delegates to {@link ISendService#send()}.
         *
         * @return always {@code 0}
         */
        @GetMapping("/send")
        public Object send() {
            iSendService.send();
            final Object status = 0;
            return status;
        }
    }

    yml

    # RabbitMQ connection settings read by Spring Boot (application.yml).
    spring:
      rabbitmq:
        host: 192.168.1.104
        virtual-host: /
        port: 5672
        username: root
        password: 1
    运行结果(消息发送约 5 秒后才被消费):

    send:hello2019-05-26T22:40:04.291
    receiver:hello2019-05-26T22:40:04.291_2019-05-26T22:40:09.319

     

    最新回复(0)