微服务事务在K8s部署中如何实现,详细讲解?

2026-05-27 15:131阅读0评论SEO资讯
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计3225个文字,预计阅读时间需要13分钟。

微服务事务在K8s部署中如何实现,详细讲解?

我们通过一系列教程,全面讲解从需求分析到上线、从代码到K8s部署、从日志到监控等微服务各方面的完整实践。整个项目采用go-zero框架进行开发,基本包涵了go-zero及其相关开发者开发的组件。

我们用一个系列来讲解从需求到上线、从代码到k8s部署、从日志到监控等各个方面的微服务完整实践。

整个项目使用了go-zero开发的微服务,基本包含了go-zero以及相关go-zero作者开发的一些中间件,所用到的技术栈基本是go-zero项目组的自研组件,基本是go-zero全家桶了。

实战项目地址:github.com/Mikaelemmmm/go-zero-looklook

关于分布式事务

因为本项目服务划分相对独立一些,所以目前没有使用到分布式事务,不过go-zero结合dtm使用分布式事务的最佳实践,我有整理demo,这里就介绍一下go-zero结合dtm的使用,项目地址go-zero结合dtm最佳实践仓库地址 : github.com/Mikaelemmmm/gozerodtm

下面说的不是go-zero-looklook项目,是这个项目 github.com/Mikaelemmmm/gozerodtm

一、注意事项
  • go-zero 1.2.4版本以上,这个一定要注意

  • dtm 你用最新的就行了

二、clone dtm

git clone github.com/yedf/dtm.git 三、配置文件

1、找到项目跟文件夹下的conf.sample.yml

微服务事务在K8s部署中如何实现,详细讲解?

2、cp conf.sample.yml conf.yml

3、使用etcd , 把配置中下面这段注释打开 (如果没用etcd就更简单了 ,这个都省了,直接链接到dtm server地址就可以了)

MicroService: Driver: 'dtm-driver-gozero' # name of the driver to handle register/discover Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url EndPoint: 'localhost:36790'

解释一下:

MicroService 这个不要动,这个代表要对把dtm注册到那个微服务服务集群里面去,使微服务集群内部服务可以通过grpc直接跟dtm交互

Driver :'dtm-driver-gozero' , 使用go-zero的注册服务发现驱动,支持go-zero

Target: 'etcd://localhost:2379/dtmservice' 将当前dtm的server直接注册到微服务所在的etcd集群中,如果go-zero作为微服务使用的话,就可以直接通过etcd拿到dtm的server grpc链接,直接就可以跟dtm server交互了

EndPoint: 'localhost:36790' , 代表的是dtm的server的连接地址+端口 , 集群中的微服务可以直接通过etcd获得此地址跟dtm交互了,

如果你自己去改了dtm源码grpc端口,记得这里要改下端口

四、启动dtm server

在dtm项目根目录下

go run app/main.go dev 五、使用go-zero的grpc对接dtm

这是一个快速下单扣商品库存的例子

1、order-api

order-api是zhuanlan.zhihu.com/p/388444465 // tx: 本地数据库的事务对象,允许子事务屏障进行事务操作 // busiCall: 业务函数,仅在必要时被调用 func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) { bb.BarrierID = bb.BarrierID + 1 bid := fmt.Sprintf("%02d", bb.BarrierID) defer func() { // Logf("barrier call error is %v", rerr) if x := recover(); x != nil { tx.Rollback() panic(x) } else if rerr != nil { tx.Rollback() } else { tx.Commit() } }() ti := bb originType := map[string]string{ BranchCancel: BranchTry, BranchCompensate: BranchAction, }[ti.Op] originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return } rerr = busiCall(tx) return }

核心其实就是如下几行代码

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return } rerr = busiCall(tx)

func insertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) { if op == "" { return 0, nil } sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason) }

每一个业务逻辑,dtm server在正常成功请求时候, ti.Op 默认正常执行的操作是action,所以正常第一次请求都是ti.Op值都是action,那originType就是“”

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)

那么上面这个sql就不会执行因为ti.Op == "" 在insertBarrier中直接return了

currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)

那第二个sql的ti.Op是 action, 所以子事务屏障表barrier就会插入一条数据

同理在执行库存也会插入一条

1、整个事务都成功的子事务屏障

