一、安装ActiveMQ
注意:JDK版本需要1.7及以上才行
到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/download.html,解压后的目录结构如下:
如果我们是32位的机器,就双击win32目录下的activemq.bat,如果是64位机器,则双击win64目录下的activemq.bat,运行结果如下:
启动成功!成功之后在浏览器中访问 http://127.0.0.1:8161/ ,可以看到ActiveMQ的管理页面,用户名和密码默认都是admin,如下:
<!-- ActiveMQ: Spring Boot JMS starter plus the ActiveMQ connection pool. NOTE(review): newer Spring Boot 2.x releases pool connections through org.messaginghub:pooled-jms instead of activemq-pool; confirm against the Boot version in use. --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency> <!-- end of ActiveMQ dependencies -->
# NOTE(review): both top-level keys below presumably sit under `spring:` in the full application.yml - confirm.
activemq:
  broker-url: tcp://127.0.0.1:61616
  user: admin
  password: admin
  # Whether the default broker URL should be in memory.
  # Ignored when an explicit broker is specified (broker-url is set above).
  in-memory: true
  # Time to wait for the response to a message send; 0 waits forever.
  send-timeout: 0
  # Whether to stop message delivery before re-delivering messages from a
  # rolled-back transaction. Enabling it means message order is not preserved.
  non-blocking-redelivery: false
  pool:
    # Whether to use a PooledConnectionFactory instead of a regular ConnectionFactory.
    enabled: true
  packages:
    # Needed when objects are transported as ObjectMessage; the original author
    # reports a ClassNotFound error without it.
    # NOTE(review): trust-all accepts every package for deserialization - consider
    # an explicit trusted-package list outside of demos.
    trust-all: true
jms:
  # ActiveMQ works in queue mode by default; this switches the default to topic mode.
  pub-sub-domain: true
package com.wondertek.oes.workbench.collect.config;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
 * Declares the JMS listener container factories that {@code @JmsListener}
 * methods select by bean name through their {@code containerFactory} attribute:
 * one for queue destinations and one for topic destinations.
 */
@Configuration
public class ActiveMqConfig {

    /**
     * Listener container factory for queue (point-to-point) listeners.
     * The pub-sub flag is not touched here, so the framework default applies.
     *
     * @param activeMQConnectionFactory JMS connection factory injected by Spring
     * @return the factory registered as bean {@code jmsListenerContainerQueue}
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

    /**
     * Listener container factory for topic (publish/subscribe) listeners.
     *
     * <p>Listeners are subscribed to topics, but replies produced through
     * {@code @SendTo} are resolved as <em>queues</em>. Without this, the reply
     * destination type follows the container's pub-sub flag, so a reply aimed at
     * {@code "out.queue"} would be published to a topic of that name and a
     * listener bound to the queue {@code out.queue} would never see it.
     *
     * @param activeMQConnectionFactory JMS connection factory injected by Spring
     * @return the factory registered as bean {@code jmsListenerContainerTopic}
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        // Keep @SendTo reply destinations in the queue domain (see Javadoc above).
        bean.setReplyPubSubDomain(false);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }
}
package com.wondertek.oes.workbench.collect.mq;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service;
import java.io.Serializable; import java.util.List;
/**
 * Sends strings, string lists, serializable objects and object lists to
 * ActiveMQ queues or topics, and listens on {@code out.queue} for replies.
 */
@Service
public class MqProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Sends a text message to a queue.
     *
     * @param queueName name of the target queue
     * @param message   text to send
     */
    public void sendStringQueue(String queueName, String message) {
        ActiveMQQueue target = new ActiveMQQueue(queueName);
        jmsMessagingTemplate.convertAndSend(target, message);
    }

    /**
     * Sends a list of strings to a queue.
     *
     * @param queueName name of the target queue
     * @param list      strings to send
     */
    public void sendStringListQueue(String queueName, List<String> list) {
        ActiveMQQueue target = new ActiveMQQueue(queueName);
        jmsMessagingTemplate.convertAndSend(target, list);
    }

    /**
     * Sends a single serializable object to a queue.
     *
     * @param queueName name of the target queue
     * @param obj       object to send
     */
    public void sendObjQueue(String queueName, Serializable obj) {
        ActiveMQQueue target = new ActiveMQQueue(queueName);
        jmsMessagingTemplate.convertAndSend(target, obj);
    }

    /**
     * Sends a list of serializable objects to a queue.
     *
     * @param queueName name of the target queue
     * @param objList   objects to send
     */
    public void sendObjListQueue(String queueName, List<Serializable> objList) {
        ActiveMQQueue target = new ActiveMQQueue(queueName);
        jmsMessagingTemplate.convertAndSend(target, objList);
    }

    /**
     * Sends a text message to a topic.
     *
     * @param topicName name of the target topic
     * @param message   text to send
     */
    public void sendStringTopic(String topicName, String message) {
        ActiveMQTopic target = new ActiveMQTopic(topicName);
        jmsMessagingTemplate.convertAndSend(target, message);
    }

    /**
     * Sends a list of strings to a topic.
     *
     * @param topicName name of the target topic
     * @param list      strings to send
     */
    public void sendStringListTopic(String topicName, List<String> list) {
        ActiveMQTopic target = new ActiveMQTopic(topicName);
        jmsMessagingTemplate.convertAndSend(target, list);
    }

    /**
     * Sends a single serializable object to a topic.
     *
     * @param topicName name of the target topic
     * @param obj       object to send
     */
    public void sendObjTopic(String topicName, Serializable obj) {
        ActiveMQTopic target = new ActiveMQTopic(topicName);
        jmsMessagingTemplate.convertAndSend(target, obj);
    }

    /**
     * Sends a list of serializable objects to a topic.
     *
     * @param topicName name of the target topic
     * @param objList   objects to send
     */
    public void sendObjListTopic(String topicName, List<Serializable> objList) {
        ActiveMQTopic target = new ActiveMQTopic(topicName);
        jmsMessagingTemplate.convertAndSend(target, objList);
    }

    /**
     * Prints every text message received from {@code out.queue}, using the
     * {@code jmsListenerContainerQueue} container factory.
     *
     * @param text body of the received message
     */
    @JmsListener(destination = "out.queue", containerFactory = "jmsListenerContainerQueue")
    public void consumerMessage(String text) {
        String line = "从out.queue队列收到的回复报文为:" + text;
        System.out.println(line);
    }
}
package com.wondertek.oes.workbench.collect.mq;
import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage; import java.util.List;
/**
 * Queue-side consumer. Every listener uses the {@code jmsListenerContainerQueue}
 * container factory and prints what it receives to standard output.
 */
@Component
public class QueueConsumer {

    /**
     * Handles text messages from {@code stringQueue}.
     *
     * @param msg received text
     */
    @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(String msg) {
        final String line = "接收到消息...." + msg;
        System.out.println(line);
    }

    /**
     * Handles string-list messages from {@code stringListQueue}.
     *
     * @param list received strings
     */
    @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringListQueue(List<String> list) {
        final String line = "接收到集合队列消息...." + list;
        System.out.println(line);
    }

    /**
     * Handles object messages from {@code objQueue}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the payload cannot be read from the message
     */
    @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
        final Object payload = objectMessage.getObject();
        System.out.println("接收到对象队列消息...." + payload);
    }

    /**
     * Handles object-list messages from {@code objListQueue}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the payload cannot be read from the message
     */
    @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
        final Object payload = objectMessage.getObject();
        System.out.println("接收到的对象队列消息..." + payload);
    }
}
注: A主题消费者,用来接收主题类消息。目前搭了两个消费者,一个A,一个B。
package com.wondertek.oes.workbench.collect.mq;
import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage; import java.util.List;
/**
 * Topic subscriber "A". Every listener uses the {@code jmsListenerContainerTopic}
 * container factory and prints what it receives to standard output.
 */
@Component
public class ATopicConsumer {

    /**
     * Handles text messages from {@code stringTopic}.
     *
     * @param msg received text
     */
    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        final String line = "ATopicConsumer接收到消息...." + msg;
        System.out.println(line);
    }

    /**
     * Handles string-list messages from {@code stringListTopic}.
     *
     * @param list received strings
     */
    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        final String line = "ATopicConsumer接收到集合主题消息...." + list;
        System.out.println(line);
    }

    /**
     * Handles object messages from {@code objTopic}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the payload cannot be read from the message
     */
    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        final Object payload = objectMessage.getObject();
        System.out.println("ATopicConsumer接收到对象主题消息...." + payload);
    }

    /**
     * Handles object-list messages from {@code objListTopic} and answers them:
     * the returned string is forwarded to the {@code out.queue} destination
     * named in {@code @SendTo}.
     *
     * @param objectMessage raw JMS object message
     * @return reply text, the payload prefixed with {@code "ATopicConsumer:"}
     * @throws Exception if the payload cannot be read from the message
     */
    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    @SendTo("out.queue")
    public String receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
        final String reply = "ATopicConsumer:" + objectMessage.getObject();
        return reply;
    }
}
实现双向队列
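我们在receiveObjListTopic方法上面多加了一个注解@SendTo("out.queue"),该注解的意思是将return回的值,再发送到"out.queue"队列中(生产者有接收方法)。注意:回复目的地的类型默认跟随监听容器的pubSubDomain设置,topic容器需要将replyPubSubDomain设为false,否则回复会被发送到名为out.queue的主题,而不是队列。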
package com.wondertek.oes.workbench.collect.mq;
import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage; import java.util.List;
/**
 * Topic subscriber "B". Every listener uses the {@code jmsListenerContainerTopic}
 * container factory and prints what it receives to standard output.
 */
@Component
public class BTopicConsumer {

    /**
     * Handles text messages from {@code stringTopic}.
     *
     * @param msg received text
     */
    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        final String line = "BTopicConsumer接收到消息...." + msg;
        System.out.println(line);
    }

    /**
     * Handles string-list messages from {@code stringListTopic}.
     *
     * @param list received strings
     */
    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        final String line = "BTopicConsumer接收到集合主题消息...." + list;
        System.out.println(line);
    }

    /**
     * Handles object messages from {@code objTopic}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the payload cannot be read from the message
     */
    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        final Object payload = objectMessage.getObject();
        System.out.println("BTopicConsumer接收到对象主题消息...." + payload);
    }

    /**
     * Handles object-list messages from {@code objListTopic} and answers them:
     * the returned string is forwarded to the {@code out.queue} destination
     * named in {@code @SendTo}.
     *
     * @param objectMessage raw JMS object message
     * @return reply text, the payload prefixed with {@code "BTopicConsumer:"}
     * @throws Exception if the payload cannot be read from the message
     */
    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    @SendTo("out.queue")
    public String receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("BTopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
        final String reply = "BTopicConsumer:" + objectMessage.getObject();
        return reply;
    }
}
注: 用来测试对象消息的类,重写了toString方法,并且必须实现Serializable接口。
package com.wondertek.oes.workbench.collect.mq;
import java.io.Serializable;
/**
 * Simple serializable bean used as the payload of the object-message examples.
 */
public class User implements Serializable {

    /**
     * Explicit serialization version. Without it the JVM derives one from the
     * class shape, so any recompilation that alters the class can make
     * previously serialized instances unreadable.
     */
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private Integer age;

    /** Creates a user with all fields unset ({@code null}). */
    public User() {
    }

    /**
     * Creates a fully populated user.
     *
     * @param id   user identifier
     * @param name user name
     * @param age  user age
     */
    public User(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    /** @return the user identifier, possibly {@code null} */
    public String getId() {
        return id;
    }

    /** @param id the user identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the user name, possibly {@code null} */
    public String getName() {
        return name;
    }

    /** @param name the user name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user age, possibly {@code null} */
    public Integer getAge() {
        return age;
    }

    /** @param age the user age */
    public void setAge(Integer age) {
        this.age = age;
    }

    /**
     * @return a text form such as {@code User{id='1', name='x', age=20}}
     */
    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
package com.wondertek.oes.workbench.collect;
import java.io.Serializable; import java.util.ArrayList; import java.util.List;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
import com.wondertek.oes.workbench.collect.mq.MqProducer; import com.wondertek.oes.workbench.collect.mq.User;
/**
 * Spring Boot integration tests that push sample messages through
 * {@link MqProducer} to each queue and topic used by the example consumers.
 * The tests only send; they make no assertions on what is received.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {

    /** Number of text messages sent by the string queue/topic tests. */
    private static final int MESSAGE_COUNT = 100;

    @Autowired
    private MqProducer mqProducer;

    /** Sends 100 numbered text messages to {@code stringQueue}. */
    @Test
    public void testStringQueue() {
        int seq = 1;
        while (seq <= MESSAGE_COUNT) {
            System.out.println("第" + seq + "次发送字符串队列消息");
            mqProducer.sendStringQueue("stringQueue", "消息:" + seq);
            seq++;
        }
    }

    /** Sends one list of three ids to {@code stringListQueue}. */
    @Test
    public void testStringListQueue() {
        List<String> ids = sampleIds();
        System.out.println("正在发送集合队列消息ing......");
        mqProducer.sendStringListQueue("stringListQueue", ids);
    }

    /** Sends a single {@link User} to {@code objQueue}. */
    @Test
    public void testObjQueue() {
        System.out.println("正在发送对象队列消息......");
        User user = new User("1", "小明", 20);
        mqProducer.sendObjQueue("objQueue", user);
    }

    /** Sends a list of three users to {@code objListQueue}. */
    @Test
    public void testObjListQueue() {
        System.out.println("正在发送对象集合队列消息......");
        List<Serializable> users = sampleUsers();
        mqProducer.sendObjListQueue("objListQueue", users);
    }

    /** Sends 100 numbered text messages to {@code stringTopic}. */
    @Test
    public void testStringTopic() {
        int seq = 1;
        while (seq <= MESSAGE_COUNT) {
            System.out.println("第" + seq + "次发送字符串主题消息");
            mqProducer.sendStringTopic("stringTopic", "消息:" + seq);
            seq++;
        }
    }

    /** Sends one list of three ids to {@code stringListTopic}. */
    @Test
    public void testStringListTopic() {
        List<String> ids = sampleIds();
        System.out.println("正在发送集合主题消息ing......");
        mqProducer.sendStringListTopic("stringListTopic", ids);
    }

    /** Sends a single {@link User} to {@code objTopic}. */
    @Test
    public void testObjTopic() {
        System.out.println("正在发送对象主题消息......");
        User user = new User("1", "小明", 20);
        mqProducer.sendObjTopic("objTopic", user);
    }

    /** Sends a list of three users to {@code objListTopic}. */
    @Test
    public void testObjListTopic() {
        System.out.println("正在发送对象集合主题消息......");
        List<Serializable> users = sampleUsers();
        mqProducer.sendObjListTopic("objListTopic", users);
    }

    /** Builds the three-element id list shared by the string-list tests. */
    private static List<String> sampleIds() {
        List<String> ids = new ArrayList<>();
        ids.add("id1");
        ids.add("id2");
        ids.add("id3");
        return ids;
    }

    /** Builds the three-user list shared by the object-list tests. */
    private static List<Serializable> sampleUsers() {
        List<Serializable> users = new ArrayList<>();
        users.add(new User("1", "小明", 21));
        users.add(new User("2", "小雪", 22));
        users.add(new User("3", "小花", 23));
        return users;
    }
}