Macula Boot Starter Seata

概述

macula-cloud-seata是基于seata-server的服务端实现,以nacos作为注册和配置中心。同时提供starter,默认支持RestTemplate、FeignClient的分布式事务支持。

客户端接入

组件坐标

<dependency>
    <groupId>dev.macula.boot</groupId>
    <artifactId>macula-boot-starter-seata</artifactId>
    <version>${macula.version}</version>
</dependency>

使用配置

客户端加入如下配置

seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: ${spring.application.name}-tx-group
  enable-auto-data-source-proxy: true			# 开启自动数据源代理,如果不使用AT模式,不要开启,默认为true
  saga:
  	enabled: true													# 开启saga自动配置,默认是false
  config:
    type: nacos
    nacos:
      serverAddr: 127.0.0.1:8848
      dataId: "seata.properties"					# 默认是seata.properties
      username: 'nacos'
      password: 'nacos'
  registry:
    type: nacos
    nacos:
      application: seata-server						# seata服务器的ID
      server-addr: 127.0.0.1:8848
      username: 'nacos'
      password: 'nacos'

在nacos的seata.properties中增加需要的如下事务群组配置

   service.vgroupMapping.order-service-tx-group=default					# 示例
   service.vgroupMapping.account-service-tx-group=default				# 示例
   service.vgroupMapping.business-service-tx-group=default			# 示例
   service.vgroupMapping.storage-service-tx-group=default				# 示例

另外,client.http.interceptor-enabled=false可以关闭默认的HTTP拦截

核心功能

具体可以参考官方文档,这里只简述一些基本概念和基本的运行测试。

数据源支持

AT模式

AT模式支持的数据库有:MySQL、Oracle、PostgreSQL、 TiDB、MariaDB。

TCC模式

TCC模式不依赖数据源(1.4.2版本及之前),1.4.2版本之后增加了TCC防悬挂措施,需要数据源支持。

Saga模式

Saga模式不依赖数据源。

XA模式

XA模式只支持实现了XA协议的数据库。Seata支持MySQL、Oracle、PostgreSQL和MariaDB。

注解开启全局事务

@GetMapping(value = "testCommit")
@GlobalTransactional
public Object testCommit(@RequestParam(name = "id",defaultValue = "1") Integer id,
    @RequestParam(name = "sum", defaultValue = "1") Integer sum) {
    Boolean ok = productService.reduceStock(id, sum);
    if (ok) {
        LocalDateTime now = LocalDateTime.now();
        Orders orders = new Orders();
        orders.setCreateTime(now);
        orders.setProductId(id);
        orders.setReplaceTime(now);
        orders.setSum(sum);
        orderService.save(orders);
        return "ok";
    } else {
        return "fail";
    }
}

Seata AT模式

AT模式根据其名称也能反馈出来他的特性,他是自动型的分布式事务解决方案。这个自动提现在他无需代码入侵,也就是说我们不需要再编写多余的代码来实现这个模式,只需要在方法中添加上指定的注解即可。

那么AT模式是如何实现无代码入侵的呢?他的工作原理是什么?

完整内容可参考原创博客

AT模式原理总览

AT模式分成两阶段来工作,我们先省略部分细节来整体了解其过程:

  • 一阶段:

(1)拦截并解析业务SQL,找到需要在数据表中更新的数据,将其转换为undo_log,并且保存到提前在每个数据库中创建的undo_log表中。打个比方,如果你是在做update product set price = 20 where price=10 and id=1的操作,那么undo_log中保存的就是反向sql update product set price=10 where price=20 and id=1。当然它的存储不会直接这么存,会经过处理。

(2)然后执行业务SQL,这时你会发现数据库中的数据是发生变化了的,同时undo_log中也有对应的新增数据

  • 二阶段

(1)因为第一阶段已经提交了本地事务,数据已经更新过了,这个时候如果没有报错,那么直接删除掉undo_log以及行锁的数据即可

(2)但是如果发生了报错,就只需要根据undo_log来回退数据

在这里插入图片描述

这个就是AT模式执行的两阶段的整体视角,我们可以体会到的是AT模式下,自动帮助我们生成了undo_log、一阶段、二阶段的提交都是由seata完成的,并不需要我们写代码来实现,所以它的无代码入侵体现在这里。

AT模式应用场景

AT模式是实际开发中最常用的模式,他无代码入侵的特性,适应于不希望对代码进行改造的场景,因为AT模式要求数据库是支持本地事务的数据库,但是因为市面上多数使用的都是支持事务的数据库,所以其应用也十分广泛。

同时其性能也还不错,完全满足普通业务,如果对于性能有较高要求的,就要用到我们其他模式的。