那在一次下订单正常成功请求下,由于 ti.Op都是action,所以originType都是"" , 所以不管是下单的barrier 还是扣库存的barrier,在执行他们2次barrier insert时候,originAffected都会忽略,因为originType==“” 会直接被return不插入数据,这样看来 不管是下单还是扣库存,barrier的第二条插入数据生效,所以barrier数据表中就会有2条下单数据,一条是订单的一条是扣库存的

gid : dtm全局事务id

branch_id : 每个全局事务id下的每个业务id

op : 操作,如果是正常成功请求就是action

barrier_id : 同一个业务下开多个会递增

这4个字段在表中是联合唯一索引,在insertBarrier时候,dtm判断如果存在就忽略不插入

2、如果订单成功库存不足回滚子事务屏障

我们库存只有10个 ,我们下单20个

1)当订单下成功,因为订单下单时候并不知道后续库存情况(即使在下单时候先去查库存那也会有查询时候足够,扣除时候不足情况),

所以下单成功barrier表中按照之前梳理的逻辑就会在barrier表中产生一条正确数据执行数据

2)接着执行扣库存操作

func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) { fmt.Printf("扣库存start....") stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId) if err != nil && err != model.ErrNotFound { // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return nil, status.Error(codes.Internal, err.Error()) } if stock == nil || stock.Num < in.Num { //库存不足确定需要dtm直接回滚,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滚 return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) } ....... }

在执行扣库存业务逻辑之前,由于我们查询库存发现库存不足,所以直接return codes.Aborted 了,不会走到子事务屏障barrier这里,所以barrier表中不会插入数据,而是告诉dtm要回滚

3)调用order回滚操作

订单回滚的时候会开启barrier,这时候又会执行barrier代码(如下),由于回滚代码的ti.Op是compensate ,orginType就是action

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return } rerr = busiCall(tx)

由于我们之前下订单成功了,barrier表里有一条下单成功时候的记录action,所以originAffected==0 ,所以只会插入一条当前回滚记录继续调用 busiCall(tx) 执行后续我们自己写的回滚操作

到此,我们应该只有两条数据,一条订单成功创建记录,一条订单回滚记录

4)库存回滚DeductRollback

订单回滚成功后,会再继续调用库存回滚DeductRollback,库存回滚代码如下

这就是子事务屏障自动帮我们判断的,也就是那两条核心插入语句帮我们判断的,以至于我们业务不会出现脏数据

库存这里回滚分两种情况

  • 没扣成功回滚

  • 扣成功回滚

没扣成功回滚(我们当前举例场景是这个 )

首先调用库存回滚时候ti.Op是compensate ,orginType就是action ,会执行下面2条insert

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return}rerr = busiCall(tx) }

这里结合判断如果是回滚、取消操作,originAffected > 0 当前插入成功了,之前对应的正向扣库存操作没有插入成功,说明之前库存没扣成功,直接return就不需要执行后续补偿了。所以此时会在barrier表中插入2条数据直接return,就不会执行我们后续补偿操作了

到此我们barrier表中有4条数据了

扣成功回滚(这个情况自己可以尝试模拟此场景)

如果我们上一步扣库存成功,在执行此补偿的时候ti.Op是compensate ,orginType就是action ,继续执行2个insert语句

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return}rerr = busiCall(tx) }

这里结合判断如果是回滚、取消操作,originAffected == 0 当前插入忽略了没插入进去,说明之前正向扣库存插入成功了,这里只插入第二个sql语句记录即可,然后在执行后续我们补偿的业务操作。

所以,整体分析下来核心语句就是2条insert,它帮我们解决了重复回滚数据、数据幂等情况,只能说dtm作者想法真的很好,用了最少的代码帮我们解决了一个很麻烦的问题

七、go-zero对接中注意事项 1、dtm的回滚补偿

在使用dtm的grpc时候,当我们使用saga、tcc等如果第一步尝试或者执行失败了,是希望它能执行后面的rollback的,在grpc中的服务如果发生错误了,必须返回 : status.Error(codes.Aborted, dtmcli.ResultFailure) , 返回其他错误,不会执行你的rollback操作,dtm会一直重试,如下:

stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId) if err != nil && err != model.ErrNotFound { // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return nil, status.Error(codes.Internal, err.Error()) } if stock == nil || stock.Num < in.Num { //库存不足确定需要dtm直接回滚,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滚 return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) } 2、barrier的空补偿、悬挂等

之前准备工作中,我们创建了dtm_barrier库以及执行了barrier.mysql.sql,这个其实就是为我们的业务服务做了一个检查,防止空补偿,具体可以看barrier.Call中源码,没几行代码可以看懂的。

如果我们线上使用的话,你的每个与db交互的服务只要用到了barrier,这个服务使用到的mysql账号,要给他分配barrier库的权限,这个不要忘记了

3、barrier在rpc中本地事务

在rpc的业务中,如果使用了barrier的话,那么在model中与db交互时候必须要用事务,并且一定要跟barrier用同一个事务

logic

barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB() if err != nil { // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return nil, status.Error(codes.Internal, err.Error()) } if err := barrier.CallWithDB(db, func(tx *sql.Tx) error { sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num) if err != nil{ // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return status.Error(codes.Internal, err.Error()) } affected, err := sqlResult.RowsAffected() if err != nil{ // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return status.Error(codes.Internal, err.Error()) } // 如果是影响行数为0,直接就告诉dtm失败不需要重试了 if affected <= 0 { return status.Error(codes.Aborted, dtmcli.ResultFailure) } // !!开启测试!! : 测试订单回滚更改状态为失效,并且当前库扣失败不需要回滚 // return fmt.Errorf("扣库存失败 err : %v , in:%+v \n",err,in) return nil }); err != nil { // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return nil, err }

model

func (m *defaultStockModel) DecuctStock(tx *sql.Tx,goodsId , num int64) (sql.Result,error) { query := fmt.Sprintf("update %s set `num` = `num` - ? where `goods_id` = ? and num >= ?", m.table) return tx.Exec(query,num, goodsId,num) } func (m *defaultStockModel) AddStock(tx *sql.Tx,goodsId , num int64) error { query := fmt.Sprintf("update %s set `num` = `num` + ? where `goods_id` = ?", m.table) _, err :=tx.Exec(query, num, goodsId) return err } 七、使用go-zero的github.com/Mikaelemmmm/dtmbarrier-go-zero

项目地址

github.com/zeromicro/go-zero

欢迎使用 go-zerostar 支持我们!

微信交流群

关注『微服务实践』公众号并点击 交流群 获取社区群二维码。

本文共计3225个文字,预计阅读时间需要13分钟。

微服务事务在K8s部署中如何实现,详细讲解?

我们通过一系列教程,全面讲解从需求分析到上线、从代码到K8s部署、从日志到监控等微服务各方面的完整实践。整个项目采用go-zero框架进行开发,基本包涵了go-zero及其相关开发者开发的组件。

我们用一个系列来讲解从需求到上线、从代码到k8s部署、从日志到监控等各个方面的微服务完整实践。

整个项目使用了go-zero开发的微服务,基本包含了go-zero以及相关go-zero作者开发的一些中间件,所用到的技术栈基本是go-zero项目组的自研组件,基本是go-zero全家桶了。

实战项目地址:github.com/Mikaelemmmm/go-zero-looklook

关于分布式事务

因为本项目服务划分相对独立一些,所以目前没有使用到分布式事务,不过go-zero结合dtm使用分布式事务的最佳实践,我有整理demo,这里就介绍一下go-zero结合dtm的使用,项目地址go-zero结合dtm最佳实践仓库地址 : github.com/Mikaelemmmm/gozerodtm

下面说的不是go-zero-looklook项目,是这个项目 github.com/Mikaelemmmm/gozerodtm

一、注意事项
  • go-zero 1.2.4版本以上,这个一定要注意

  • dtm 你用最新的就行了

二、clone dtm

git clone github.com/yedf/dtm.git 三、配置文件

1、找到项目跟文件夹下的conf.sample.yml

微服务事务在K8s部署中如何实现,详细讲解?

2、cp conf.sample.yml conf.yml

3、使用etcd , 把配置中下面这段注释打开 (如果没用etcd就更简单了 ,这个都省了,直接链接到dtm server地址就可以了)

MicroService: Driver: 'dtm-driver-gozero' # name of the driver to handle register/discover Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url EndPoint: 'localhost:36790'

解释一下:

MicroService 这个不要动,这个代表要对把dtm注册到那个微服务服务集群里面去,使微服务集群内部服务可以通过grpc直接跟dtm交互

Driver :'dtm-driver-gozero' , 使用go-zero的注册服务发现驱动,支持go-zero

Target: 'etcd://localhost:2379/dtmservice' 将当前dtm的server直接注册到微服务所在的etcd集群中,如果go-zero作为微服务使用的话,就可以直接通过etcd拿到dtm的server grpc链接,直接就可以跟dtm server交互了

EndPoint: 'localhost:36790' , 代表的是dtm的server的连接地址+端口 , 集群中的微服务可以直接通过etcd获得此地址跟dtm交互了,

如果你自己去改了dtm源码grpc端口,记得这里要改下端口

四、启动dtm server

在dtm项目根目录下

go run app/main.go dev 五、使用go-zero的grpc对接dtm

这是一个快速下单扣商品库存的例子

1、order-api

order-api是zhuanlan.zhihu.com/p/388444465 // tx: 本地数据库的事务对象,允许子事务屏障进行事务操作 // busiCall: 业务函数,仅在必要时被调用 func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) { bb.BarrierID = bb.BarrierID + 1 bid := fmt.Sprintf("%02d", bb.BarrierID) defer func() { // Logf("barrier call error is %v", rerr) if x := recover(); x != nil { tx.Rollback() panic(x) } else if rerr != nil { tx.Rollback() } else { tx.Commit() } }() ti := bb originType := map[string]string{ BranchCancel: BranchTry, BranchCompensate: BranchAction, }[ti.Op] originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return } rerr = busiCall(tx) return }

核心其实就是如下几行代码

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return } rerr = busiCall(tx)

func insertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) { if op == "" { return 0, nil } sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason) }

