RabbitMQ简介
集群模式丰富,表达式配置,HA模式,镜像队列模型
保证数据不丢失的前提做到高可靠性、可用性
AMQP全称:Advance Message Queuing Protocol(高级消息队列协议)
AMQP协议模型
官网地址:http://www.rabbitmq.com/
安装Linux必要依赖包
下载RabbitMQ必须安装包
进行安装,修改相关配置文件即可
准备
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz下载
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.e17.centos.x86_64.rpm wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.e17.lux.x86_64.rpm wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm配置
vim /etc/hosts 以及 /etc/hostname配置文件
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app修改密码、配置等,将loopback_users中的<<“guest”>>,修改只保留guest
服务启动和停止
启动:rabbitmq-server start & 停止:rabbitmqctl app_stop官网登陆
账号:guest 密码:guest
AMQP核心概念
Server:又称Broker,接受客户端的连接,实现AMQP实体服务
Connection:连接,应用程序与Broker的网络连接
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
Message:消息,服务器和应用程序之间传送的数据,有Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容
Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual host里面可以有若干个Exchange和Queue,同一个Virtual host里面不能有相同名称的Exchange或Queue
Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
Queue:也称为Message Queue消息队列,保存消息并将它们转发给消费者
RabbitMQ集成SpringBoot基本实现
1、发送端和接收端的pom.xml添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2、配置RabbitMQ
发送端配置: spring: rabbitmq: addresses: 192.168.11.81:5672 username: guest password: guest virtual-host: / connection-timeout: 15000 http: encoding: charset: UTF-8 jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: NON_NULL server: servlet: context-path: / port: 8001 接收端配置: spring: rabbitmq: addresses: 192.168.11.81:5672 username: guest password: guest virtual-host: / connection-timeout: 15000 listener: simple: #消费者数量 concurrency: 5 #最大消费者数量 max-concurrency: 10 #消息处理方式,有auto自动接收处理,自己manual手动接收处理(手动处理时,处理消息的方法要提供Channel参数回调给RabbitMQ通知执行消费者方法后已消费消息,刷新队列) acknowledge-mode: manual #一次处理的消息数量,在消息数量巨大的情况下,prefetch的设置能减少消息并发处理防止负载过大 prefetch: 1 order: queue: name: order-queue durable: true exchange: name: order-exchange durable: true type: topic ignoreDeclarationExceptions: true key: order.* server: servlet: context-path: / port: 80023、发送接收消息
发送端和接收端的消息处理对象: // 用于消息传递的对象要实现Serializable,因为通过网络通信传输对象需要序列化 public class Order implements Serializable { private static final long serialVersionUID = -1502291609049620042L; private String id; private String name; private String messageId; // 存储消息发送的唯一标识 public Order() {} public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } getter and setter... } 发送端消息发送管理类: @Component public cclass OrderSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrder(Order order) throws Exception { CorrelationData correlationData = new CorrelationData(); correlationData.setId(order.getMessageId()); rabbitTemplate.convertAndSend( "order-exchange", // 交换机 "order.abcd", // routingKey order, // 消息体内容 correlationData // 消息唯一id ); } } 接收端消息接收管理类: public class OrderReceiver { // 使用RabbitMQ时创建队列queue和交换机exchange有两种方式: // 1、先在官网界面创建,此时发送端和接收端不需要按顺序创建 // 2、通过@RabbitListener指定的参数创建,需要先创建接收端运行启动服务 // 官网配置: // 1、官网添加需要发送的消息的队列queue // 2、官网添加交换机exchange // 3、将队列queue和交换机exchange关联绑定 //(注:绑定的routingKey有两种模糊匹配: // 1、xxx.*:只支持一个后缀的匹配,比如order.ab // 2、xxx.#:支持单个或多个后缀的匹配,比如order.ab、order.abc.123 // order-exchange:创建的交换机名称 // order-queue:创建的队列名称 // key:routingKey的消息匹配类型 // durable:是否支持序列化,对象是序列化的为true // topic:消息类型 @RabbitListener(bindings = @QueueBinding( value = @Queue( value = "${spring.rabbitmq.listener.order.queue.name}", durable = "${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange( name = "${spring.rabbitmq.listener.order.exchange.name}", durable = "${spring.rabbitmq.listener.order.exchange.durable}", type = "${spring.rabbitmq.listener.order.exchange.type}"), key = "${spring.rabbitmq.listener.order.key}" ) ) @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception { Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); // Ack,手动处理消息后,最后要设置Ack=false表示已消费消息通知RabbitMQ刷新队列,否则队列不会刷新认为没有处理 channel.basicAck(deliveryTag, false); } }消息百分百投递成功方案
流程:
sender发送前将消息入库到msg.db,消息状态status=0,入库后sender开始发送消息给MQ Broker
如果消息接收成功,则要有confirm listener通知sender消息发送成功,msg.db设置该条消息status=1
消息发送后没有confirm listener,定时消息任务会定时从msg.db获取status=0的消息重发,重发次数大于指定数则认为发送失败,msg.db设置该条消息status=2
主要是发送端的改动,接收端与上面的相同
1、数据库结构
// order订单表 CREATE TABLE IF NOT EXISTS 't_order'( 'id' varchar(128) NOT NULL, 'name' varchar(128), 'message_id' varchar(128) NOT NULL, PRIMARY KEY('id') ) ENGINE=InnoDB DEFAULT CHARSET=utf8; // broker_message_log消息记录表 CREATE TABLE IF NOT EXISTS 'broker_message_log'( 'message_id' varchar(128) NOT NULL, // 消息唯一id 'message' varchar(4000) DEFAULT NULLL, // 消息内容,将对象序列化成json 'try_count' int(4) DEFAULT '0', // 重试次数 'status' varchar(10) DEFAULT '', // 消息投递状态 0:投递中 1:投递成功 2:投递失败 'next_retry' timestamp NOT NULL DEFAULT '0000-00-00', // 下一次重试时间或超时时间 'create_time' timestamp NOT NULL DEFAULT '0000-00-00', 'update_time' timestamp NOT NULL DEFAULT '0000-00-00', PRIMARY KEY('message_id') ) ENGINE=InnoDB DEFAULT CHARSET=utf8;2、pom.xml添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- mybatis数据库jdbc --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.24</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- mybatis分页插件 --> <dependency> <groupId>com.github.miemiedev</groupId> <artifactId>mybatis-paginator</artifactId> <version>1.2.17</version> <exclusions> <exclusion> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> </exclusion> </exclusions> </dependency> .....3、发送端配置
spring: rabbitmq: addresses: 192.168.11.81:5672 username: guest password: guest virtual-host: / connection-timeout: 15000 #采用消息确认模式,异步等待消息响应结果 publisher-confirms: true publisher-returns: true template: mandatory: true http: encoding: charset: UTF-8 jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: NON_NULL datasource: type: com.alibaba.druid.pool.DruidDriver url: jdbc:mysql://localhost:3306/test?charsetEncoding=UTF-8 driver-class-name: com.mysql.jdbc.Driver ursername: root password: root mybatis: type-aliasses-package: com.exam[le.springboot mapper-locations: classpath:com/example/springboot/mapping/*.xml logging: level: tk: mybatis: TRACE server: servlet: context-path: / port: 80014、数据库mybatis配置
@Component @ConfigurationProperties(prefix="spring.datasource") @PropertySource("classpath:druid.properties") // 读取外部配置文件druid.properties public class DruidDataSourceSettings { private String driverClassName; private String url; private String username; private String password; @Value("${druid.initialSIze}") private int initialSize; @Value("${druid.minIdle}") private int minIdle; @Value("${druid.maxActive}") private int maxActive; @Value("${druid.timeBetweenEvictioRunsMillis}") private long timeBetweenEvictionRunsMillis; @Value("${druid.minEvictableIdleTimeMillis}") private long inEvictableIdleTimeMillis; @Value("${druid.validationQuery}") private String validationQuery; @Value("${druid.testWhileIdle}") private boolean testWhileIdle; @Value("${druid.testOnBorrow}") private boolean testOnBorrow; @Value("${druid.testOnReturn}") private boolean testOnReturn; @Value("${druid.poolPreparedStatements}") private boolean poolPreparedStatements; @Value("${druid.maxPoolPreparedStatementPreConnectionSize}") private int maxPoolPreparedStatementPReConnectionSize; @Value("${druid.filters}") private String filters; @Value("${druid.connectionProperties}") private String connectionProperties; @Bean public static PropertySourcePlaceholderConfigure propertyConfigure() { return new PropertySourcePlaceholderConfigure(); } getter and setter... } @Configuration @EnableTransactionManagement public class DruidDataSourceConfig { private DruidDataSourceSettings druidSettings; public static String DRIVER_CLASSNAME; @Bean public static PropertySourcePlaceholderConfigure propertyConfigure() { return new PropertySourcePlaceholderConfigure(); } @Bean public DataSource dataSource() throws SQLException { DruidDataSource ds = new DruidDataSource(); setter.... } @Bean public PlatformTransactionManager transactionManager throws Exception { DataSourceTransactionManager txManager = new DataSourceTransactionManager(); txManager.setDataSource(dataSource()); return txManager; } } @Configuration public class MyBatisDataSourceConfig { @Autowired private DataSource dataSource; @Bean(name="sqlSessionFactory") public SqlSessionFactory sqlSessionFactoryBean() { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); // xml映射查找 ResourcePatternResolver resolver = new PathMatchingResourcPatterneResolver(); try { bean.setMapperLocations(resolver.getResources("classpath:com/example/springboot/mapping/*.xml")); SqlSessionFactory sqlSessionFactory = bean.getObjects(); sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE); return sqlSessionFactory; } catch (Exception e) { throw new RuntimeException(e); } } } @Configuration @AutoConfigureAfter(MyBatisDataSourceConfig.class) public class MyBatisMapperScannerConfig { @Bean public MapperScannerConfigure mapperScannerConfigure() { MapperScannerConfiguer mapperSannerConfiguer = new MapperScannerConfigure(); mapperScannerConfigure.setSqlSessionFactoreyBeanName("sqlSessionFactory"); mapperScannerConfigure.setBasePackage("com.example.springboot.mapper"); return mapperScannerConfigure; } }5、定时任务配置
@Configure @EnableScheduling public class TaskSchedulerConfig implements SchedulingConfigure { @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.setScheduler(taskScheduler()); } @Bean(destroyMethod="shutdown") public Executor taskScheduler() { return Executors.newScheduledThreadPool(100); } }6、发送端消息业务代码
// 订单表model public class Order implements Serializable { private static final long serialVersionUID = -1; private String id; private String name; private String messageId; getter and setter... } public class BrokerMessageLog { private String messageId; private String message; private Integer tryCount = 0; private String status; private Date nextRetry; private Date createTime; private Date updateTime; getter and setter... } // 消息发送管理类 @Component public class RabbitOrderSender { @Autowired private RabbitTemplate rabbitTemplate; // mybatis消息数据库处理类 @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; // 消息发送成功确认回调 final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String messageId = correlationData.getId(); if (ack) { // 消息被接收到,更新消息数据库该消息的状态为已投递成功status=1 brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, 1); } else { // 失败则进行一些具体的后续操作:重试或补偿手段 .... } } }; public void sendOrder(Order order) throws Exception { // 设置消息投递回调 rabbitTemplate.setConfirmCallback(confirmCallback); CorrelationData correlationData = new CorrelationData(order.getMessageId()); rebbitTemplate.convertAndSend("order-exchange", "order.ABC", order, correlationData); } } // 订单service @Service public class OrderService { public void createOrder(Order order) throws Exception { // 创建订单,更新订单时间 Date orderTime = new Date(); orderMapper.insert(order); // 构建消息投递数据,创建一条订单的消息 BrokerMessageLog brokerMessageLog = new BrokerMessageLog(); brokerMessageLog.setMessageId(order.getMessageId()); brokerMessageLog.setMessage(FastJsonConvertUtil.convertObjectToJSON(order)); brokerMessageLog.setStatus("0"); brokerMessageLog.setNextRetry(DataUtils.addMinutes(orderTime, 1)); brokerMessageLog.setCreateTime(new Date()); brokerMessageLog.setUpdateTime(new Date()); brokerMessageLogMapper.insert(brokerMessageLog); // 发送消息 rabbitOrderSender.sendOrder(order); } } // 消息重试管理类,已经在上面配置了定时任务线程池 @Component public class RetryMessageTasker { @Autowired private RabbitOrderSender rabbitOrderSender; // 消息Repository @Autowired private BrokerMessageLogMapper brokerMessageLogMepper; @Scheduled(initialDelay = 3000, fixedDelay = 1000) public void reSend() { // 获取status=0的消息列表 List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage(); list.forEach(messageLog -> { if (messageLog.getTryCount() >= 3) { // 重试3次都失败,消息状态更新为status=2 brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), 2, new Date()); } else { // 更新消息的重试时间 brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(), new Date()); Order reSendOrder = FastJsonConvertUtils.convertJSONToObject(messageLog.getMessage()); try { rabbitOrderSender.sendOrder(reSendOrder); } catch (Exception e) { e.printStatkTrace(); } } }); } }