Spring整合ActiveMQ

    xiaoxiao2023-11-05  132

    1.pom依赖

    <!-- Excerpt of pom.xml: build properties, the default "dev" profile and the
         dependencies used to run Spring JMS against ActiveMQ. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <java.version>1.8</java.version>
        <!-- Single version shared by every org.springframework artifact below. -->
        <spring.version>5.1.2.RELEASE</spring.version>
        <!--<log4j2.Version>2.11.0</log4j2.Version>-->
    </properties>

    <profiles>
        <!-- Profile that is active by default (and on JDK 1.8). -->
        <profile>
            <id>dev</id>
            <properties>
                <profile.active>dev</profile.active>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
                <jdk>1.8</jdk>
            </activation>
            <build>
                <filters>
                    <!-- NOTE(review): Maven's conventional directory is src/main/resources;
                         confirm that the singular "resource" path is intentional. -->
                    <filter>src/main/resource/dev.properties</filter>
                </filters>
            </build>
        </profile>
    </profiles>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
            <!-- Keep ActiveMQ's transitive logging and Spring artifacts off the classpath so
                 the versions declared in this file are the ones used. -->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.9</version>
        </dependency>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- NOTE(review): activemq-pool is 5.9.0 while activemq-core above is 5.7.0;
             confirm the two releases are compatible or align the versions. -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

    2.ActiveMQ 的 Spring XML 配置(amq-applicationContext.xml)

    <?xml version="1.0" encoding="UTF-8" ?>
    <!-- Spring context wiring ActiveMQ: a pooled broker connection, one JmsTemplate per
         messaging domain (queue and topic) and the asynchronous listener containers. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:jms="http://www.springframework.org/schema/jms"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
                               http://www.springframework.org/schema/beans/spring-beans.xsd
                               http://activemq.apache.org/schema/core
                               http://activemq.apache.org/schema/core/activemq-core.xsd
                               http://www.springframework.org/schema/jms
                               http://www.springframework.org/schema/jms/spring-jms.xsd
                               http://www.springframework.org/schema/context
                               http://www.springframework.org/schema/context/spring-context.xsd">

        <!-- NOTE(review): the trailing ".*" is treated as a path pattern; confirm that the
             annotated senders and consumers live in sub-packages of this package. -->
        <context:component-scan base-package="com.xiongxiaolang.activemq.*"/>

        <!--<amq:connectionFactory id="amqconnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin"/>-->

        <!-- Pooled ActiveMQ connection factory pointing at the local broker. -->
        <bean id="amqConnnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://localhost:61616"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>

        <!-- Hand the ActiveMQ connection factory over to Spring through a caching wrapper. -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="amqConnnectionFactory"/>
            <property name="sessionCacheSize" value="1"/>
        </bean>

        <!-- The two messaging styles: start -->
        <!-- Queue mode (point-to-point). -->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="pubSubDomain" value="false"/>
        </bean>

        <!-- Topic mode (publish/subscribe). -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="pubSubDomain" value="true"/>
        </bean>
        <!-- The two messaging styles: end -->

        <!-- Alternative, explicit way of configuring a consumer, kept for reference. The
             author considers it less concise and harder to follow than the
             jms:listener-container elements used in this file. -->
        <!--
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg index="0" value="spring-queue"/>
        </bean>
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-topic"/>
        </bean>
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="destinationQueue"/>
            <property name="messageListener" ref="messageListener"/>
        </bean>
        <bean id="messageListener" class="com.winner.spring.MyMessageListener">
        </bean>
        -->

        <!-- Message listeners (asynchronous consumption). -->
        <!-- Listeners on the queue "spring_queue". -->
        <jms:listener-container destination-type="queue" container-type="default"
                                acknowledge="auto" connection-factory="connectionFactory">
            <jms:listener destination="spring_queue" ref="queueConsumer1"/>
            <jms:listener destination="spring_queue" ref="queueConsumer2"/>
        </jms:listener-container>

        <!-- Listeners on the topic "spring_topic". -->
        <jms:listener-container destination-type="topic" container-type="default"
                                connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="spring_topic" ref="topicConsumer1"/>
            <jms:listener destination="spring_topic" ref="topicConsumer2"/>
        </jms:listener-container>
    </beans>

    3.生产者实现

    /**
     * Producer that sends text messages to a named queue through the
     * {@code jmsQueueTemplate} bean.
     */
    @Service
    public class QueueSender {

        private final JmsTemplate jmsTemplate;

        /**
         * Creates the sender. The constructor is public so that the class can also be
         * instantiated outside the Spring container, for example in a plain unit test.
         *
         * @param jmsTemplate the template registered under the name {@code jmsQueueTemplate}
         */
        @Autowired
        public QueueSender(@Qualifier("jmsQueueTemplate") JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }

        /**
         * Sends {@code message} as a JMS text message.
         *
         * @param queueName name of the destination queue
         * @param message   text payload of the message
         */
        public void send(String queueName, String message) {
            jmsTemplate.send(queueName, session -> session.createTextMessage(message));
        }
    }

    /**
     * Producer that sends text messages to a named topic through the
     * {@code jmsTopicTemplate} bean.
     */
    @Service
    public class TopicSender {

        // Field injection is kept so that the implicit no-argument constructor stays available.
        @Autowired
        @Qualifier("jmsTopicTemplate")
        private JmsTemplate jmsTemplate;

        /**
         * Sends {@code message} as a JMS text message.
         *
         * @param topicName name of the destination topic
         * @param message   text payload of the message
         */
        public void send(String topicName, final String message) {
            jmsTemplate.send(topicName, session -> session.createTextMessage(message));
        }
    }

    4.消费者实现

    /**
     * Listener registered as the bean {@code queueConsumer1}; prints the text of every
     * {@link TextMessage} it receives to standard output.
     */
    @Service("queueConsumer1")
    public class QueueConsumer1 implements MessageListener {

        /**
         * Prints the payload of a text message. Messages of any other type are reported on
         * standard error and skipped instead of failing with a {@link ClassCastException}.
         *
         * @param message the delivered JMS message
         */
        @Override
        public void onMessage(Message message) {
            if (!(message instanceof TextMessage)) {
                System.err.println("queueConsumer1 ignored a non-text message: " + message);
                return;
            }
            try {
                System.out.println("消费者queueConsumer1获取消息:" + ((TextMessage) message).getText());
            } catch (JMSException e) {
                // Best effort: a payload that cannot be read is reported, not rethrown.
                e.printStackTrace();
            }
        }
    }

    /**
     * Listener registered as the bean {@code topicConsumer1}; prints the text of every
     * {@link TextMessage} it receives to standard output.
     */
    @Service("topicConsumer1")
    public class TopicConsumer1 implements MessageListener {

        /**
         * Prints the payload of a text message. Messages of any other type are reported on
         * standard error and skipped instead of failing with a {@link ClassCastException}.
         *
         * @param message the delivered JMS message
         */
        @Override
        public void onMessage(Message message) {
            if (!(message instanceof TextMessage)) {
                System.err.println("topicConsumer1 ignored a non-text message: " + message);
                return;
            }
            try {
                System.out.println("消费者topicConsumer1获取消息:" + ((TextMessage) message).getText());
            } catch (JMSException e) {
                // Best effort: a payload that cannot be read is reported, not rethrown.
                e.printStackTrace();
            }
        }
    }

    5.测试

    @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations= {"classpath:amq-applicationContext.xml"}) public class ActiveMQ_Test { @Autowired private QueueSender queueSender; @Autowired private TopicSender topicSender; @Test public void testSendMessage(){ queueSender.send("spring_queue","你好,我是queue producer"); topicSender.send("spring_topic","你好,我是topic Producer"); //这里跑个死循环,是不让进程挂掉,不然消息还没来得及消费,进程就结束了 while(true){ } } }

     

    最新回复(0)