每一个业务逻辑,dtm server在正常成功请求时候, ti.Op 默认正常执行的操作是action,所以正常第一次请求都是ti.Op值都是action,那originType就是“”

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)

那么上面这个sql就不会执行因为ti.Op == "" 在insertBarrier中直接return了

currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)

那第二个sql的ti.Op是 action, 所以子事务屏障表barrier就会插入一条数据

同理在执行库存也会插入一条

1、整个事务都成功的子事务屏障

那在一次下订单正常成功请求下,由于 ti.Op都是action,所以originType都是"" , 所以不管是下单的barrier 还是扣库存的barrier,在执行他们2次barrier insert时候,originAffected都会忽略,因为originType==“” 会直接被return不插入数据,这样看来 不管是下单还是扣库存,barrier的第二条插入数据生效,所以barrier数据表中就会有2条下单数据,一条是订单的一条是扣库存的

gid : dtm全局事务id

branch_id : 每个全局事务id下的每个业务id

op : 操作,如果是正常成功请求就是action

barrier_id : 同一个业务下开多个会递增

这4个字段在表中是联合唯一索引,在insertBarrier时候,dtm判断如果存在就忽略不插入

2、如果订单成功库存不足回滚子事务屏障

我们库存只有10个 ,我们下单20个

1)当订单下成功,因为订单下单时候并不知道后续库存情况(即使在下单时候先去查库存那也会有查询时候足够,扣除时候不足情况),

所以下单成功barrier表中按照之前梳理的逻辑就会在barrier表中产生一条正确数据执行数据

2)接着执行扣库存操作

func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) { fmt.Printf("扣库存start....") stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId) if err != nil && err != model.ErrNotFound { // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return nil, status.Error(codes.Internal, err.Error()) } if stock == nil || stock.Num < in.Num { //库存不足确定需要dtm直接回滚,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滚 return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) } ....... }

在执行扣库存业务逻辑之前,由于我们查询库存发现库存不足,所以直接return codes.Aborted 了,不会走到子事务屏障barrier这里,所以barrier表中不会插入数据,而是告诉dtm要回滚

3)调用order回滚操作

订单回滚的时候会开启barrier,这时候又会执行barrier代码(如下),由于回滚代码的ti.Op是compensate ,orginType就是action

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return } rerr = busiCall(tx)

由于我们之前下订单成功了,barrier表里有一条下单成功时候的记录action,所以originAffected==0 ,所以只会插入一条当前回滚记录继续调用 busiCall(tx) 执行后续我们自己写的回滚操作

到此,我们应该只有两条数据,一条订单成功创建记录,一条订单回滚记录

4)库存回滚DeductRollback

订单回滚成功后,会再继续调用库存回滚DeductRollback,库存回滚代码如下

