上一篇文章我们已经看过activemq的点对点模式,本文将阐述使用java完成发布订阅值topic模式的应用,所谓topic模式,其实就是广播。
1、前提约束
已经安装和启动activemq https://www.jianshu.com/p/47d6d824ad50
2、使用idea创建一个maven的项目
https://www.jianshu.com/p/042073b7710b
3、修改pom.xml
在pom.xml中加入以下依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
4、创建订阅者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class PubsubConsumer {
public static void main(String[] args)throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.100.192:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
//Queue queue = session.createQueue("test-queue");
Topic topic = session.createTopic("my-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
启动两次,查看管理界面。依次点击Manage ActiveMQ broker->topics,我们看到创建的两个消费者。
5、创建发布者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class PubsubProducer {
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.100.192:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("my-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("ali");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}
启动,查看管理界面。依次点击Manage ActiveMQ broker->topics,我们看到刚才创建的消息已经被消费:
1条消息被消费2次
至此,我们完成了activemq中topic模式的测试。