ActiveMQ消息(二)

    xiaoxiao2025-04-15  29

    一、.非持久的Topic消息示例

    1.过程

    2.编写生产者

    package cn.day1; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicSend { // activemq服务器的url地址,默认通信端口为61616 private static final String URL = "tcp://localhost:61616"; // 队列的名称 private static final String TOPICNAME = "MyTopic"; public static void main(String[] args) { // 1.创建连接工厂对象(ConnectionFactory) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.创建连接对象(Connection) Connection connection = null; try { connection = connectionFactory.createConnection(); // 3.启动连接 connection.start(); // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建目的地(destination) Destination destination = session.createTopic(TOPICNAME); // 6.创建生产者 MessageProducer producer = session.createProducer(destination); // 循环发送消息 for (int i = 0; i < 10; i++) { // 7.创建消息,这里创建的是简单的文本消息体 TextMessage textMessage = session.createTextMessage("TopicTest" + i); // 8.使用消息生产者往目的地发送消息 producer.send(destination, textMessage); System.out.println("消息发送成功:" + textMessage.getText()); } // 9.关闭连接 connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }

    运行结果:

    消息发送成功:TopicTest0 消息发送成功:TopicTest1 消息发送成功:TopicTest2 消息发送成功:TopicTest3 消息发送成功:TopicTest4 消息发送成功:TopicTest5 消息发送成功:TopicTest6 消息发送成功:TopicTest7 消息发送成功:TopicTest8 消息发送成功:TopicTest9

     

    3.消费者编写

    package cn.day1; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicReceive { // activemq服务器的url地址,默认通信端口为61616 private static final String URL = "tcp://localhost:61616"; // 队列的名称 private static final String TOPICNAME = "MyTopic"; public static void main(String[] args){ // 1.创建连接工厂对象(ConnectionFactory) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.创建连接对象(Connection) Connection connection = null; Session session = null; try{ connection = connectionFactory.createConnection(); // 3.启动连接 connection.start(); // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建目的地(destination) Destination destination = session.createTopic(TOPICNAME); // 6.创建消费者 MessageConsumer messageConsumer = session.createConsumer(destination); // 7.创建接收方 Message message = messageConsumer.receive(); while (message != null){ TextMessage textMessage = (TextMessage) message; System.out.println("接收消息:"+textMessage.getText()); message = messageConsumer.receive(1000); } session.commit(); }catch (Exception e){ e.getStackTrace(); }finally { try { session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }

     

    接收消息:TopicTest0 接收消息:TopicTest1 接收消息:TopicTest2 接收消息:TopicTest3 接收消息:TopicTest4 接收消息:TopicTest5 接收消息:TopicTest6 接收消息:TopicTest7 接收消息:TopicTest8 接收消息:TopicTest9

    注意:消费者一定要先开启来 才  接受到消息

    二 持久的Topic

    1.步骤

    package cn.day1; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class PetsistTopicSend { // activemq服务器的url地址,默认通信端口为61616 private static final String URL = "tcp://localhost:61616"; // 队列的名称 private static final String TOPICNAME = "MyPersistTopic"; public static void main(String[] args) { // 1.创建连接工厂对象(ConnectionFactory) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.创建连接对象(Connection) Connection connection = null; try { connection = connectionFactory.createConnection(); // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建目的地(destination) Destination destination = session.createTopic(TOPICNAME); // 6.创建生产者 MessageProducer producer = session.createProducer(destination); //变成持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 3.启动连接 connection.start(); // 循环发送消息 for (int i = 0; i < 10; i++) { // 7.创建消息,这里创建的是简单的文本消息体 TextMessage textMessage = session.createTextMessage("TopicTest" + i); // 8.使用消息生产者往目的地发送消息 producer.send(destination, textMessage); System.out.println("持久消息发送成功:" + textMessage.getText()); } // 9.关闭连接 connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }

    运行结果:

    持久接收消息:TopicTest2 持久接收消息:TopicTest3 持久接收消息:TopicTest4 持久接收消息:TopicTest5 持久接收消息:TopicTest6 持久接收消息:TopicTest7 持久接收消息:TopicTest8 持久接收消息:TopicTest9 持久接收消息:TopicTest0 持久接收消息:TopicTest1 持久接收消息:TopicTest2 持久接收消息:TopicTest3 持久接收消息:TopicTest4 持久接收消息:TopicTest5 持久接收消息:TopicTest6 持久接收消息:TopicTest7 持久接收消息:TopicTest8 持久接收消息:TopicTest9

    package cn.day1; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class PersistTopicReceive { // activemq服务器的url地址,默认通信端口为61616 private static final String URL = "tcp://localhost:61616"; // 队列的名称 private static final String TOPICNAME = "MyPersistTopic"; public static void main(String[] args){ // 1.创建连接工厂对象(ConnectionFactory) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.创建连接对象(Connection) Connection connection = null; Session session = null; try{ connection = connectionFactory.createConnection(); connection.setClientID("cc1"); // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建目的地(destination) Topic destination = session.createTopic(TOPICNAME); // 6.创建消费者 TopicSubscriber topicSubscriber = session.createDurableSubscriber(destination,"t1"); // 3.启动连接 connection.start(); // 7.创建接收方 Message message = topicSubscriber.receive(); while (message != null){ TextMessage textMessage = (TextMessage) message; System.out.println("持久接收消息:"+textMessage.getText()); message = topicSubscriber.receive(1000); } session.commit(); session.close(); connection.close(); }catch (Exception e){ e.getStackTrace(); } } }

    第一要先启动起来才能往服务注册

    运行结果:

    持久接收消息:TopicTest2 持久接收消息:TopicTest3 持久接收消息:TopicTest4 持久接收消息:TopicTest5 持久接收消息:TopicTest6 持久接收消息:TopicTest7 持久接收消息:TopicTest8 持久接收消息:TopicTest9 持久接收消息:TopicTest0 持久接收消息:TopicTest1 持久接收消息:TopicTest2 持久接收消息:TopicTest3 持久接收消息:TopicTest4 持久接收消息:TopicTest5 持久接收消息:TopicTest6 持久接收消息:TopicTest7 持久接收消息:TopicTest8 持久接收消息:TopicTest9

    https://www.cnblogs.com/niit-soft-518/p/6957384.html

    最新回复(0)