这就是子事务屏障自动帮我们判断的,也就是那两条核心插入语句帮我们判断的,以至于我们业务不会出现脏数据

库存这里回滚分两种情况

  • 没扣成功回滚

  • 扣成功回滚

没扣成功回滚(我们当前举例场景是这个 )

首先调用库存回滚时候ti.Op是compensate ,orginType就是action ,会执行下面2条insert

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return}rerr = busiCall(tx) }

这里结合判断如果是回滚、取消操作,originAffected > 0 当前插入成功了,之前对应的正向扣库存操作没有插入成功,说明之前库存没扣成功,直接return就不需要执行后续补偿了。所以此时会在barrier表中插入2条数据直接return,就不会执行我们后续补偿操作了

到此我们barrier表中有4条数据了

扣成功回滚(这个情况自己可以尝试模拟此场景)

如果我们上一步扣库存成功,在执行此补偿的时候ti.Op是compensate ,orginType就是action ,继续执行2个insert语句

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return}rerr = busiCall(tx) }

这里结合判断如果是回滚、取消操作,originAffected == 0 当前插入忽略了没插入进去,说明之前正向扣库存插入成功了,这里只插入第二个sql语句记录即可,然后在执行后续我们补偿的业务操作。

所以,整体分析下来核心语句就是2条insert,它帮我们解决了重复回滚数据、数据幂等情况,只能说dtm作者想法真的很好,用了最少的代码帮我们解决了一个很麻烦的问题

七、go-zero对接中注意事项 1、dtm的回滚补偿

在使用dtm的grpc时候,当我们使用saga、tcc等如果第一步尝试或者执行失败了,是希望它能执行后面的rollback的,在grpc中的服务如果发生错误了,必须返回 : status.Error(codes.Aborted, dtmcli.ResultFailure) , 返回其他错误,不会执行你的rollback操作,dtm会一直重试,如下:

stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId) if err != nil && err != model.ErrNotFound { // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return nil, status.Error(codes.Internal, err.Error()) } if stock == nil || stock.Num < in.Num { //库存不足确定需要dtm直接回滚,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滚 return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) } 2、barrier的空补偿、悬挂等

之前准备工作中,我们创建了dtm_barrier库以及执行了barrier.mysql.sql,这个其实就是为我们的业务服务做了一个检查,防止空补偿,具体可以看barrier.Call中源码,没几行代码可以看懂的。

如果我们线上使用的话,你的每个与db交互的服务只要用到了barrier,这个服务使用到的mysql账号,要给他分配barrier库的权限,这个不要忘记了

3、barrier在rpc中本地事务

在rpc的业务中,如果使用了barrier的话,那么在model中与db交互时候必须要用事务,并且一定要跟barrier用同一个事务

logic

barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB() if err != nil { // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return nil, status.Error(codes.Internal, err.Error()) } if err := barrier.CallWithDB(db, func(tx *sql.Tx) error { sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num) if err != nil{ // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return status.Error(codes.Internal, err.Error()) } affected, err := sqlResult.RowsAffected() if err != nil{ // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return status.Error(codes.Internal, err.Error()) } // 如果是影响行数为0,直接就告诉dtm失败不需要重试了 if affected <= 0 { return status.Error(codes.Aborted, dtmcli.ResultFailure) } // !!开启测试!! : 测试订单回滚更改状态为失效,并且当前库扣失败不需要回滚 // return fmt.Errorf("扣库存失败 err : %v , in:%+v \n",err,in) return nil }); err != nil { // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!! return nil, err }

model

func (m *defaultStockModel) DecuctStock(tx *sql.Tx,goodsId , num int64) (sql.Result,error) { query := fmt.Sprintf("update %s set `num` = `num` - ? where `goods_id` = ? and num >= ?", m.table) return tx.Exec(query,num, goodsId,num) } func (m *defaultStockModel) AddStock(tx *sql.Tx,goodsId , num int64) error { query := fmt.Sprintf("update %s set `num` = `num` + ? where `goods_id` = ?", m.table) _, err :=tx.Exec(query, num, goodsId) return err } 七、使用go-zero的github.com/Mikaelemmmm/dtmbarrier-go-zero

项目地址

github.com/zeromicro/go-zero

欢迎使用 go-zerostar 支持我们!

微信交流群

关注『微服务实践』公众号并点击 交流群 获取社区群二维码。