DLX(Dead-Letter-Exchange),当信息在一个队列变成死信(Dead message)后,能被重新发送到DLX中,绑定DLX的队列称之为死信队列。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。
消息变成死信队列可以是:
消息过期消息被拒绝,requeue参数为false队列达到最大长度首先创建用于获取Connection对象
package com.dfyang.rabbitmq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class RabbitConnectionFactory { private static final String IP_ADDRESS = "192.168.195.150"; private static final int PORT = 5672; private static final String USERNAME = "root"; private static final String PASSWORD = "151310"; private static ConnectionFactory factory = new ConnectionFactory(); static { factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); } public static Connection getConnection() { Connection connection = null; try { connection = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return connection; } }执行代码 5s后因超时而进入死心队列
创建生产者,这里将沿用上面的死信队列
package com.dfyang.rabbitmq.dead; import com.dfyang.rabbitmq.RabbitConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.HashMap; import java.util.Map; public class RejectProduct { public static void main(String[] args) throws Exception { Connection connection = RabbitConnectionFactory.getConnection(); Channel channel = connection.createChannel(); //创建测试超时的Exchange及Queue channel.exchangeDeclare("reject.exchange", "direct"); Map<String, Object> arguments = new HashMap<>(); //绑定DLX arguments.put("x-dead-letter-exchange", "dlx.exchange"); //绑定发送到DLX的RoutingKey arguments.put("x-dead-letter-routing-key", "dlx.routingKey"); channel.queueDeclare("reject.queue", true, false, false, arguments); channel.queueBind("reject.queue", "reject.exchange", "reject.routingKey"); //发布一条消息 channel.basicPublish("reject.exchange", "reject.routingKey", null, "生产者发送一条消息".getBytes()); channel.close(); connection.close(); } }执行一次,reject.queue有一条待消费消息
下面是获取一条消息后拒绝
package com.dfyang.rabbitmq.dead; import com.dfyang.rabbitmq.RabbitConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.GetResponse; public class RejectConsumer { public static void main(String[] args) throws Exception { Connection connection = RabbitConnectionFactory.getConnection(); Channel channel = connection.createChannel(); //获取一条消息,设置为非自动签收 GetResponse response = channel.basicGet("reject.queue", false); //传入消息编号和是否重回队列 channel.basicReject(response.getEnvelope().getDeliveryTag(), false); channel.close(); connection.close(); } }执行上述代码 还有另一个拒绝方法:channel.basicNack(long deliveryTag, boolean multiple, boolean requeue); multiple:为false表示拒绝deliveryTag这条消息,为true是表示拒绝deliveryTag之前所有未被当前消费者确认的消息
执行上述代码,发现有5条消息被送往了死信队列