所需jar包,在Rabbitmq中版本不匹配是很常见的问题,所以如果版本不匹配,需要及时进行更换操作,参考RabbitMQ版本3.6.6
序号jar1amqp-client-4.1.1.jar2log4j-1.2.17.jar3slf4j-1.7.21.jarRabbitMQ Management创建队列选项说明 Name:队列的名字必须唯一,不可重复
Durability:消息保存类型
名称类型说明Durable持久化状态在队列中未被消费者使用的信息,在RabbitMQ关闭后,再重启信息一直在Transient瞬时状态在队列中未被消费者使用的信息,在RabbitMQ关闭后,再重启信息不存在Auto delete:自动删除,建议选yes
此次创建一个名称为sjw.queue允许自动删除的持久化状态队列 大多数情况下并不需要自己进行受动化创建,可以用程序进行自动创建,视个人清空而定
消费者 MessageProducer.class
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class MessageProducer { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.145"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="haha"; //队列名称 public final static String QUEUE_NAME="sjw.queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); //获取连接 Connection connection=factory.newConnection(); //获取信道,可以有多个信道 Channel channel=connection.createChannel(); //信道指定队列已经队列设置 //queueDeclare(名字,是否持久化,独占的queue, 不使用时是否自动删除,其他参数) channel.queueDeclare(QUEUE_NAME,true,false,true,null); //在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间 long start=System.currentTimeMillis(); for (int i=0;i<1000;i++){ String message="sjw"+i; //basicPublish(exchange,队列名称,属性,参数.getbyte()) channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); } //结束时间 long end=System.currentTimeMillis(); //输出所需时间 System.out.println("进入队列总共耗时:"+(end-start)); //关闭信道 channel.close(); //关闭连接 connection.close(); } }切记端口是5672,访问port不是15672,15672是api和管理界面的port.
查看队列
成功进入队列!!!
MessageConsumer.class
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class MessageConsumer { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.145"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="123"; //队列名称 public final static String QUEUE_NAME="lgq.queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); //获取连接 Connection connection = factory.newConnection(); //获取信道,可以有多个信道 Channel channel = connection.createChannel(); //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道 channel.queueDeclare(QUEUE_NAME, true, false, true, null); QueueingConsumer consumer = new QueueingConsumer(channel); //信道交给consumer进行内容接收处理 channel.basicConsume(QUEUE_NAME,consumer); while (true) { //消费者程序运行开着 如果生产者新增了数据会自动获取 // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消息]" + message); } } }运行结果
关闭程序,队列消失