如何用C#实现SAGA分布式事务

目录
  • 背景
  • 成功的 SAGA
  • 异常的 SAGA
  • 子事务屏障
  • 写在最后

背景

银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决。

市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步。

下面就基于这个框架来实践一下银行转账的例子。

前置工作

dotnet add package Dtmcli --version 0.3.0

成功的 SAGA

先来看一下一个成功完成的 SAGA 时序图。

上图的微服务1,对应我们示例的 OutApi,也就是转钱出去的那个服务。

微服务2,对应我们示例的 InApi,也就是转钱进来的那个服务。

下面是两个服务的正向操作和补偿操作的处理。

OutApi

app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) =>
{
    // 进行 数据库操作
    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    // 进行 数据库操作
    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

InApi

app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

注:示例为了简单,没有进行实际的数据库操作。

到此各个子事务的处理已经 OK 了,然后是开启 SAGA 事务,进行分支调用

var userOutReq = new TransRequest() { UserId = "1", Amount = -30 };
var userInReq = new TransRequest() { UserId = "2", Amount = 30 };

var ct = new CancellationToken();
var gid = await dtmClient.GenGid(ct);
var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
    ;

var flag = await saga.Submit(ct);

Console.WriteLine($"case1, {gid} saga 提交结果 = {flag}");

到这里,一个完整的 SAGA 分布式事务就编写完成了。

搭建好 dtm 的环境后,运行上面的例子,会看到下面的输出。

当然,上面的情况太理想了,转出转入都是一次性就成功了。

但是实际上我们会遇到许许多多的问题,最常见的应该就是网络故障了。

下面来看一个异常的 SAGA 示例

异常的 SAGA

做一个假设,用户1的转出是正常的,但是用户2在转入的时候出现了问题。

由于事务已经提交给 dtm 了,按照 SAGA 事务的协议,dtm 会重试未完成的操作。

这个时候用户2 这边会出现什么样的情况呢?

转入其实成功了,但是 dtm 收到错误 (网络故障等)转入没有成功,直接告诉 dtm 失败了 (应用异常等)

无论是那一种,dtm 都会进行重试操作。这个时候会发生什么呢?我们继续往下看。

先看一下事务失败交互的时序图

再通过调整上面成功的例子,来比较直观的看看出现的情况。

在 InApi 加多一个转入失败的处理接口

app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作--失败,gid={gid}, branch_id={branch_id}, op={op}");

    //return Results.BadRequest();
    return Results.Ok(TransResponse.BuildFailureResponse());
});

失败的返回有两种,一种是状态码大于 400,一种是状态码是 200 并且响应体包含 FAILURE,上面的例子是第二种

调整一下调用方,把转入正向操作替换成上面这个返回错误的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);

运行结果如下:

在这个例子中,只考虑补偿/重试成功的情况下。

用户1 转出的 30 块钱最终是回到了他的帐号上,他没有出现损失。

用户2 就有点苦逼了,转入没有成功,返回了失败,还触发了转入的补偿机制,结果就是把用户2 还没进帐的 30 块钱给多扣了,这个就是上面的情况2,常见的空补偿问题。

这个时候就要在进行转入补偿的时候做一系列的判断,转入有没有成功,转出有没有失败等等,把业务变的十分复杂。

如果出现了上述的情况1,会发生什么呢?

用户2 第一次已经成功转入 30 块钱,返回的也是成功,但是网络出了点问题,导致 dtm 认为失败了,它就会进行重试,相当于用户2 还会收到第二个转入 30 块钱的请求!也就是说这次转帐,用户2 会进账 60 块钱,翻倍了,也就是说这个请求不是幂等。

同样的,要处理这个问题,在进行转入的正向操作中也要进行一系列的判断,同样会把复杂度上升一个级别。

前面有提到 dtm 提供了子事务屏障的功能,保证了幂等、空补偿等常见问题。

再来看看这个子事务屏障的功能有没有帮我们简化上面异常处理。

子事务屏障

子事务屏障,需要根据 trans_type,gid,branch_id 和 op 四个内容进行创建。

这4个内容 dtm 在回调时会放在 querysting 上面。

客户端里面提供了 IBranchBarrierFactory 来供我们使用。

空补偿

针对上面的异常情况(用户2 凭空消失 30 块钱),对转入的补偿进行子事务屏障的改造。

