RabbitMQ死信队列

    xiaoxiao2025-05-15  52

    死信队列

    DLX(Dead-Letter-Exchange),当信息在一个队列变成死信(Dead message)后,能被重新发送到DLX中,绑定DLX的队列称之为死信队列。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。

    消息变成死信队列可以是:

    消息过期消息被拒绝,requeue参数为false队列达到最大长度

    下面将介绍导致消息进入死信队列的三种情况

    首先创建用于获取Connection对象

    package com.dfyang.rabbitmq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class RabbitConnectionFactory { private static final String IP_ADDRESS = "192.168.195.150"; private static final int PORT = 5672; private static final String USERNAME = "root"; private static final String PASSWORD = "151310"; private static ConnectionFactory factory = new ConnectionFactory(); static { factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); } public static Connection getConnection() { Connection connection = null; try { connection = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return connection; } }

    (1)消息因超时而进入死信队列

    package com.dfyang.rabbitmq.dead; import com.dfyang.rabbitmq.RabbitConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.HashMap; import java.util.Map; public class TimeoutDLX { public static void main(String[] args) throws Exception { Connection connection = RabbitConnectionFactory.getConnection(); Channel channel = connection.createChannel(); //创建DLX及死信队列 channel.exchangeDeclare("dlx.exchange", "direct"); channel.queueDeclare("dlx.queue", true, false, false, null); channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routingKey"); //创建测试超时的Exchange及Queue channel.exchangeDeclare("timeout.exchange", "direct"); Map<String, Object> arguments = new HashMap<>(); //过期时间5s arguments.put("x-message-ttl", 5000); //绑定DLX arguments.put("x-dead-letter-exchange", "exchange.dlx"); //绑定发送到DLX的RoutingKey arguments.put("x-dead-letter-routing-key", "routingKey"); channel.queueDeclare("timeout.queue", true, false, false, null); channel.queueBind("timeout.queue", "timeout.exchange", "timeout.routingKey"); //发布一条消息 channel.basicPublish("timeout.exchange", "timeout.routingKey", null, "该消息将在5s后超时".getBytes()); channel.close(); connection.close(); } }

    执行代码 5s后因超时而进入死心队列

    (2)消息因被拒绝而进入死信队列

    创建生产者,这里将沿用上面的死信队列

    package com.dfyang.rabbitmq.dead; import com.dfyang.rabbitmq.RabbitConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.HashMap; import java.util.Map; public class RejectProduct { public static void main(String[] args) throws Exception { Connection connection = RabbitConnectionFactory.getConnection(); Channel channel = connection.createChannel(); //创建测试超时的Exchange及Queue channel.exchangeDeclare("reject.exchange", "direct"); Map<String, Object> arguments = new HashMap<>(); //绑定DLX arguments.put("x-dead-letter-exchange", "dlx.exchange"); //绑定发送到DLX的RoutingKey arguments.put("x-dead-letter-routing-key", "dlx.routingKey"); channel.queueDeclare("reject.queue", true, false, false, arguments); channel.queueBind("reject.queue", "reject.exchange", "reject.routingKey"); //发布一条消息 channel.basicPublish("reject.exchange", "reject.routingKey", null, "生产者发送一条消息".getBytes()); channel.close(); connection.close(); } }

    执行一次,reject.queue有一条待消费消息

    拒取一个消息

    下面是获取一条消息后拒绝

    package com.dfyang.rabbitmq.dead; import com.dfyang.rabbitmq.RabbitConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.GetResponse; public class RejectConsumer { public static void main(String[] args) throws Exception { Connection connection = RabbitConnectionFactory.getConnection(); Channel channel = connection.createChannel(); //获取一条消息,设置为非自动签收 GetResponse response = channel.basicGet("reject.queue", false); //传入消息编号和是否重回队列 channel.basicReject(response.getEnvelope().getDeliveryTag(), false); channel.close(); connection.close(); } }

    执行上述代码 还有另一个拒绝方法:channel.basicNack(long deliveryTag, boolean multiple, boolean requeue); multiple:为false表示拒绝deliveryTag这条消息,为true是表示拒绝deliveryTag之前所有未被当前消费者确认的消息

    (3)因队列达到最大长度而进入死信队列

    package com.dfyang.rabbitmq.dead; import com.dfyang.rabbitmq.RabbitConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.HashMap; import java.util.Map; public class QueueMaxLengthDLX { public static void main(String[] args) throws Exception { Connection connection = RabbitConnectionFactory.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("max.exchange", "direct"); Map<String, Object> arguments = new HashMap<>(); //设置Queue最大长度为5 arguments.put("x-max-length", 5); //绑定DLX arguments.put("x-dead-letter-exchange", "dlx.exchange"); //绑定发送到DLX的RoutingKey arguments.put("x-dead-letter-routing-key", "dlx.routingKey"); channel.queueDeclare("max.queue", true, false, false, arguments); channel.queueBind("max.queue", "max.exchange", "max.routingKey"); //发布10条消息 for (int i = 0; i < 10; i++) channel.basicPublish("max.exchange", "max.routingKey", null, "发送".getBytes()); channel.close(); connection.close(); } }

    执行上述代码,发现有5条消息被送往了死信队列

    最新回复(0)