JMS —— ActiveMQ

    xiaoxiao2023-10-27  162

     一. 概述

    1. 相关概念

    JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。ActiveMQ 是一个 MOM,是一个实现了 JMS 规范的系统间远程通信的消息代理。Provider:纯 Java 语言编写的 JMS 接口实现(比如 ActiveMQ )Domains:消息传递方式,包括点对点(P2P)、发布/订阅(Pub/Sub)两种。Connection factory:客户端使用连接工厂来创建与 JMS provider 的连接。Destination:消息被寻址、发送以及接收的对象。MQ全称为Message Queue,消息队列(MQ)是正确而又完整的 JMS 实现,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

     2. ActiveMQ 的消息传递模式

    P2P :(点对点)消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用 MessageConsumer.set MessageListener()  注册一个 MessageListener 实现异步接收。多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。

    Pub/Sub:(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。

     

    3. JMS 创建步骤:

    获取连接工厂使用连接工厂创建连接启动连接从连接创建会话获取 Destination创建 Producer,或 创建 Producer创建 message创建 Consumer,或 创建 Consumer注册消息监听器(可选)发送或接收 message关闭资源(connection, session, producer, consumer 等)

    4. JMS 五种不同的消息正文格式,以及调用的消息类型。

    TextMessage--一个字符串对象MapMessage--一套名称-值对ObjectMessage--一个序列化的 Java 对象BytesMessage--一个字节的数据流StreamMessage -- Java 原始值的数据流

    二. 启动 ActiveMQ

    1. 双击 bin/win64(win32)/activemq.bat 启动服务。

    2. 默认 ActiveMQ 启动时,启动了内置的 jetty 服务器,提供一个用于监控ActiveMQ的admin应用

    3. 服务启动地址:http://127.0.0.1:8161/admin/

    4. 默认用户名/密码:admin/admin

     

    三. 案例

    1. 点对点模式

    生产者:

    package com.zth.p2p; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息生产者 * @author zth * @Date 2019-05-25 17:01 */ public class JMSProducer { // 默认连接用户名 private static final String USENAME = ActiveMQConnection.DEFAULT_USER; // 默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认连接地址 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args){ // 连接工厂 ConnectionFactory connectionFactory; // 连接 Connection connection = null; // 会话 (接受或者发送消息的线程) Session session; // 消息目的地 Destination destination; // 消息的生产者 MessageProducer messageProducer; // 实例化工厂 connectionFactory = new ActiveMQConnectionFactory(USENAME,PASSWORD,BROKERURL); try { // 获取连接 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建会话 /** * 参数1:是否启动事务 * 参数2:消息确认模式[ * AUTO_ACKNOWLEDGE = 1 自动确认 * CLIENT_ACKNOWLEDGE = 2 客户端手动确认 * DUPS_OK_ACKNOWLEDGE = 3 自动批量确认 * SESSION_TRANSACTED = 0 事务提交并确认 * ] */ session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); // 创建消息队列 destination = session.createQueue("FirstQueue"); // 创建消息生产者 messageProducer = session.createProducer(destination); // 发送消息 sendMessage(session,messageProducer); session.commit(); // 关闭资源 messageProducer.close(); session.close(); } catch (JMSException e) { e.printStackTrace(); }finally { if (connection != null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 */ private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException { for (int i = 0; i <10; i++) { TextMessage message = session.createTextMessage("ActiveMQ 发的第 "+i+"条消息"); System.out.println("发送消息:"+message.getText()); messageProducer.send(message); } } }

    执行代码,后台管理界面显示结果:

    Receive 方式接收消息:

    package com.zth.p2p; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息消费者 * @author zth * @Date 2019-05-25 19:28 */ public class JMSConsumer { private static final String USENAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args){ // 实例化工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USENAME,PASSWORD,URL); try { // 创建连接 Connection connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建 session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // 创建连接的消息队列 Destination destination = session.createQueue("FirstQueue"); // 创建消息的消费者 MessageConsumer messageConsumer = session.createConsumer(destination); while (true){ TextMessage textMessage = (TextMessage)messageConsumer.receive(100000); if (null != textMessage){ System.out.println("接收到消息:"+textMessage.getText()); }else { break; } } } catch (JMSException e) { e.printStackTrace(); } } }

    执行代码,控制台结果:

     后台管理界面显示结果:

    再次执行生产者代码,后台管理界面显示结果:

    Listener 监听方式接收消息:

    package com.zth.p2p; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * @author zth * @Date 2019-05-25 19:59 */ public class JMSProducer2 { public static void main(String[] args){ // 实例化工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL ); try { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("FirstQueue"); MessageConsumer messageConsumer = session.createConsumer(destination); // 注册消息监听 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 等待键盘输入 try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } // 关闭资源 messageConsumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }

    键盘输入前后台管理界面显示结果:

     

    2. 发布/订阅模式

    消息生产者:

    package com.zth.PubSub; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息发布者 * @author zth * @Date 2019-05-25 20:34 */ public class JMSProducer { public static void main(String[] args){ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, ActiveMQConnectionFactory.DEFAULT_BROKER_URL ); try { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("FirstTopic"); MessageProducer messageProducer = session.createProducer(destination); sendMessage(session,messageProducer); session.commit(); messageProducer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException { for (int i = 0; i < 10; i++) { TextMessage message = session.createTextMessage("ActiveMQ 发布的第 "+i+" 条消息"); System.out.println("发送的消息是:"+message.getText()); messageProducer.send(message); } } }

    消息消费者:

    package com.zth.PubSub; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息订阅者 1 * @author zth * @Date 2019-05-25 20:33 */ public class JMSConsumer01 { public static void main(String[] args){ ConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, ActiveMQConnectionFactory.DEFAULT_BROKER_URL ); try { Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("FirstTopic"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("订阅者 1 收到的消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } } }

    先执行 2 个消息消费者代码(消费者2 代码同消费者1)

    后台管理界面显示结果:

     

     

    执行消息生产者代码,后台管理界面显示结果:

     

    最新回复(0)