P2P :(点对点)消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用 MessageConsumer.set MessageListener() 注册一个 MessageListener 实现异步接收。多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。
Pub/Sub:(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。
1. 双击 bin/win64(win32)/activemq.bat 启动服务。
2. 默认 ActiveMQ 启动时,启动了内置的 jetty 服务器,提供一个用于监控ActiveMQ的admin应用
3. 服务启动地址:http://127.0.0.1:8161/admin/
4. 默认用户名/密码:admin/admin
生产者:
package com.zth.p2p; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息生产者 * @author zth * @Date 2019-05-25 17:01 */ public class JMSProducer { // 默认连接用户名 private static final String USENAME = ActiveMQConnection.DEFAULT_USER; // 默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认连接地址 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args){ // 连接工厂 ConnectionFactory connectionFactory; // 连接 Connection connection = null; // 会话 (接受或者发送消息的线程) Session session; // 消息目的地 Destination destination; // 消息的生产者 MessageProducer messageProducer; // 实例化工厂 connectionFactory = new ActiveMQConnectionFactory(USENAME,PASSWORD,BROKERURL); try { // 获取连接 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建会话 /** * 参数1:是否启动事务 * 参数2:消息确认模式[ * AUTO_ACKNOWLEDGE = 1 自动确认 * CLIENT_ACKNOWLEDGE = 2 客户端手动确认 * DUPS_OK_ACKNOWLEDGE = 3 自动批量确认 * SESSION_TRANSACTED = 0 事务提交并确认 * ] */ session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); // 创建消息队列 destination = session.createQueue("FirstQueue"); // 创建消息生产者 messageProducer = session.createProducer(destination); // 发送消息 sendMessage(session,messageProducer); session.commit(); // 关闭资源 messageProducer.close(); session.close(); } catch (JMSException e) { e.printStackTrace(); }finally { if (connection != null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 */ private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException { for (int i = 0; i <10; i++) { TextMessage message = session.createTextMessage("ActiveMQ 发的第 "+i+"条消息"); System.out.println("发送消息:"+message.getText()); messageProducer.send(message); } } }执行代码,后台管理界面显示结果:
Receive 方式接收消息:
package com.zth.p2p; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息消费者 * @author zth * @Date 2019-05-25 19:28 */ public class JMSConsumer { private static final String USENAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args){ // 实例化工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USENAME,PASSWORD,URL); try { // 创建连接 Connection connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建 session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // 创建连接的消息队列 Destination destination = session.createQueue("FirstQueue"); // 创建消息的消费者 MessageConsumer messageConsumer = session.createConsumer(destination); while (true){ TextMessage textMessage = (TextMessage)messageConsumer.receive(100000); if (null != textMessage){ System.out.println("接收到消息:"+textMessage.getText()); }else { break; } } } catch (JMSException e) { e.printStackTrace(); } } }执行代码,控制台结果:
后台管理界面显示结果:
再次执行生产者代码,后台管理界面显示结果:
Listener 监听方式接收消息:
package com.zth.p2p; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * @author zth * @Date 2019-05-25 19:59 */ public class JMSProducer2 { public static void main(String[] args){ // 实例化工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL ); try { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("FirstQueue"); MessageConsumer messageConsumer = session.createConsumer(destination); // 注册消息监听 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 等待键盘输入 try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } // 关闭资源 messageConsumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }键盘输入前后台管理界面显示结果:
消息生产者:
package com.zth.PubSub; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息发布者 * @author zth * @Date 2019-05-25 20:34 */ public class JMSProducer { public static void main(String[] args){ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, ActiveMQConnectionFactory.DEFAULT_BROKER_URL ); try { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("FirstTopic"); MessageProducer messageProducer = session.createProducer(destination); sendMessage(session,messageProducer); session.commit(); messageProducer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException { for (int i = 0; i < 10; i++) { TextMessage message = session.createTextMessage("ActiveMQ 发布的第 "+i+" 条消息"); System.out.println("发送的消息是:"+message.getText()); messageProducer.send(message); } } }消息消费者:
package com.zth.PubSub; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息订阅者 1 * @author zth * @Date 2019-05-25 20:33 */ public class JMSConsumer01 { public static void main(String[] args){ ConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, ActiveMQConnectionFactory.DEFAULT_BROKER_URL ); try { Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("FirstTopic"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("订阅者 1 收到的消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } } }先执行 2 个消息消费者代码(消费者2 代码同消费者1)
后台管理界面显示结果:
执行消息生产者代码,后台管理界面显示结果: