一、确认机制
第一种
public static void getMessage() throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null); DefaultConsumer deliverCallback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); //表示消息消费成功 System.out.println("消息消费成功"); //手工确认,第一个参数是消息的id,第二个参数是批量标识(ture标识批量) //channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,消息从ready状态,变成unacked状态 //channel.basicConsume(ConnectionUtil.QUEUE_NAME, deliverCallback); //消费自动确认,一旦选择这种模式,消费发送到消费端,不管消息是否被消费端正常消费,都会将队列中的消息删除掉 channel.basicConsume(ConnectionUtil.QUEUE_NAME,true, deliverCallback); //手工确认 channel.basicConsume(ConnectionUtil.QUEUE_NAME,false, deliverCallback); }第二种 springboot 注解方式 在配置类定义监听器容器工厂,设置手工确认
//监听器容器工厂 @Bean public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){ SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory(); //首先将连接工厂注入进来 simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory()); //设置手工确认 simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return simpleRabbitListenerContainerFactory; }消费端监听队列
//containerFactory指定监听器容器,来决定监听器是否自动确认 @RabbitListener(queues = "queue4",containerFactory = "simpleRabbitListenerContainerFactory") public void get(String message ){ System.out.println(new String(message.getBody(),"utf-8")); System.out.println("消费者1"); }第三种 javabean方式
@Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(){ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); simpleMessageListenerContainer.setConnectionFactory(connectionFactory()); //设置消息监听器 simpleMessageListenerContainer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println(new String(message.getBody(),"utf-8")); System.out.println("消息监听器-javaconfig"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }); //添加要监听的队列名集合 simpleMessageListenerContainer.addQueueNames("queue4","queue1"); //添加要监听的队列 //simpleMessageListenerContainer.addQueues(new Queue("queue4")); //设置手工确认 //simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); return simpleMessageListenerContainer; }二、拒绝机制
//containerFactory指定监听器容器,来决定监听器是否自动确认 @RabbitListener(queues = "queue4",containerFactory = "simpleRabbitListenerContainerFactory") public void getMessage(Message message, Channel channel) throws IOException { //这里可以用fastjson解序列化 System.out.println(new String(message.getBody(),"utf-8")); //根据库存业务操作决定消息确认或者退回 if(repertoryOp()){ //channel根据消息配置中的id确认消息,第二个参数表示,是否是批量处理。 //如果是批量处理,可以再多少条之后,提交一次即可,保证最后一笔消息id被提交,前面的也就相当于提交了。 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息确认"); }else{ //消息退回 //这个可以批量退回 //第一个参数:消息id,第二个参数:是否批量退回,第三个参数:是否退回消息队列(否则丢弃) channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); //单条退回 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息退回"); } System.out.println("消费者4"); } private boolean repertoryOp(){ return true; }三、消息预取 一般正常多个消费端消费队列时,会默认采用轮询机制,把所有队列中的消息直接平均分配到各个消费端。这个时候,有可能不同的消费端消费能力不同,造成有的消费端很快消费完,别的消费端迟迟消费不完的情况。 注意:这种方式,队列中的消息,是直接压到消费端,队列中不存在消息。
对于这种情况,可以采用消息预取的机制,每次消费端从队列中预取一定数量的消息,处理完后再次预取一定量消息再次消费。这时候,消费端只能采用手工确认的方式。 这里生产者模拟生产一百条消息:
public static void sendByExchange(String message) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //生产者适合创建交换机和绑定,至于队列在生产者或者消费者创建都可以 // 声明exchange // channel.exchangeDeclare(ConnectionUtil.EXCHANGE_NAME, BuiltinExchangeType.FANOUT); channel.exchangeDeclare("te", BuiltinExchangeType.FANOUT); //声明队列 // 第二个参数表示队列是否持久化(跟消息持久化不同), // 第三个参数表示是否是排他队列,表示这个队列只能绑定在这个链接上。 // 第四个参数,表示是否队列为空时删除,一般结合第三个参数为ture使用true // 第五个参数为队列配置参数集合 channel.queueDeclare("queue4", true, false, false, null); // channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null); //交换机和队列绑定 channel.queueBind("queue4","te", "direct.key"); // channel.queueBind(ConnectionUtil.QUEUE_NAME, ConnectionUtil.EXCHANGE_NAME, "test"); // channel.basicPublish(ConnectionUtil.EXCHANGE_NAME, "", null, // message.getBytes()); for(int i = 0; i < 100; i++){ channel.basicPublish("te", "direct.key", null, (message + i).getBytes()); System.out.println("发送的信息为:" + message + i); } channel.close(); connection.close(); }消费端有两个消费者消费消息: 消费者1
public static void getMessage() throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); DefaultConsumer deliverCallback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); //表示消息消费成功 System.out.println("消费者1消费消息成功"); //手工确认,第一个参数是消息的id,第二个参数是批量标识(ture标识批量) channel.basicAck(envelope.getDeliveryTag(),false); } }; //手工确认 channel.basicConsume("queue4",false, deliverCallback); }消费者2
public static void getMessage() throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); DefaultConsumer deliverCallback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //模拟较长时间的业务逻辑 Thread.sleep(50); System.out.println(new String(body, "UTF-8")); //表示消息消费成功 System.out.println("消费者2消费消息成功"); //手工确认,第一个参数是消息的id,第二个参数是批量标识(ture标识批量) channel.basicAck(envelope.getDeliveryTag(),false); } }; //手工确认 channel.basicConsume("queue4",false, deliverCallback); }执行结果: 会发现队列中没有消息的任何状态,消费者1会分配到50个消息(0,2,4,6~98),消费者2也会分配到50个消息(1,3,5,…99).但是消费者2会执行较长时间,而消费者1会很快消费完。
如果消费者中增加预取处理:
//消息预取 channel.basicQos(1);会发现消费者1会直接消费掉99条消息,而消费者2只消费一条消息。能够很好充分利用消费端消费能力。 springboot设置消息预取:
@Bean public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory(); simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory); //手动确认消息 simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置消息预取的数量 simpleRabbitListenerContainerFactory.setPrefetchCount(1); return simpleRabbitListenerContainerFactory; }关于这个预取的数量如何设置呢? 我们发现 如果设置为1 能极大的利用客户端的性能(我消费完了就可以赶紧消 费下一条 不会导致忙的很忙 闲的很闲) 但是, 我们每消费一条消息 就要通知一次rabbitmq 然后再取出新的消息, 这样对于rabbitmq的性能来讲 是非常不合理的 所以这个参数要根据业务情况设置。 这里发现预取的数据量从1~2500条,性能越高,可靠性越低,中间值为500。 一般预取一定的数量消息,然后批量确认。