AT应用场景总结:

  • 不希望对代码进行改造
  • 数据库支持事务操作
  • 对性能没有特别高的要求

Seata TCC模式

TCC模式,全称Try-Confirm-Cancel,通过名称也能看出来其流程主要有三个步骤:

  • 预处理 Try:实现业务检查和资源预留
  • 确认/提交 Confirm:业务确认和提交
  • 撤销/回滚 Cancel:业务回滚

在这里插入图片描述

理解TCC模式的关键在于理解Try-Confirm阶段,其中Try用来实现业务检查和资源预留,这个概念比较抽象,我们举个例子来看看:

现在我们有一个下订单的操作,订单创建后需要扣减商品库存

​ 完整内容可参考原创博客

案例
订单服务

注意需要在接口上添加@LocalTCC注解

订单新增方法上要添加@TwoPhaseBusinessAction注解,并且声明confirm,cancel方法

import com.baomidou.mybatisplus.extension.service.IService;
import com.example.orderserver.entity.Order;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

/**
 * @author whx
 * @date 2022/4/30
 */
@LocalTCC
public interface IOrderTccService extends IService<Order> {

    /**
     * @TwoPhaseBusinessAction 描述⼆阶段提交
     * name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可
     * commitMethod: Commit/Confirm⽅法的⽅法名
     * rollbackMethod:Rollback/Cancel⽅法的⽅法名
     * @BusinessActionContextParamete 该注解可以将参数传递给声明的commitMethod和rollbackMethod,通过BusinessActionContext 获取。
     */
    @TwoPhaseBusinessAction(name = "addOrder", commitMethod = "addCommit", rollbackMethod = "addRollBack")
    void addOrder(@BusinessActionContextParameter(paramName = "order") Order order);

    boolean addCommit(BusinessActionContext context);

    boolean addRollBack(BusinessActionContext context);
}

实现类

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.orderserver.entity.Order;
import com.example.orderserver.feign.ProductApi;
import com.example.orderserver.mapper.OrderMapper;
import com.example.orderserver.service.IOrderTccService;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * @author whx
 * @date 2022/4/30
 */
@Slf4j
@Service
@AllArgsConstructor
public class IOrderTccServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderTccService {

    // 商品feign调用接口
    private final ProductApi productApi;

    @Override
    public void addOrder(Order order) {
        // 设置状态为未提交
        order.setStatus(0);
        // 新增订单
        this.save(order);
        // 扣减库存
        productApi.reduceInventory(order.getProduct().getId(),order.getProduct().getFrozenInventory());
    }

    @Override
    public boolean addCommit(BusinessActionContext context) {
        Order order = JSON.parseObject(context.getActionContext("order").toString(), Order.class);
        int count = this.count(Wrappers.<Order>lambdaQuery().eq(Order::getId,order.getId()));
        if(count > 0){
            order.setStatus(1);
            baseMapper.updateById(order);
        }
        log.info("事务 xid="+context.getXid()+" 提交成功");
        return true;
    }

    @Override
    public boolean addRollBack(BusinessActionContext context) {
        Order order = JSON.parseObject(context.getActionContext("order").toString(), Order.class);
        int count = this.count(Wrappers.<Order>lambdaQuery().eq(Order::getId,order.getId()));
        if(count > 0){
            // 删除数据
            this.removeById(order.getId());
        }
        log.info("事务 xid="+context.getXid()+" 回滚成功");
        return true;
    }
}
商品服务
import com.baomidou.mybatisplus.extension.service.IService;
import com.example.productserver.entity.Product;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

/**
 * @author whx
 * @date 2022/4/30
 */
@LocalTCC
public interface IProductTccService extends IService<Product>{

    /**
     * @TwoPhaseBusinessAction 描述⼆阶段提交
     * name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可
     * commitMethod: Commit/Confirm⽅法的⽅法名
     * rollbackMethod: Rollback/Cancel⽅法的⽅法名
     * @BusinessActionContextParamete 该注解可以将参数传递给声明的commitMethod和rollbackMethod,通过BusinessActionContext 获取。
     */
    @TwoPhaseBusinessAction(name = "reduceInventory", commitMethod = "reduceCommit", rollbackMethod = "reduceRollBack")
    String reduceInventory(@BusinessActionContextParameter(paramName = "productId") Long id,
                         @BusinessActionContextParameter(paramName = "number") Integer number);

    boolean reduceCommit(BusinessActionContext context);

    boolean reduceRollBack(BusinessActionContext context);

}

实现类

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.productserver.entity.Product;
import com.example.productserver.mapper.ProductMapper;
import com.example.productserver.service.IProductTccService;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author whx
 * @date 2022/4/30
 */
