RabbitMQ:四种ExChange用法

    xiaoxiao2022-05-19  124

    RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。 ExChange和Queue之前是多对多的关系。 RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。 一、fanout 当向一个fanout发送一个消息时,RoutingKey的设置不起作用。 消息会被发送给同一个交换机下的所有队列,每个队列接收到的消息是一样的; 一个队列内有所有消费者(包含那些并没有相应RoutingKey的 消费者 ),将平分队列接收到的消息 ----------------消息生产者---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null); String message = "hello world! "; for(int i=0;i<100;i++) { channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes()); } System.out.println("Sent msg finish"); channel.close(); connection.close(); ----------------消息消费者 ---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null); // 声明 队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //绑定路由和队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null); System.out.println(" Waiting for msg...."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope,   AMQP.BasicProperties properties, byte[] body) { String message = ""; try { message = new String(body, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (Throwable ex) { ex.printStackTrace(); } System.out.println("Received msg='" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); 二、direct 当向一个direct发送一个消息时, 消息会被发送给同一个交换机下的 拥有相应RoutingKey的队列, 每个队列接收到的消息是一样的; 一个队列内拥有相应RoutingKey的消费者,将平分队列接收到的消息。 ----------------消息生产者 ---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); String message = "hello world! "; for(int i=0;i<100;i++) { channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes()); } System.out.println("Sent msg is '" + message + "'"); channel.close(); connection.close(); ----------------消息消费者 ---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); //声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //绑定路由和队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null); System.out.println(" Waiting for msg...."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String message = ""; try { message = new String(body, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (Throwable ex) { ex.printStackTrace(); } System.out.println("1 Received msg='" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); 三、topic 当向一个topic发送一个消息时 消息会被发送给同一个交换机下的 拥有相应RoutingKey的队列, 每个队列接收到的消息是一样的; 一个队列内有所有消费者(包含那些并没有相应RoutingKey的 消费者 ),将平分队列接收到的消息 ----------------消息生产者 ---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null); String message = "hello world! "; // int i=101; for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes()); } System.out.println("Sent msg is '" + message + "'"); channel.close(); connection.close(); ----------------消息消费者 ---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null); //声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //绑定路由和队列// 把队列绑定到路由上并指定headers channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null); System.out.println("1 Waiting for msg...."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  byte[] body) { String message = ""; try { message = new String(body, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (Throwable ex) { ex.printStackTrace(); } System.out.println("1 Received msg='" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); 四、headers 当向一个headers发送一个消息时 消息会被发送给同一个交换机下的 拥有相应RoutingKey或者headers的队列, 每个队列接收到的消息是一样的; 一个队列内有所有消费者(包含那些并没有相应RoutingKey或headers的 消费者 ),将平分队列接收到的消息 ----------------消息生产者 ---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null); // 设置消息头键值对信息 Map<String, Object> headers = new Hashtable<String, Object>(); headers.put("name", "jack"); headers.put("age", 31); Builder builder = new Builder(); builder.headers(headers); String message = "hello world! "; // int i=101; for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANGE_NAME, "routingkey1", builder.build(), (message + i).getBytes()); } System.out.println("Sent msg is '" + message + "'"); channel.close(); connection.close(); ----------------消息消费者 ---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP); factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 设置消息头键值对信息 Map<String, Object> headers = new Hashtable<String, Object>(); // 这里x-match有两种类型 // all:表示所有的键值对都匹配才能接受到消息 // any:表示只要有键值对匹配就能接受到消息 headers.put("x-match", "all"); headers.put("name", "jack"); headers.put("age", 30); // 把队列绑定到路由上并指定headers channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", headers); System.out.println(" Waiting for msg...."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  byte[] body) throws IOException { System.out.println("Received start --------------"); for (Entry<String, Object> entry : properties.getHeaders().entrySet()) { System.out.println(entry.getKey() + "=" + entry.getValue()); } String message = new String(body, "UTF-8"); System.out.println("msg='" + message + "'"); System.out.println("Received end --------------"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); 相关资源:七夕情人节表白HTML源码(两款)

    最新回复(0)