Spring集成RabbitMQ 消费者的配置

    xiaoxiao2023-12-01  154

    需要的jar包:spring-rabbit-1.3.5.RELEASE.jar,amqp-client-3.3.4.jar,spring-amqp-1.3.5.RELEASE.jar,spring-retry-1.1.0.RELEASE.jar

    1. 添加配置文件rabbitmq.xml、rabbitmq.properties

    # RabbitMQ broker connection settings; rabbitmq.xml reads them through ${mq.*} placeholders.
    # One key=value pair per line: on a single line everything after the first '=' would
    # be parsed as the value of mq.host.
    mq.host=192.168.64.129
    mq.username=guest
    mq.password=guest
    mq.port=5672

     

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      Spring context for the RabbitMQ consumer: connection factory, queue/exchange
      definitions, the listener bean and the listener container that feeds it.

      The schema locations are versionless so that each XSD is resolved from whatever
      Spring / Spring AMQP jars are on the classpath, instead of pinning a schema
      version that may not match those jars.
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:p="http://www.springframework.org/schema/p"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
                               http://www.springframework.org/schema/beans/spring-beans.xsd
                               http://www.springframework.org/schema/rabbit
                               http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
                               http://www.springframework.org/schema/context
                               http://www.springframework.org/schema/context/spring-context.xsd">

        <context:property-placeholder location="classpath:rabbitmq.properties"/>

        <!-- Connection factory: parameters for connecting to the RabbitMQ server. -->
        <!-- The connection-factory element simply registers an
             org.springframework.amqp.rabbit.connection.CachingConnectionFactory instance. -->
        <rabbit:connection-factory id="connectionFactory"
                                   host="${mq.host}"
                                   port="${mq.port}"
                                   username="${mq.username}"
                                   password="${mq.password}"/>

        <!-- With the admin below enabled, the exchanges and queues declared in this file are
             created automatically on the RabbitMQ server. -->
        <!-- NOTE(review): it is disabled here, so the queue presumably has to exist on the
             broker already; confirm. -->
        <!--
        <rabbit:admin connection-factory="connectionFactory" />
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
        -->

        <!-- Queue definition.
             durable:     whether the queue survives a broker restart
             auto-delete: whether the queue is removed once all consumers have disconnected
             exclusive:   private queue usable only by its creator, removed when it disconnects -->
        <rabbit:queue name="queue" durable="true" auto-delete="false" exclusive="false" />

        <!-- Example topic-exchange definition (disabled). -->
        <!--
        <rabbit:topic-exchange name="mq.asdfExChange" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="queue1" pattern="queue1Key"></rabbit:binding>
                <rabbit:binding queue="queue2" pattern="queue2Key"></rabbit:binding>
                <rabbit:binding queue="queue2" pattern="queue2Key"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:topic-exchange>
        -->

        <!-- Direct exchange; the binding key is the routing key used when sending a message. -->
        <!-- NOTE(review): "(AMQP default)" is the label the RabbitMQ management UI shows for the
             default exchange, whose real name is the empty string. As written, this element
             describes an exchange literally named "(AMQP default)"; confirm which is intended. -->
        <rabbit:direct-exchange name="(AMQP default)" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="queue" key="queueKey"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>

        <!-- Message receiver. -->
        <bean id="qwerConsumer" class="com.gzedu.util.rabbitmq.MessageConsumer"></bean>

        <!-- Queue listener (observer style): when a message arrives, the listener object
             registered on the corresponding queue is notified. -->
        <!-- Template:
        <rabbit:listener-container connection-factory="connectionFactory" >
            <rabbit:listener queues="" ref=""/>
        </rabbit:listener-container>
        -->
        <rabbit:listener-container connection-factory="connectionFactory">
            <rabbit:listener queues="queue" ref="qwerConsumer"/>
        </rabbit:listener-container>

    </beans>

    2. 消息消费者 MessageConsumer.java

    /**
     * Message listener referenced by the {@code qwerConsumer} bean in rabbitmq.xml.
     *
     * <p>Each delivered message body is decoded as UTF-8 text and printed to standard output.
     */
    public class MessageConsumer implements MessageListener {

        private static final Logger log = Logger.getLogger(MessageConsumer.class);

        /**
         * Handles one delivered message.
         *
         * @param message the delivered message; its body is interpreted as UTF-8 text
         */
        @Override
        public void onMessage(Message message) {
            byte[] payload = message.getBody();

            // Printing the byte[] itself would only show its identity ("[B@..."),
            // so report the payload size instead.
            log.debug("Received message, " + payload.length + " bytes");

            // The Charset constant avoids the checked UnsupportedEncodingException thrown by
            // new String(byte[], String), which onMessage() is not allowed to propagate.
            String text = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(text);
        }
    }

    在消费者中无法注入自己的bean,

    activeMq、rabbitMq这些以监听形式配置的bean是在ContextLoaderListener初始化时创建的,dispatcher.xml中配置的注解bean的优先级肯定没有框架中ContextLoaderListener的优先级高,所以直接用@Autowired会获取不到dispatcher.xml中的bean。

    此时需要从上下文中获取bean,做法是实现ApplicationContextAware。这里Bean的name默认是@Service注解的类的简单类名(首字母小写),例如UserServiceImpl对应的name是userServiceImpl。

    public class SpringContextHolder implements ApplicationContextAware { private static ApplicationContext applicationContext; /** * 实现ApplicationContextAware接口的context注入函数, 将其存入静态变量. */ @Override public void setApplicationContext(ApplicationContext applicationContext) { SpringContextHolder.applicationContext = applicationContext; // NOSONAR } /** * 取得存储在静态变量中的ApplicationContext. */ public static ApplicationContext getApplicationContext() { checkApplicationContext(); return applicationContext; } /** * 从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型. */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) { checkApplicationContext(); return (T) applicationContext.getBean(name); } /** * 从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型. */ @SuppressWarnings("unchecked") public static <T> T getBean(Class<T> clazz) { checkApplicationContext(); return (T) applicationContext.getBeansOfType(clazz); } /** * 清除applicationContext静态变量. */ public static void cleanApplicationContext() { applicationContext = null; } private static void checkApplicationContext() { if (applicationContext == null) { throw new IllegalStateException("applicaitonContext未注入,请在applicationContext.xml中定义SpringContextHolder"); } } }

     

    最新回复(0)