app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        // 转入失败的情况下,不应该输出下面这个
        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");
        // tx 参数是事务,可和本地事务一起提交回滚
        await Task.CompletedTask;
    });

    Console.WriteLine($"子事务屏障-补偿操作,gid={gid}, branch_id={branch_id}, op={op}");
    return Results.Ok(TransResponse.BuildSucceedResponse());
});

Call 方法就是关键所在了,需要传入一个 DbConnection 和真正的业务操作,这里的业务操作就是在控制台输出补偿操作的信息。

同样的,我们再调整一下调用方,把转入补偿操作替换成上面带子事务屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再来运行这个例子。

会发现转入的补偿操作并没执行,控制台没有输出补偿信息,而是输出了

Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False

这个就表明了,这个请求是个空补偿,是不应该执行业务方法的,既空操作。

再来看一下,转入成功的,但是 dtm 收到了失败的信号,不断重试造成重复请求的情况。

幂等

针对用户2 转入两次 30 块钱的异常情况,对转入的正向操作进行子事务屏障的改造。

app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】请求来了!!! gid={gid}, branch_id={branch_id}, op={op}");

    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        var c = Interlocked.Increment(ref _errCount);

        // 模拟一个超时执行
        if (c > 0 && c < 2) await Task.Delay(10000);

        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

这里通过一个超时执行来让 dtm 进行转入正向操作的重试。

同样的,我们再调整一下调用方,把转入的正向操作也替换成上面带子事务屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再来运行这个例子。

可以看到转入的正向操作确实是触发了多次,第一次实际上是成功,只是响应比较慢,导致 dtm 认为是失败了,触发了第二次请求,但是第二次请求并没有执行业务操作,而是输出了

Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True

这个就表明了,这个请求是个重复请求,是不应该执行业务方法的,保证了幂等。

到这里,可以看出,子事务屏障确实解决了幂等和空补偿的问题,大大降低了业务判断的复杂度和出错的可能性。

写在最后

在这篇文章里,也通过几个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。希望对研究分布式事务的您有所帮助。

本文示例代码: DtmSagaSample

