分布式事务之rabbitMQ最终一致性

    xiaoxiao2022-07-07  197

    一般的大型电商网站都会面临的问题:分布式事务,在面临分布式微服务等项目使用传统的单一事务已经无法满足,解决分布式事务的方案也比较多,有TCC事务补偿(基于2PC的实现)、2PC(两阶段提交)、3PC(三阶段提交)等,框架有JTA atomiks等。很多公司也有自己的分布式事务解决方案,比如最开始支付宝的XTS等

    像JTA atomiks等2PC的方案效率并不高,中间需要一个协调者,并且是同步的,性能低下。如果对性能要求并不高的实际业务可以选用

    在分布式中,前辈们已经很早就挖掘了一套理论,比如CAP理论,BASE定理  https://www.cnblogs.com/duanxz/p/5229352.html

    没有用过rabbitmq,拿来练手了解了解

    用自己学习的项目结构,使用的spring cloud

    首先向product插入一个商品,但是必须生成一条订单,需要数据是一致的。

    product服务:启动类

    package com.chwl.cn; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCaching; import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; import org.springframework.cloud.netflix.ribbon.RibbonClient; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.scheduling.annotation.EnableScheduling; import com.chwl.cn.ribbon.CustomRibbon; @SpringBootApplication @MapperScan(basePackages="com.chwl.cn.mapper") @EnableEurekaClient @EnableCircuitBreaker//开启hystrix服务熔断回调 @EnableCaching//开启本地缓存 @RibbonClient(name="CHWL-PROVIDER-ORDER",configuration=CustomRibbon.class)//自定义的负载均衡算法,针对当前服务按照自己的实际业务进行编写负载均衡算法 @EnableFeignClients//feign声明式API调用(RestTemplate+Ribbon负载均衡) @EnableScheduling//开启定时任务 public class ProductApplication { public static void main(String[] args) { SpringApplication.run(ProductApplication.class, args); } }

    pom.xml需要引入:

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    application.yml

    server: port: 8003 #开启断路器,Feign接口API的实现类方法处理 feign: hystrix: enabled: true hystrix: command: default: #default全局有效,service id指定应用有效 execution: timeout: #如果enabled设置为false,则请求超时交给ribbon控制,为true,则超时作为熔断根据 enabled: true isolation: thread: timeoutInMilliseconds: 3000 #断路器超时时间,默认1000ms spring: application: name: chwl-provider-product #很重要,很重要,很重要,这是微服务向外部暴露的微服务的名字 datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.cj.jdbc.Driver platform: mysql url: jdbc:mysql://xxx.xxx.xxx.xx:5306/chwl?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false username: root password: admin redis: # database: 1 host: xxxxxxx port: 6379 password: timeout: 10000 lettuce: pool: minIdle: 0 maxIdle: 10 maxWait: 10000 max-active: 10 sentinel: master: master-6379 nodes: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381 # cluster: # nodes: # - 192.168.91.5:9001 # - 192.168.91.5:9002 # - 192.168.91.5:9003 # - 192.168.91.5:9004 # - 192.168.91.5:9005 # - 192.168.91.5:9006 rabbitmq: host: 120.79.81.103 port: 5672 username: xxx password: xxxxxx publisher-confirms: true #开启消息确认机制 publisher-returns: true #开启发送失败退回 virtual-host: / #虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列 template: mandatory: true #保证监听有效 listener: simple: acknowledge-mode: manual #消费者的ack方式为手动 auto自动 none不会发送ACK(与channelTransacted=true不兼容) concurrency: 1 #最小消费者数量 max-concurrency: 10 #最大消费者数量 retry: enabled: true #支持重试/重发 mybatis: config-location: classpath:mybatis/mybatis.cfg.xml #typeAliasesPackage: com.ypp.springcloud.entites mapper-locations: classpath:mybatis/mapper/**/*Mapper.xml mapper: mappers: com.chwl.cn.basemapper.BaseMapper identity: mysql eureka: client: #客户端注册进eureka服务列表内 service-url: #defaultZone: http://localhost:2001/eureka #这个地址就是EurekaServer注册中心的地址 defaultZone: http://ypp:admin@eureka2001.com:2001/eureka/,http://ypp:admin@eureka2002.com:2002/eureka/ instance: instance-id: chwl-provider-product prefer-ip-address: true #访问路径可以显示IP地址

    一般每个消息队列配置一个,比如订单队列、产品队列

    MQOrderQueueConfig:

    package com.chwl.cn.config.mq; import java.util.HashMap; import java.util.Map; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQOrderQueueConfig { public static final String ORDER_QUEUE_NAME="orderQueue"; public static final String ORDER_ROUTING_KEY="order_routing_key"; public static final String ORDER_EXCHANGE_NAME="order_exchange"; /** * 将普通队列绑定到交换机上 * 声明一个持久化队列 第二个参数true为持久化,在下次重启后自动加载队列,不设置也是默认持久化 * @return */ @Bean public Queue orderQueue() { return new Queue(ORDER_QUEUE_NAME,true); } @Bean public DirectExchange orderExchange(){ return new DirectExchange(ORDER_EXCHANGE_NAME); } @Bean public Binding bindingExchange(@Qualifier("orderQueue")Queue queue,@Qualifier("orderExchange")DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with(ORDER_ROUTING_KEY); } /** * 定制化amqp模版 * * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack */ // @Bean // public RabbitTemplate rabbitTemplate() { // Logger log = LoggerFactory.getLogger(RabbitTemplate.class); // // // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true // rabbitTemplate.setMandatory(true); // // // 消息返回, yml需要配置 publisher-returns: true // rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // String correlationId = message.getMessageProperties().getCorrelationId(); // log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, // routingKey); // }); // // // 消息确认, yml需要配置 publisher-confirms: true // rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { // if (ack) { // // log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); // } else { // log.debug("消息发送到exchange失败,原因: {}", cause); // } // }); // return rabbitTemplate; // } } package com.chwl.cn.service.product.impl; import java.util.UUID; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.alibaba.fastjson.JSONObject; import com.chwl.cn.api.OrderClientService; import com.chwl.cn.config.mq.MQOrderQueueConfig; import com.chwl.cn.entity.localmessage.LocalMessageEntity; import com.chwl.cn.entity.order.OrderEntity; import com.chwl.cn.entity.product.ProductEntity; import com.chwl.cn.mapper.ProductMapper; import com.chwl.cn.service.localmessageservice.IMQLocalMessageService; import com.chwl.cn.service.mq.SenderService; import com.chwl.cn.service.product.IProductService; @Service @Transactional public class ProductService implements IProductService{ @Autowired private OrderClientService orderClientService; @Autowired private SenderService senderService; @Autowired private IMQLocalMessageService mqLocalMessageService; @Autowired private ProductMapper mapper; /** * 本地消息表和产品插入必须在同一个事务,保证原子操作 */ @Override public ProductEntity add(ProductEntity product) { mapper.insert(product); OrderEntity order = new OrderEntity().setOrderNumber(UUID.randomUUID().toString()).setSerialNumber(UUID.randomUUID().toString()); String context = JSONObject.toJSONString(order); // 初始化本地消息为发送失败,mq回调确认收到消息后修改为发送成功 //本地消息服务一般抽离出来做成一个本地消息服务系统,因为其他服务也会用到,也要进行实现最终一致性进行存储本地消息 LocalMessageEntity localMessage = new LocalMessageEntity().setContext(context) .setSerialNumber(UUID.randomUUID().toString()) // 序列号 .setState(LocalMessageEntity.L_M_STATE_FAIL); // 添加本地消息服务 mqLocalMessageService.insert(localMessage); senderService.send(context,String.valueOf(localMessage.getId())); return product; } }

    统一的发送消息服务SenderService:

    package com.chwl.cn.service.mq; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.alibaba.fastjson.JSONObject; import com.chwl.cn.config.mq.MQOrderQueueConfig; import com.chwl.cn.entity.localmessage.LocalMessageEntity; import com.chwl.cn.service.localmessageservice.IMQLocalMessageService; @Component @Transactional public class SenderService implements ReturnCallback, ConfirmCallback { private final Logger log= LoggerFactory.getLogger(SenderService.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private IMQLocalMessageService mqLocalMessageService; // private AmqpTemplate rabbitTemplate; public void send(String context,String localMessageId) { System.out.println("Sender发送内容 : " + context); this.rabbitTemplate.setMandatory(true);// 当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。 this.rabbitTemplate.setConfirmCallback(this);//确认回调 this.rabbitTemplate.setReturnCallback(this);//失败回退 CorrelationData correlationData = new CorrelationData(localMessageId);//用于确认之后更改本地消息状态或删除--本地消息id this.rabbitTemplate.convertAndSend(MQOrderQueueConfig.ORDER_EXCHANGE_NAME,MQOrderQueueConfig.ORDER_ROUTING_KEY, context,correlationData); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String localMessageId = correlationData.getId(); if (ack) {// 消息发送成功,更新本地消息为已成功发送状态或者直接删除该本地消息记录,剩余的由MQ投递到消费者端,消费者端需要进行幂等,避免产生脏数据 LocalMessageEntity message = new LocalMessageEntity(); message.setId(Long.valueOf(localMessageId)); message.setState(LocalMessageEntity.L_M_STATE_SUCCESS); mqLocalMessageService.updateById(message); // mqLocalMessageService.deleteById(localMessageId); } else { //失败处理 } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey); } }

    兜底方案,防止MQ中途出现故障,保证每个消息都可以发送到MQ,轮训本地消息表对没有发送消息重发,每隔30秒轮训一次

    CheckMQLocalMessage:

    package com.chwl.cn.config.mq; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.chwl.cn.entity.localmessage.LocalMessageEntity; import com.chwl.cn.service.localmessageservice.IMQLocalMessageService; import com.chwl.cn.service.mq.SenderService; @Component public class CheckMQLocalMessage { @Autowired private SenderService senderService; @Autowired private IMQLocalMessageService messageService; /** * 兜底方案:必须保证每个消息都发送到MQ消费端进行消费,保证数据最终一致 * 每隔30秒检查本地消息表没有发送成功的消息,进行重试再次发送到MQ */ @Scheduled(fixedDelay=1000*30L) public void checkMQLocalMessage(){ LocalMessageEntity lme = new LocalMessageEntity().setState(LocalMessageEntity.L_M_STATE_FAIL); List<LocalMessageEntity> failStates = messageService.select(lme); if(failStates!=null&&failStates.size()>0){ failStates.stream().forEach(messageFailstate->{ senderService.send(messageFailstate.getContext(), String.valueOf(messageFailstate.getId())); }); } } }

    订单服务:不同的项目

    消费者,可以定义多个消费者,但某一条消息只会有一个消费成功的消费者

    OrderConsumer:

    package com.chwl.cn.config.mq; import java.io.IOException; import java.util.Date; import java.util.List; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.alibaba.fastjson.JSONObject; import com.chwl.cn.entity.order.OrderEntity; import com.chwl.cn.service.order.IOrderService; import com.rabbitmq.client.Channel; @Component @Transactional public class OrderConsumer { @Autowired private IOrderService orderService; // @RabbitHandler @RabbitListener(queues = "orderQueue")// @RabbitListener注解用于监听RabbitMQ,queues指定监听哪些队列 public void process(Channel channel, Message message) { System.err.println("order收到 : " + message.getBody() +"消费时间"+new Date()); try { OrderEntity orderEntity = JSONObject.parseObject(message.getBody(), OrderEntity.class); //状态和订单号进行幂等性判断防止应用中途挂掉或异常,MQ没有收到ACK确认导致重发消息数据库重复添加 List<OrderEntity> list = orderService.select(orderEntity);//通用mapper查询--根据实体中的属性值进行查询,查询条件使用等号 if(list!=null&&list.size()>0){ orderService.updateByIdSelective(orderEntity); }else { orderService.insert(orderEntity); } //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//deliveryTag是tag的id,由生产者生成 } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); System.out.println("receiver fail"); //对同一订单异常次数统计,多次都失败,这里人工干预 } } }

    测试:

    添加商品

    添加成功

    本地消息添加成功,状态是2,还没有发送到MQ(打了断点)

    MQ已经收到消息,并异步确认回调

    断点在确认这里,MQ确认回调,然后修改本地消息表状态为已发送表示MQ已经收到消息

    更改为已发送状态

    启动order服务:

    order服务监听了orderQueue队列,对消息已经消费

    order数据库添加数据,流程完毕。

    连贯走一遍:

    中途在打断点的时候定时任务轮训了本地消息表,对打断点的消息也发送到了MQ队列,意味着MQ有两条消息(实际是同一个order数据),这里仍然只添加了一条,需要做幂等处理,防止脏数据。

    product和order等表都是不同的数据库

     

     

    总结:

    队列消息模型的特点:

    1、消息生产者将消息发送到Queue中,然后消费者监听Queue并接受消息

    2、消息被确认消费(ACK机制)之后,就会从Queue中移除,消费者不会接受到已经被消费过的数据

    3、Queue支持多个消费者,但对于某一个消息而言,只会存在一个成功消费此消息的消费者

    队列生产与消费的流程:

    1、Producer生成消息并发送给MQ(同步/异步)

    2、MQ接受消息并将消息持久化(持久化为可选操作配置)

    3、MQ向生产者返回消息的接受结果(确认/返回值/异常)

    4、Consumer监听MQ中的消息

    5、Consumer获取到消息执行相应的业务逻辑

    6、Consumer进行消费后,对成功消费的消息向MQ进行确认(ACK)

    7、MQ得到ACK确认后将消息从MQ中移除

     

    每一步都有可能异常或者出现意外,比如在插入到数据库中出现异常、发送到MQ中途异常、MQ接受消息后返回确认时异常、MQ向消费者投递消息异常、消费者消费消息后向MQ进行ACK确认时异常等。每一步的异常都要进行相应的处理,不然就有可能导致脏数据。

    插入到数据库中出现异常:当product插入到数据库异常,这一步还没到MQ,直接回滚。本地消息表的作用在于不过度依赖MQ,MQ中途也有可能挂掉。

    发送到MQ中途异常:轮训本地消息表对没有发送到MQ的消息进行重发

    MQ接受消息后返回确认时异常:和上步一样,重发到MQ,等MQ给确认收到更改本地消息

    MQ向消费者投递消息异常:MQ会进行重试向消费者投递消息,只要消费者没有给MQ ACK确认,MQ就会进行重试,重试配置可根据实际业务配置

    消费者消费消息后向MQ进行ACK确认时异常:和上步相同,MQ没收到ACK确认还是会重发

    MQ重发时,在消费者需要进行幂等设计处理,防止脏数据

    对于本地消息表,可以抽离出来做成消息服务系统,毕竟不只是一个服务在使用,抽离出来可以作为共用

    最新回复(0)