@Slf4j
@Service
public class IProductTccServiceImpl extends ServiceImpl<ProductMapper, Product> implements IProductTccService {

    /**
     * Transactional 添加行锁,防止多线程下更新库存时被其他线程读取到脏数据,这样能保证第一次获取时FrozenInventory为0
     * @param productId
     * @param number
     * @return
     */
    @Override
    @Transactional(isolation = Isolation.READ_COMMITTED)
    public String reduceInventory(Long productId,Integer number) {
        Product product = this.getOne(Wrappers.<Product>lambdaQuery()
                .select(Product::getId,Product::getInventory,Product::getFrozenInventory)
                .eq(Product::getId,productId));
        // 当前可用库存需要排除历史冻结库存
        if(product.getInventory() >= number){
            // 更新冻结库存
            product.setFrozenInventory(number);
            this.updateById(product);
            return "库存更新成功";
        }
        return "商品库存不足";

    }

    @Override
    public boolean reduceCommit(BusinessActionContext context) {
        Product product = this.getOne(Wrappers.<Product>lambdaQuery()
                .select(Product::getId,Product::getInventory,Product::getFrozenInventory)
                .eq(Product::getId,context.getActionContext("goodsId")));
        Integer number = Integer.parseInt(context.getActionContext("number").toString());
        if (product != null) {
            //扣减库存
            product.setInventory(product.getInventory() - number);
            product.setFrozenInventory(0);
            this.saveOrUpdate(product);
        }
        log.info("事务 xid="+context.getXid()+" 提交成功");
        return true;
    }

    @Override
    public boolean reduceRollBack(BusinessActionContext context) {
        Product product = this.getOne(Wrappers.<Product>lambdaQuery()
                .select(Product::getId,Product::getFrozenInventory)
                .eq(Product::getId,context.getActionContext("goodsId")));
        Integer number = Integer.parseInt(context.getActionContext("number").toString());
        if (product != null) {
            // 恢复冻结库存
            product.setFrozenInventory(0);
            this.saveOrUpdate(product);
        }
        log.info("事务 xid="+context.getXid()+" 回滚成功");
        return true;
    }
}
TCC模式的补偿措施
重试机制

我们上述的代码中在confirm,cancel方法中实现了事务的提交和回滚,但是因为是我们自己通过代码实现的,所以还需要考虑一个问题:执行失败后的重试机制。当我们的confirm或者cancel方法也出现报错时,为了保证事务的最终一致性,我们应当做好重试机制处理:比如将数据发送到MQ,然后再进行接收处理。

幂等性问题

所谓幂等就是操作一次和操作多次的执行效果是一样的。想象一下,我们的库存扣除操作,如果因为某一步操作报错,导致需要回滚重试,结果每次重试都会重复扣减库存,那这样肯定是不对的。所以为了保证我们在confirm,cancel中进行的重试机制不会使得我们的资源发生重复消耗,那么需要我们对方法做好幂等性处理:比如说通过添加状态字段来判断是否执行过

悬挂问题

所谓悬挂问题,就是二阶段模式中,cancel比try先执行。这是怎么导致的呢?

就拿我们上述的案例来假设,在订单服务中调用商品服务的扣减库存方法reduceInventory时,因为是通过RPC(feign)的方式来调用的,那么如果调用时刚好网络堵塞,或者商品服务出现问题,导致调用失败,出现报错,TM会通知TC出现错误,TC会通知所有的RM进行本地事务回滚,也就是执行cancel方法。当cancel方法执行完成后,try方法偏偏连通了,又执行了,那么就出现了问题,订单会被更新为未提交,但因为事务已经被cancel过了,就不会再执行confirm,也就没有谁再来将资源状态从预处理更新为已处理了。资源就会导致浪费。这时进行的回滚操作其实并没有真实回滚业务,这个现象我们称之为空回滚。所有我们需要针对悬挂问题进行防悬挂处理,方案呢就是限制如果二阶段执行完成,一阶段就不能再执行。比如执行cancel方法时会判断是否是空回滚,出现空回滚注册事务ID,try方法执行前先检查事务ID是否存在,如果存在则不允许执行当然这些处理呢,seata已经帮我们实现了,这也是使用现成的分布式事务框架的好处。省心!但是我们自己要知道这些问题和原理。

seata中的解决方案是增加一个事务记录表,在cancel阶段最后往事务记录表中插入一条记录(xid-status)标记cancel阶段已经执行过。此时try阶段进入时发现已经执行过回滚操作,则放弃try阶段的执行。

Seata Saga模式

Saga的定义是“长时间活动的事务”,是普林斯顿大学教授Hector & Kenneth发表的论文《sagas》中提出的概念。它的思想是允许分布式事务在全部提交前提前释放占用的某些资源。

