消息中间件rabbitmq学习(二)-简单队列

    xiaoxiao2022-06-28  211

    一,名词介绍

    Server:又称 Broker,接受客户端的连接,实现AMQP实体服务。Connection:连接,应用程序与 Broker的网络连接。Channel:网络信道,几乎所有的操作都在 Channel中进行, Channel是进行消息读写的通道。客户端可建立多个 hannel,每个 Channel代表一个会话任务。Message:消息,服务器和应用程序之间传送的数据,由 Properties和Body组成。 Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容。Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。个 /irtual Host里面可以有若千个 Exchange和 Queue,同一个 VirtualHost里面不能有相同名称的 Exchange或 Queue。Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列。Binding: Exchange和 Queue?之间的虚拟连接, binding中可以包含 routing keyRouting key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息Queue:也称为 Message Queue,消息队列,保存消息并将它们转发给消费

    二,hello world,简单队列

    /** * @Description:rabbitmq获取连接类 */ import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ClassName: ConnectUtil * @Description: rabbitmq获取连接工厂类 */ public class ConnectUtil { /** * @throws TimeoutException * @throws IOException * @Title: getRMQconnect * @Description: 获取mq连接 * @return Connection 返回类型 * @throws */ public static Connection getRMQconnect() throws IOException, TimeoutException{ //定义连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //定义连接地址 connectionFactory.setHost("192.168.1.123"); //定义连接端口 AMQP协议 5672端口 connectionFactory.setPort(5672); //定义连接的virtualhost connectionFactory.setVirtualHost("/vh_sxrmq"); //用户名 connectionFactory.setUsername("sx_rmq"); //密码 connectionFactory.setPassword("123456"); return connectionFactory.newConnection(); } } /**   * @Title: Constants.java   */   /**    * @Description: 定义通用的常量名称    */ public final class Constants {          //定义简单队列名称     public final static String SIMPLE_QUEUE_NAME = "SIMPLE_QUEUE_NAME";          //定义work queue队列名称     public final static String WORK_QUEUE_NAME = "WORK_QUEUE_NAME"; }   /** * @Title: Send.java */ import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.suxu.rmq.common.Constants; import com.suxu.rmq.util.ConnectUtil; /** * @ClassName: Send * @Description: 消息生产者 */ public class Send { /** * @throws TimeoutException * @throws IOException * @Title: main * @Description: 发送消息 * @param args * @return void 返回类型 * @throws */ public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectUtil.getRMQconnect(); Channel channel = conn.createChannel(); channel.queueDeclare(Constants.SIMPLE_QUEUE_NAME, false, false, false, null); String msg = "hello world!"; channel.basicPublish("", Constants.SIMPLE_QUEUE_NAME, null, msg.getBytes()); System.out.println(Constants.SIMPLE_QUEUE_NAME+" send: "+msg); channel.close(); conn.close(); } } /** * @Title: Recv.java */ import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.AMQP.BasicProperties; import com.suxu.rmq.common.Constants; import com.suxu.rmq.util.ConnectUtil; /** * @ClassName: Recv * @Description: 消息消费者 */ public class Recv { /** * @throws InterruptedException * @throws ConsumerCancelledException * @throws ShutdownSignalException * @throws TimeoutException * @throws IOException * @Title: main * @Description: 消费消息 * @param args * @return void 返回类型 * @throws */ public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { Connection conn = ConnectUtil.getRMQconnect(); Channel channel = conn.createChannel(); channel.queueDeclare(Constants.SIMPLE_QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // TODO Auto-generated method stub super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body,"utf-8"); System.out.println(Constants.SIMPLE_QUEUE_NAME + " new api recv: "+msg); } }; channel.basicConsume(Constants.SIMPLE_QUEUE_NAME, true, consumer); } }

    简单队列不足:耦合性高,生产者和消费者一对一

     

     

     

     


    最新回复(0)