异步接收消息的最简单的方法是使用注解监听端点的基础架构。简而言之,它允许你暴露托管一个 bean 的方法作为一个 JMS 的监听端点。
@Component public class MyService { @JmsListener(destination = "myDestination") public void processOrder(String data) { ... } }上述示例的想法是,每当javax.jms.Destination “myDestination” 上有消息可用时,就调用相应地processOrder方法(在这种情况下,JMS消息的内容类似于MessageListenerAdapter提供的内容)。
注解端点的基础架构使用JmsListenerContainerFactory为每个注解方法创建一个消息监听容器。这样的容器没有针对应用上下文进行注册,但是可以使用JmsListenerEndpointRegistry bean 进行简单的管理。
@JmsListener是 Java 8上的可重复注解,因此可以通过向其添加额外的@JmsListener声明将多个JMS目的地关联到同一个方法。 在 Java 6和7上,你可以使用@JmsListeners注解。
要启用对@JmsListener注解的支持,请将@EnableJms添加到你的一个@Configuration类上。
@Configuration @EnableJms public class AppConfig { @Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setDestinationResolver(destinationResolver()); factory.setConcurrency("3-10"); return factory; } }默认情况下,基础架构将查找名为jmsListenerContainerFactory的 bean 作为用于创建消息监听容器的工厂源。 在这种情况下,忽略 JMS 基础架构的设置,processOrder方法可以在一个线程池中被调用,线程池核心数为3,最大线程数为10。
对于使用的每个注解都可以自定义监听容器工厂,或通过实现JmsListenerConfigurer接口来显示的配置默认值。仅在存在没有指定容器工厂的端点时,默认值才是必须的。有关详细信息和示例,请参阅 javadoc。
如果您喜欢XML配置,请使用<jms:annotation-driven>元素。
<jms:annotation-driven/> <bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destinationResolver" ref="destinationResolver"/> <property name="concurrency" value="3-10"/> </bean>JmsListenerEndpoint提供了 JMS 端点的模型,并负责为该模型配置容器。除了使用注解之外,基础架构也允许你用编程的方式来配置端点。
@Configuration @EnableJms public class AppConfig implements JmsListenerConfigurer { @Override public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint(); endpoint.setId("myJmsEndpoint"); endpoint.setDestination("anotherQueue"); endpoint.setMessageListener(message -> { // processing }); registrar.registerEndpoint(endpoint); } }本例中我们使用SimpleJmsListenerEndpoint来提供MessageListener,你也可以建立自己的端点变体并自定义调用机制。
应该注意的是,你完全可以不使用@JmsListener,而仅通过JmsListenerConfigurer来注册所有端点。
到目前为止,我们已经在我们的端点注入了一个简单的String,但实际上它可以有一个非常灵活的方法签名。现在让我们重写它并注入一个带有自定义头部的Order:
@Component public class MyService { @JmsListener(destination = "myDestination") public void processOrder(Order order, @Header("order_type") String orderType) { ... } }可以向 JMS 监听端点中注入的主要元素包括:
原始的javax.jms.Message或任意子类(当然,它与传入的消息类型相匹配)。可选的javax.jms.Session来操作 JMS 原生 API,来发送自定义回复。代表着接收消息的org.springframework.messaging.Message。注意此消息同时包含自定义和JmsHeaders定义的标准头。 @Header注解的方法参数被用来提取一个特定的头部值,包括标准 JMS 头。 @Headers注解参数必须指定给一个java.util.Map,用来获取所有头。不被支持的(如Message、Session等)、且无注解的元素被视为有效载荷。可以明确的给它们添加@Payload注解。也可以通过添加@Valid注解来开启校验。注入 Spring 的Message抽象可以获取特定消息的所有信息,而无需依赖特定传输 API。
@JmsListener(destination = "myDestination") public void processOrder(Message<Order> order) { ... }可以自己扩展DefaultMessageHandlerMethodFactory来处理额外的方法参数。同时你也可以自定义转换和校验规则。
例如,如果我们需要在处理之前确保我们的Order有效,我们可以使用@Valid对有效负载进行注解,并配置必要的验证器,如下所示:
@Configuration @EnableJms public class AppConfig implements JmsListenerConfigurer { @Override public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory()); } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setValidator(myValidator()); return factory; } }MessageListenerAdapter允许你的方法有非空返回值。此时返回值将被封装在一个 javax.jms.Message中,发往原始消息的JMSReplyTo头中定义的目的地或者监听自己默认的目的地。监听默认的目的地可以通过@SendTo注解来设置。
假定现在我们的processOrder方法将返回一个OrderStatus,下面将展示如何自动地发送响应:
@JmsListener(destination = "myDestination") @SendTo("status") public OrderStatus processOrder(Order order) { // order processing return status; }如果你有多个@JmsListener注解方法,您还可以将@SendTo注解放在 class 上以共享默认响应目的地。
如果您需要以独立传输的方式设置额外的头信息,则可以返回一个Message,如下所示:
@JmsListener(destination = "myDestination") @SendTo("status") public Message<OrderStatus> processOrder(Order order) { // order processing return MessageBuilder .withPayload(status) .setHeader("code", 1234) .build(); }如果响应的目的地是运行时实时计算的,可以将响应结果封装在一个JmsResponse中,直接指定一个目的地。前面的例子可以重写如下:
@JmsListener(destination = "myDestination") public JmsResponse<Message<OrderStatus>> processOrder(Order order) { // order processing Message<OrderStatus> response = MessageBuilder .withPayload(status) .setHeader("code", 1234) .build(); return JmsResponse.forQueue(response, "status"); }Spring 引入了 XML 命名空间以简化 JMS 的配置。使用 JMS 命名空间元素时,需要引用如下的 JMS Schema:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <!-- bean definitions here --> </beans>命名空间由三个顶级元素组成:,和。可以使用注解驱动的监听端点。 和定义共享监听容器的配置,并且包含了子元素。下面是一个基本配置的示例,包含两个监听器。
<jms:listener-container> <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/> <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/> </jms:listener-container>上面的例子等同于在第26.4.4节“MessageListenerAdapter”的示例,定义两个不同的监听器容器和两个不同的MessageListenerAdapter。除了上面显示的属性之外,listener元素也包含几个可选属性。下面的表格列出了所有的属性:
<listener-container />元素也接受几个可选属性。这允许自定义各种策略(例如,taskExecutor和destinationResolver)以及基本的 JMS 设置和资源引用。使用这些属性,可以定义很广泛的定制监听容器,同时仍享有命名空间的便利。
作为一个通过factory-id属性指定要暴露的 bean 的 id 的JmsListenerContainerFactory,自动暴露了这些设置。
jms:listener-container connection-factory="myConnectionFactory" task-executor="myTaskExecutor" destination-resolver="myDestinationResolver" transaction-manager="myTransactionManager" concurrency="10"> <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/> <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/> </jms:listener-container>下面的表格描述了所有可用的属性。参考AbstractMessageListenerContainer类和具体子类的 Javadoc 来了解每个属性的细节。这部分的 Javadoc 也提高那个了事务选择和消息传输场景的讨论。
使用 jms Schema 支持来配置基于 JCA 的监听器容器很相似。
<jms:jca-listener-container resource-adapter="myResourceAdapter" destination-resolver="myDestinationResolver" transaction-manager="myTransactionManager" concurrency="10"> <jms:listener destination="queue.orders" ref="myMessageListener"/> </jms:jca-listener-container>JCA 可用的配置选项描述如下表:
转载自 并发编程网 - ifeve.com
相关资源:Spring Integration API(Spring Integration 开发文档).CHM