完整内容可以参考原创博客,代码示例可参考seata-samples

Saga模式依然要求我们自己实现正向服务和补偿服务。但是它于TCC模式的区别之处在于:

  • Saga的模式设计使得它天然适合于长流程的业务。TCC要实现同样的长流程的话,需要多写一个confirm操作,并且要考虑如何将业务拆分为两部分
  • Saga模式在正向服务中时就已经提交了本地事务了,而补偿事务也比较好实现,将正向服务的结合逆向补偿即可。
  • 比如正常服务是update product set price=20 where price=30 and id=1; 那么补偿服务就是update product set price=30 where price=20 and id=1;
  • 比起TCC模式,Saga模式更适用于一些老服务、第三方服务或者其他无法改造的服务,要接入到我们的分布式事务中时,就可以将其作为一个正向服务存在,而直接实现他的补偿服务即可。而TCC因为要对业务进行拆分为try-confirm-cancel,所以它不适用于不可改造的服务

同时,Saga模式同样不需要全局锁,只需要结合本地事务加本地锁即可,所以性能依旧有保证。

SAGA模式的三种事务类型
可补偿性事务

所谓可补偿性事务,也就是可以使用、需要使用补偿事务来回滚数据的事务

比如说下订单,就需要删除订单的补偿事务,因此下订单就是一个可补偿性事务

关键性事务
关键性事务是Saga执行的关键点,如果关键性事务运行成功,则Saga将一直运行到完成。关键性事务不一定是个可补偿性事务或者可重复性事务,但是他可以是最后一个可补偿的事务或第一个可重复的事务——《微服务架构设计模式》

通过书中的描述,我们知道关键性事务的定义从结构上理解,是处于可补偿性事务和可重复性事务的中间。

具体把哪个事务定义为关键性事务,还要根据具体的业务情况而定,我们可以通过以下标准来判断

  • 从结构上是否处于可补偿事务和可重复事务之间
  • 从业务上该事务是否能表示整个业务执行成功的转折点
可重复性事务

关键性事务之后的事务就是可重复性事务,不需要回滚,并且保证能够执行完成。所以我们会通过一些机制来保证这类事务一定能执行成功,比如重试机制。

SAGA模式的补偿措施

与TCC模式类似,SAGA模式也涉及到如下几个问题

幂等性问题

所谓幂等就是操作一次和操作多次的执行效果是一样的。

想象一下,我们的库存扣除操作,如果因为某一步操作报错,导致需要回滚重试,结果每次重试都会重复扣减库存,那这样肯定是不对的。

所以为了保证我们在confirm,cancel中进行的重试机制不会使得我们的资源发生重复消耗,那么需要我们对方法做好幂等性处理:

比如说通过添加状态字段来判断是否执行过。当然这一点在seata等分布式框架中不用我们再手动实现,框架已经帮我们实现了。

悬挂问题

所谓悬挂问题,就是二阶段模式中,二阶段比一阶段先执行

这是怎么导致的呢?

我们拿下订单扣减库存的案例来说,在订单服务中调用商品服务的扣减库存方法reduceInventory时,通常通过RPC(feign)的方式来调用,那么如果调用时刚好网络堵塞,或者商品服务出现问题,导致调用失败,出现报错,TM会通知TC出现错误,TC会通知所有的RM进行本地事务回滚,也就是执行补偿事务。

当补偿事务方法执行完成后,正向事务方法偏偏连通了,又执行了,那么就出现了问题,这个正向服务之前的补偿事务都执行了,但又执行了一个多余的正向服务。

所有我们需要针对悬挂问题进行防悬挂处理,方案呢就是限制如果二阶段执行完成,一阶段就不能再执行。

seata中的解决方案是增加一个事务记录表,在补偿服务执行后往事务记录表中插入一条记录(xid-status)标记补偿服务已经执行过。此时正向服务进入时发现已经执行过回滚操作,则放弃正向服务的执行。

SAGA模式应用场景
  • 适用于长事务业务场景
  • 适用于需要接入老服务、第三方服务或者其他无法改造的服务的业务场景
  • 需要操作更细分散在多个服务、系统中的数据的业务场景
Seata XA模式

使用较少,具体参考https://blog.51cto.com/u_15952602/6034750?articleABtest=0

依赖引入

<dependencies>
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>

    <dependency>
        <groupId>dev.macula.boot</groupId>
        <artifactId>macula-boot-starter-feign</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>dev.macula.boot</groupId>
        <artifactId>macula-boot-starter-web</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

服务端介绍

请参考官网

版权说明

  • seata:https://github.com/seata/seata/blob/2.x/LICENSE