到此这篇关于如何用C#实现SAGA分布式事务的文章就介绍到这了,更多相关C#实现SAGA内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 利用C#实现分布式数据库查询

    随着传统的数据库.计算机网络和数字通信技术的飞速发展,以数据分布存储和分布处理为主要特征的分布式数据库系统的研究和开发越来越受到人们的关注.但由于其开发较为复杂,在一定程度上制约了它的发展.基于此,本文提出了在.Net环境下使用一种新的开发语言C#结合ADO.Net数据访问模型来开发分布式数据库系统,大大简化了开发过程. 1 分布式数据库系统 就其本质而言,分布式数据库系统的数据在逻辑上是统一的,而在物理上却是分散的.与集中式数据库相比它有如下主要优点: · 解决组织机构分散而数据需要相互联系的

  • C#分布式事务的超时处理实例分析

    本文实例讲述了C#分布式事务的超时处理的方法.分享给大家供大家参考.具体分析如下: 事务是个很精妙的存在,我们在数据层.服务层.业务逻辑层等多处地方都会使用到. 在这里我只说下TransactionScope这个微软推荐使用的隐式事务.它是从Framework 2.0开始引入的一个事务管理类,在使用隐式事务时,事务完成前 程序应调用TransactionScope的Complete()方法,将事务提交,然后利用Dispose()释放事务对象.若执行期间出现错误,事务将自动回滚. 比如: usin

  • 如何用C#实现SAGA分布式事务

    目录 背景 成功的 SAGA 异常的 SAGA 子事务屏障 写在最后 背景 银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决. 市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步. 下面就基于这个框架来实践一下银行转账的例子. 前置工作 dotnet add packa

  • 带你用Python实现Saga 分布式事务的方法

    目录 分布式事务 SAGA SAGA实践 处理网络异常 处理回滚 小结 银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决. 分布式事务 分布式事务在分布式环境下,为了满足可用性.性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论: 基本业务可用性( Basic Availability ) 柔性状态( Soft state ) 最终一致性( Eve

  • 讲解如何利用 Python完成 Saga 分布式事务

    目录 1.分布式事务 2.SAGA 3.SAGA 实践 4.处理网络异常 5.处理回滚 6.小结 银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决. 1.分布式事务 分布式事务在分布式环境下,为了满足可用性.性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论: 基本业务可用性( Basic Availability ) 柔性状态( Soft sta

  • 关于MySQL与Golan分布式事务经典的七种解决方案

    目录 1.基础理论 1.1 事务 1.2 分布式事务 2.分布式事务的解决方案 2.1 两阶段提交/XA 2.2 SAGA 2.3 TCC 2.4 本地消息表 2.5 事务消息 2.6 最大努力通知 2.7 AT事务模式 3.异常处理 3.1 异常情况 3.2 子事务屏障 3.3 子事务屏障原理 3.4 子事务屏障小结 4.分布式事务实践 4.1 一个SAGA事务 4.2 处理网络异常 4.3 处理回滚 5.总结 前言: 随着业务的快速发展.业务复杂度越来越高,几乎每个公司的系统都会从单体走向分

  • 详解SpringCloud-Alibaba-Seata分布式事务

    前言 Seata 是一款阿里巴巴开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务. Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务.Seata 将为用户提供了 AT.TCC.SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案. 术语 TC (Transaction Coordinator) - 事务协调者 维护全局和分支事务的状态,驱动全局事务提交或回滚. TM (Transaction Manager) -

  • 用python完成一个分布式事务TCC

    前言: 什么是分布式事务?银行跨行转账业务是一个典型分布式事务场景,假设A需要跨行转账给B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的ACID,只能够通过分布式事务来解决. 分布式事务就是指事务的发起者.资源及资源管理器和事务协调者分别位于分布式系统的不同节点之上.在上述转账的业务中,用户A-100操作和用户B+100操作不是位于同一个节点上.本质上来说,分布式事务就是为了保证在分布式场景下,数据操作的正确执行. 什么是TCC分布式事务,TCC是Try.Confirm.Ca

  • 一文搞明白Java Spring Boot分布式事务解决方案

    目录 前言 1. 什么是反向补偿 2. 基本概念梳理 3. 什么是两阶段提交 4. AT 模式 5. TCC 模式 6. XA 模式 7. Saga 模式 前言 分布式事务,咱们前边也聊过很多次了,网上其实也有不少文章在介绍分布式事务,不过里边都会涉及到不少专业名词,看的大家云里雾里,所以还是有一些小伙伴在微信上问我. 那么今天,我就再来一篇文章,和大家捋一捋这个话题.以下的内容主要围绕阿里的 seata 来和大家解释. 1. 什么是反向补偿 首先,来和大家解释一个名词,大家在看分布式事务相关资

  • Java分布式事务管理框架之Seata

    目录 Seata介绍 三大组件 实现原理 四种事务模式 搭建seata服务端 单机版安装 集群安装 Seata介绍 Seata:Simple Extensible Autonomous Transaction Architecture,简易可扩展的自治式分布式事务管理框架,其前身是fescar.是一种简单分布式事务的解决方案.Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务.Seata 将为用户提供了 AT.TCC.SAGA 和 XA 事务模式,为用户打造一

  • Java中JDBC事务与JTA分布式事务总结与区别

    Java事务的类型有三种:JDBC事务.JTA(Java Transaction API)事务.容器事务.常见的容器事务如Spring事务,容器事务主要是J2EE应用服务器提供的,容器事务大多是基于JTA完成,这是一个基于JNDI的,相当复杂的API实现.所以本文暂不讨论容器事务.本文主要介绍J2EE开发中两个比较基本的事务:JDBC事务和JTA事务. JDBC事务 JDBC的一切行为包括事务是基于一个Connection的,在JDBC中是通过Connection对象进行事务管理.在JDBC中,

  • EntityFramework 6.x学习之多个上下文迁移实现分布式事务详解

    前言 自从项目上了.NET Core平台用上了EntityFramework Core就再没碰过EntityFramework 6.x版本,目前而言EntityFramework 6.x是用的最多,无论是找工作而言还是提升自身技术而言皆自身收益,同时呢,大多数时间除了工作之外,还留有一小部分时间在写EntityFramework 6.x和EntityFramework Core的书籍,所以将EntityFramework 6.x相当于是从零学起,EntityFramework 6.x又添加了许多

随机推荐