ActiveMQ主要涉及到5个方面: 1. 传输协议:消息之间的传递,无疑需要协议进行沟通,启动一个ActiveMQ打开了一个监听端口, ActiveMQ提供了广泛的连接模式,其中主要包括SSL、STOMP、XMPP;ActiveMQ默认的使用的协议是openWire,端口号:61616; 2. 消息域:ActiveMQ主要包含Point-to-Point (点对点),Publish/Subscribe Model (发布/订阅者),其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式 3. 消息存储:在消息传递过程中,部分重要的消息可能需要存储到数据库或文件系统中,当中介崩溃时,信息不回丢失 4. Cluster (集群): 最常见到 集群方式包括network of brokers和Master Slave; 5. Monitor (监控) :ActiveMQ一般由jmx来进行监控
默认配置下的ActiveMQ只适合学习代码而不适用于实际生产环境,ActiveMQ的性能需要通过配置挖掘,其性能提高包括代码级性能、规则性能、存储性能、网络性能以及多节点协同方法(集群方案),所以我们优化ActiveMQ的中心思路也是这样的:
1. 优化ActiveMQ单个节点的性能,包括NIO模型选择和存储选择。
2. 配置ActiveMQ的集群(ActiveMQ的高性能和高可用需要通过集群表现出来)。
点对点模式下一条消息将会发送给一个消息消费者,如果当前Queue没有消息消费者,消息将进行存储。
1.创建连接Connection 2.创建会话Session 3.通过Session来创建其它的(MessageProducer、MessageConsumer、Destination、TextMessage) 4.将生产者 MessageProducer 和消费者 MessageConsumer 都会指向目标 Destination 5.生产者向目标发送TextMessage消息send() 6.消费者设置监听器,监听消息。
点对点的消息模型,只需要一个消息生成者和消息消费者,下面我们编写代码。
目录结构:
pom结构
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.loafer</groupId> <artifactId>ActiveMq01</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMq01</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.4</version> </dependency> </dependencies> </project>编写生产者代码
package com.loafer.active; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class TextProducer { /** * 发送消息到ActiveMQ中,具体的消息内容为参数信息 * 开发JMS相关代码中,使用的接口类型都是javax.jms包下的类型 * @param datas 消息内容 */ public void sendTextMessage(String datas){ //连接工厂 ConnectionFactory factory = null; //连接 Connection connection = null; //目的地 Destination destination = null; //会话 Session session = null; //消息发送者 MessageProducer producer = null; //消息对象 Message message = null; try{ //创建连接工厂,连接ActiveMQ服务的连接工厂 //创建工厂,构造方法有三个参数,分别是用户名,密码,连接地址 //无参构造,有默认的连接地址。本地连接 //单参数构造,五无验证模式,没有用户的认证 //三参数构造,有认证+指定地址。默认端口是61616.从ActiveMQ的conf/activemq.xml配置文件中查看 factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.0.32:61616"); //通过工厂,创建连接对象 //创建连接的方法有重载,其中有createConnection(String username,String password) //可以再创建连接工厂时,只传递连接地址,不传递用户信息 connection = factory.createConnection(); //建议启动连接,消息的发送者不是必须启动连接。消息的消费者必须启动连接 //producer再发送消息的时候,会检查是否启动了连接,如果未启动,自动启动 //如果有特殊的配置,建议配置完毕后再启动连接 connection.start(); //通过连接对象,创建会话对象 /*创建会话的时候,必须传递两个参数,分别代表的是否支持事务和如何确认消息处理 * transacted:是否支持事务,数据类型是boolean.true-支持 false-不支持 * true:支持事务,第二个参数对producer来说默认是无效。建议传递的数据是Session.SESSION_TRANSACTED * false:不支持事务,常用参数。第二个参数必须传递,且必须有效 * acknowledgeMode:如何确认消息的处理。使用确认机制实现的 * AUTO_ACKNOWLEDGE:自动确认消息。消息的消费者处理消息后,自动确认。常用。商业开发不推荐 * CLIENT_ACKNOWLEDGE:客户端手动确认。消息的消费者处理后,必须手工确认 * DUPS_OK_ACKNOWLEDGE:有副本的客户端手动确认 * 一个消息可以多次处理 * 可以降低Session的消耗,再可以容忍重复消息时使用(不推荐使用) */ session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); //创建目的地。参数是目的地名称。是目的地的唯一标记 destination = session.createQueue("first-mq"); //通过会话对象,创建消息的发送者producer //创建的消息发送者,发送的消息一定到指定的目的地中 //创建producer的时候,可以不提供目的地。在发送消息的时候制定目的地 producer = session.createProducer(destination); //创建文本消息对象,作为具体数据内容的载体 message = session.createTextMessage(datas); //使用producer,发送消息到ActiveMQ中的目的地。如果消息发送失败。抛出异常 producer.send(message); System.out.println("消息已经发送成功...."); }catch(Exception e){ e.printStackTrace(); }finally{ if(producer != null){//回收消息发送者 try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){//回收会话对象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){//回收连接对象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args){ TextProducer producer = new TextProducer(); producer.sendTextMessage("我是一个测试的ActiveMQ..."); } }运行生产者结果
编写消费者 package com.loafer.active; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class TextConsumer { public String receiveTextMessage(){ String resultCode = ""; ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; //消息的消费者,用于接收消息的对象 MessageConsumer consumer = null; TextMessage message = null; try{ factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.0.32:61616"); connection = factory.createConnection(); //消息的消费者必须启动连接,否支无法处理消息 connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("first-mq"); //创建消息消费者对象,在制定的目的地中获取消息 consumer = session.createConsumer(destination); //获取队列中的消息 message = (TextMessage) consumer.receive(); //处理文本消息 resultCode = message.getText(); }catch(Exception e){ e.printStackTrace(); }finally{ if(consumer != null){//回收消息消费者 try { consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){//回收会话对象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){//回收连接对象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } return resultCode; } public static void main(String[] args){ TextConsumer consumer = new TextConsumer(); String messageString = consumer.receiveTextMessage(); System.out.print("接收的消息内容是:" + messageString); } }运行消费者结果:
代码下载:https://github.com/loafer7423/ActiveMq/tree/master/ActiveMq01
目录结构:
生产者代码
package com.loafer.producer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.loafer.consumer.ConsumerListener; public class ProducerSend { public void sendMessage(Object obj){ ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; Message message = null; try{ factory = new ActiveMQConnectionFactory("guest","guest","tcp://192.168.0.32:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test-listener"); producer = session.createProducer(destination); connection.start(); for(int i=0;i<100;i++){ message = session.createObjectMessage("我是第"+i+"个消息,消息内容是:"+obj); producer.send(message); } }catch(Exception e){ e.printStackTrace(); }finally{ if(producer != null){//回收消息生产者 try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){//回收会话对象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){//回收连接对象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args){ ProducerSend producer = new ProducerSend(); producer.sendMessage("this is message!!!"); } }消费者代码:
package com.loafer.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.sun.xml.internal.org.jvnet.fastinfoset.VocabularyApplicationData; /** * 使用监听器的方式,实现消息的处理【消费】 * @author 王东 * */ public class ConsumerListener { /** * 处理消息 */ public void consumMessage(){ ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try{ factory = new ActiveMQConnectionFactory("guest","guest","tcp://192.168.0.32:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test-listener"); consumer = session.createConsumer(destination); //注册监听器。注册成功后,队列中的消息变化会自动触发监听器代码。接收消息并处理 consumer.setMessageListener(new MessageListener(){ /* *监听器一旦注册,永久有效。 *永久-consumer线程不关闭 *处理消息的方式:只要有消息未处理,自动调用onMessage方法,处理消息 *监听器可以注册若干。注册多个监听器,相当于集群 *ActiveMQ自动的循环调用多个监听器,处理队列中的消息,实现并行处理 * * 处理消息的方法。就是监听方法 * 监听的事件是:消息,消息未处理 * 要处理的具体内容:消息处理 * @param message-未处理的消息 * */ public void onMessage(Message message) { try{ ObjectMessage om = (ObjectMessage) message; Object data = om.getObject(); System.out.println(data); }catch(Exception e){ e.printStackTrace(); } } }); //阻塞当前代码。保证listener代码未结束。如果代码结束了,监听器自动关闭 System.in.read(); }catch(Exception e){ e.printStackTrace(); }finally{ if(consumer != null){//回收消息消费者 try { consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){//回收会话对象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){//回收连接对象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args){ ConsumerListener listener = new ConsumerListener(); listener.consumMessage(); } }依次运行生产者和消费者,最后消费者打印的结果:
代码下载:https://github.com/loafer7423/ActiveMq/tree/master/ActiveMq02
1.创建连接Connection 2.创建会话Session 3.通过Session来创建其它的(MessageProducer、MessageConsumer、Destination、TextMessage) 4.将生产者 MessageProducer 和消费者 MessageConsumer 都会指向目标 Destination 5.生产者向目标发送TextMessage消息send() 6.消费者设置监听器,监听消息。
目录结构:
pom.xml代码:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.loafer</groupId> <artifactId>ActiveMq03</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMq03</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.4</version> </dependency> </dependencies> </project>生产者代码:
package com.loafer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 发送消息到ActiveMQ中,具体的消息内容为参数信息 * 开发JMS相关代码中,使用的接口类型都是javax.jms包下的类型 * @param datas 消息内容 */ public class TopicProducer { /** * 发送消息到ActiveMQ中,具体的消息内容为参数信息 * 开发JMS相关代码中,使用的接口类型都是javax.jms包下的类型 * @param datas 消息内容 */ public void sendTextMessage(String datas){ //连接工厂 ConnectionFactory factory = null; //连接 Connection connection = null; //目的地 Destination destination = null; //会话 Session session = null; //消息发送者 MessageProducer producer = null; //消息对象 Message message = null; try{ //创建连接工厂,连接ActiveMQ服务的连接工厂 //创建工厂,构造方法有三个参数,分别是用户名,密码,连接地址 //无参构造,有默认的连接地址。本地连接 //单参数构造,五无验证模式,没有用户的认证 //三参数构造,有认证+指定地址。默认端口是61616.从ActiveMQ的conf/activemq.xml配置文件中查看 factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.0.32:61616"); //通过工厂,创建连接对象 //创建连接的方法有重载,其中有createConnection(String username,String password) //可以再创建连接工厂时,只传递连接地址,不传递用户信息 connection = factory.createConnection(); //建议启动连接,消息的发送者不是必须启动连接。消息的消费者必须启动连接 //producer再发送消息的时候,会检查是否启动了连接,如果未启动,自动启动 //如果有特殊的配置,建议配置完毕后再启动连接 connection.start(); //通过连接对象,创建会话对象 /*创建会话的时候,必须传递两个参数,分别代表的是否支持事务和如何确认消息处理 * transacted:是否支持事务,数据类型是boolean.true-支持 false-不支持 * true:支持事务,第二个参数对producer来说默认是无效。建议传递的数据是Session.SESSION_TRANSACTED * false:不支持事务,常用参数。第二个参数必须传递,且必须有效 * acknowledgeMode:如何确认消息的处理。使用确认机制实现的 * AUTO_ACKNOWLEDGE:自动确认消息。消息的消费者处理消息后,自动确认。常用。商业开发不推荐 * CLIENT_ACKNOWLEDGE:客户端手动确认。消息的消费者处理后,必须手工确认 * DUPS_OK_ACKNOWLEDGE:有副本的客户端手动确认 * 一个消息可以多次处理 * 可以降低Session的消耗,再可以容忍重复消息时使用(不推荐使用) */ session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //创建主题目的地。参数是目的地名称。是目的地的唯一标记 destination = session.createTopic("topicDemo"); //通过会话对象,创建消息的发送者producer //创建的消息发送者,发送的消息一定到指定的目的地中 //创建producer的时候,可以不提供目的地。在发送消息的时候制定目的地 producer = session.createProducer(destination); //创建文本消息对象,作为具体数据内容的载体 message = session.createTextMessage(datas); //使用producer,发送消息到ActiveMQ中的目的地。如果消息发送失败。抛出异常 producer.send(message); System.out.println("消息已经发送成功...."); }catch(Exception e){ e.printStackTrace(); }finally{ if(producer != null){//回收消息发送者 try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){//回收会话对象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){//回收连接对象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args){ System.out.println("生产者程序开始启动....."); TopicProducer producer = new TopicProducer(); producer.sendTextMessage("我是一个测试的ActiveMQ..."); System.out.println("生产者程序已经关闭....."); } }消费者A代码:
package com.loafer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicConsumer { public String receiveTextMessage(){ String resultCode = ""; ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; //消息的消费者,用于接收消息的对象 MessageConsumer consumer = null; TextMessage message = null; try{ factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.0.32:61616"); connection = factory.createConnection(); //消息的消费者必须启动连接,否支无法处理消息 connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("topicDemo"); //创建消息消费者对象,在制定的目的地中获取消息 consumer = session.createConsumer(destination); message = (TextMessage) consumer.receive(); resultCode = message.getText(); /**消费者消息监听开始*/ // consumer.setMessageListener(new MessageListener() { // // public void onMessage(Message message) { // try { // System.out.println("接收消息 = [" + ((TextMessage) message).getText() + "]"); // } catch (JMSException e) { // e.printStackTrace(); // } // } // }); //阻塞当前代码。保证listener代码未结束。如果代码结束了,监听器自动关闭 // System.in.read(); /**消费者消息监听结束*/ }catch(Exception e){ e.printStackTrace(); }finally{ if(consumer != null){//回收消息消费者 try { consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){//回收会话对象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){//回收连接对象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } return resultCode; } public static void main(String[] args){ System.out.println("消费者程序开始启动....."); TopicConsumer consumer = new TopicConsumer(); String messageString = consumer.receiveTextMessage(); System.out.println("接收的消息内容是:" + messageString); System.out.println("消费者程序已经关闭....."); } }消费者B代码:
package com.loafer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicConsumerB { public String receiveTextMessage(){ String resultCode = ""; ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; //消息的消费者,用于接收消息的对象 MessageConsumer consumer = null; TextMessage message = null; try{ factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.0.32:61616"); connection = factory.createConnection(); //消息的消费者必须启动连接,否支无法处理消息 connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("topicDemo"); //创建消息消费者对象,在制定的目的地中获取消息 consumer = session.createConsumer(destination); message = (TextMessage) consumer.receive(); resultCode = message.getText(); /**消费者消息监听开始*/ // consumer.setMessageListener(new MessageListener() { // // public void onMessage(Message message) { // try { // System.out.println("接收消息 = [" + ((TextMessage) message).getText() + "]"); // } catch (JMSException e) { // e.printStackTrace(); // } // } // }); //阻塞当前代码。保证listener代码未结束。如果代码结束了,监听器自动关闭 // System.in.read(); /**消费者消息监听结束*/ }catch(Exception e){ e.printStackTrace(); }finally{ if(consumer != null){//回收消息消费者 try { consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){//回收会话对象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){//回收连接对象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } return resultCode; } public static void main(String[] args){ System.out.println("消费者程序开始启动....."); TopicConsumerB consumer = new TopicConsumerB(); String messageString = consumer.receiveTextMessage(); System.out.println("接收的消息内容是:" + messageString); System.out.println("消费者程序已经关闭....."); } }1.先运行生产者
控制台:
会发现有1条消息被发布
2. 消息发布后再开启一个消费者
运行TopicConsumer.java后会发现发布的1条消息并没有被消费者接收,因为在主题模式中: 只有提前进行订阅的消费者才能成功消费消息。而队列模式中消费者不需要提前订阅也可以消费消息。如下图:
3.先开启两个消费者,后运行生产者
开启消费者A
开启消费者B
启动生产者
会发现生产者发送的1个消息,两个消费者都全部接收。
Jms规范里的两种message传输方式Topic和Queue,两者的对比如下表():
TopicQueue概要Publish Subscribe messaging 发布订阅消息Point-to-Point 点对点有无状态topic数据默认不落地,是无状态的。Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。
完整性保障并不保证publisher发布的每条数据,Subscriber都能接受到。Queue保证每条数据都能被receiver接收。消息是否会丢失一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。消息发布接收策略一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。代码下载:https://github.com/loafer7423/ActiveMq/tree/master/ActiveMq03