✅基于本地消息表实现分布式事务保证最终一致性

背景

这种背景的话一般都是在分布式场景中,需要保证各个系统之间的数据的最终一致性,比如交易下单环节,保证订单系统和用户积分系统之间的最终一致。

也就是说,用户下单后,订单需要创建成功,用户积分也要增加成功,如果有一个失败了,这都是没满足一致性。

技术选型

保证分布式事务的方案有很多,比如本地消息表、MQ的事务消息、TCC、Seata等、2PC等

这些方案中各自都有优缺点,首先比较重的就是TCC、Seata和2PC,因为他们要么需要引入一个单独的协调者,要么需要代码做改造,要么对分布式系统之间有很强的侵入性。

比如TCC需要下游提供Try、Confirm和Cancel三种操作,2PC也是,需要把一个业务操作拆成2个阶段。

那么相对来说比较轻量级的方案就是依赖可靠消息,实现最终一致性。尤其是我们这个场景中,积分的增加其实不需要强一致性,只需要保证几秒钟之后积分增加成功就行,而且是一旦下单成功,积分增加必须成功,所以就比较适合使用可靠消息来保证最终一致性。

那么也就是说我们可以在创订单系统创建订单成功之后,发一个MQ消息,然后积分系统接收这个MQ消息即可。

@Transactional
public void order(OrderDTO orderDTO){

    orderServive.createOrder(orderDTO);
    mqService.send(orderDTO);

}

但是这个方案存在一个问题,那就是第二步,发送消息其实是有可能失败的。那么就有以下几种情况:

1、消息发送失败,MQ没接到消息

这种情况,messageService.send(orderDTO);会抛异常,那么本地事务捕获到这个异常之后,把createOrder回滚了就行了

2、MQ接到了消息,但是客户端因为网络延迟以为失败了

这种情况比较复杂了,就是说客户端因为接到了失败的response,会直接回滚createOrder。但是MQ收到了消息之后,会投递给积分系统,积分系统会直接消费消息,然后增加积分

上面的第二种情况,就导致了数据不一致。

那么想要解决这个问题,要么就是用MQ的事务消息,要么就是引入本地消息表。因为不是所有的MQ都支持事务消息,所以这里我们选择本地消息表。

具体实现

在这个环节中,在订单服务的数据库中创建一张本地消息表

id long
gmt_create datetime
gmt_modified datetime
message_type varchar
biz_type varchar
identifier varchar
content text
state varchar

这样表就用来记录本地消息的。这样我们就可以把以上代码做一下调整:

@Transactional
public void order(OrderDTO orderDTO){

    orderServive.createOrder(orderDTO);
    messageService.createMessage(orderDTO);

}

这样我们就在一个事务中,创建两条数据库记录,因为加了事务,那么就可以保证,如果order创建成功,message也一定能写入成功。否则就都失败。

但是这里只是记录了本地消息,还需要把本地消息通过MQ发出去。这里就可以有很多办法了,一种是异步扫表,还可以直接同步发消息,也可以借助Spring Event来异步处理,都是可以的。

但是如果是同步发的话时效性肯定更好,但是同步发消息需要注意,要把调MQ发消息的地方放到事务外,要不然会因为MQ网络延迟等问题导致回滚,就又出现前面的问题了。

所以就可以用编程式事务:

@Autowired
TransactionTemplate transactionTemplate;

public void order(OrderDTO orderDTO){

   transactionTemplate.execute(
        new TransactionCallbackWithoutResult(){  
            @Override
            public Object doTransactionWithoutResult(TransactionStatus status){  
                orderServive.createOrder(orderDTO);
                                messageService.createMessage(orderDTO);
            }  
    }); 

    mqService.send(orderDTO);
    messageService.updateSuccess(orderDTO);

}
@Autowired
TransactionTemplate transactionTemplate;

public void order(OrderDTO orderDTO){

   boolean transactionSuccess = transactionTemplate.execute(new TransactionCallback<Boolean>() {
        @Override
        public Boolean doInTransaction(TransactionStatus status) {
            try {
                orderServive.createOrder(orderDTO);
                messageService.createMessage(orderDTO);
                //以上执行如果未抛异常,则成功,返回true
                return true; // 表示事务执行成功
            } catch (Exception e) {
                // 如果发生异常,则标记事务为回滚
                status.setRollbackOnly();
                return false; // 表示事务执行失败
            }
        }
    });

    if (transactionSuccess) {
        // 事务执行成功,可以执行 mqService.send(orderDTO)
        mqService.send(orderDTO);
        messageService.updateSuccess(orderDTO);
    } else {
        // 事务执行失败的处理逻辑
        // 可以抛出异常或记录日志等
    }
}

在事务中写入本地业务数据+本地消息,然后在事务外发MQ消息,如果发送失败了,也不影响事务的commit,如果发送成功了,把本地消息表的状态推进一下。

如果失败,下一次再通过定时任务扫表把需要处理的事件查出来重发就行了。

所以本地消息表中还需要有一个定时任务,还需要提供一个接口给下游回调,具体的主要流程看这个就行了:

✅如何基于本地消息表实现分布式事务?

学习资料

✅如何基于本地消息表实现分布式事务?

✅本地消息表实现的分布式的缺点有什么?

✅什么是分布式事务?

✅常见的分布式事务有哪些?

原文: https://www.yuque.com/hollis666/xkm7k3/hi956hl64rr7cwx1