讲解如何利用 Python完成 Saga 分布式事务

目录
  • 1、分布式事务
  • 2、SAGA
  • 3、SAGA 实践
  • 4、处理网络异常
  • 5、处理回滚
  • 6、小结

银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决。

1、分布式事务

分布式事务在分布式环境下,为了满足可用性、性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论:

  • 基本业务可用性( Basic Availability )
  • 柔性状态( Soft state )
  • 最终一致性( Eventual consistency )

另一方面,分布式事务也部分遵循 ACID 规范:

  • 原子性:严格遵循
  • 一致性:事务完成后的一致性严格遵循;事务中的一致性可适当放宽
  • 隔离性:并行事务间不可影响;事务中间结果可见性允许安全放宽
  • 持久性:严格遵循

2、SAGA

Saga 是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

目前可用于 SAGA 的开源框架,主要为 Java 语言,其中以 seata 为代表。我们的例子采用 go 语言,使用的分布式事务框架为https://github.com/yedf/dtm,它对分布式事务的支持非常优雅。下面来详细讲解 SAGA 的组成:

DTM 事务框架里,有 3 个角色,与经典的 XA 分布式事务一样:

  • AP/应用程序,发起全局事务,定义全局事务包含哪些事务分支
  • RM/资源管理器,负责分支事务各项资源的管理
  • TM/事务管理器,负责协调全局事务的正确执行,包括 SAGA 正向 /逆向操作的执行

下面看一个成功完成的 SAGA 时序图,就很容易理解 SAGA 分布式事务:

3、SAGA 实践

对于我们要进行的银行转账的例子,我们将在正向操作中,进行转入转出,在补偿操作中,做相反的调整。

首先我们创建账户余额表:

CREATE TABLE dtm_busi.`user_account` (
  `id` int(11) AUTO_INCREMENT PRIMARY KEY,
  `user_id` int(11) not NULL UNIQUE ,
  `balance` decimal(10,2) NOT NULL DEFAULT '0.00',
  `create_time` datetime DEFAULT now(),
  `update_time` datetime DEFAULT now()
);

我们先编写核心业务代码,调整用户的账户余额

def saga_adjust_balance(cursor, uid, amount):
  affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount))
  if affected == 0:
    raise Exception("update error, balance not enough")

下面我们来编写具体的正向操作 /补偿操作的处理函数

@app.post("/api/TransOutSaga")
def trans_out_saga():
  saga_adjust_balance(c, out_uid, -30)
  return {"dtm_result": "SUCCESS"} 

@app.post("/api/TransOutCompensate")
def trans_out_compensate():
  saga_adjust_balance(c, out_uid, 30)
  return {"dtm_result": "SUCCESS"} 

@app.post("/api/TransInSaga")
def trans_in_saga():
  saga_adjust_balance(c, in_uid, 30)
  return {"dtm_result": "SUCCESS"} 

@app.post("/api/TransInCompensate")
def trans_in_compensate():
  saga_adjust_balance(c, in_uid, -30)
  return {"dtm_result": "SUCCESS"}

到此各个子事务的处理函数已经 OK 了,然后是开启 SAGA 事务,进行分支调用

# 这是 dtm 服务地址
dtm = "http://localhost:8080/api/dtmsvr"
# 这是业务微服务地址
svc = "http://localhost:5000/api" 

    req = {"amount": 30}
    s = saga.Saga(dtm, utils.gen_gid(dtm))
    s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate")
    s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate")
    s.submit()

至此,一个完整的 SAGA 分布式事务编写完成。

如果您想要完整运行一个成功的示例,那么参考这个例子yedf/dtmcli-py-sample,将它运行起来非常简单

# 部署启动 dtm
# 需要 docker 版本 18 以上
git clone https://github.com/yedf/dtm
cd dtm
docker-compose up 

# 另起一个命令行
git clone https://github.com/yedf/dtmcli-py-sample
cd dtmcli-py-sample
pip3 install flask dtmcli requests
flask run 

# 另起一个命令行
curl localhost:5000/api/fireSaga

4、处理网络异常

假设提交给 dtm 的事务中,调用转入操作时,出现短暂的故障怎么办?按照 SAGA 事务的协议,dtm 会重试未完成的操作,这时我们要如何处理?故障有可能是转入操作完成后出网络故障,也有可能是转入操作完成中出现机器宕机。如何处理才能够保障账户余额的调整是正确无问题的?

这类网络异常的妥当处理,是分布式事务中的大难题,异常情况包括三类:重复请求、空补偿、悬挂,都需要正确处理

DTM 提供了子事务屏障功能,保证上述异常情况下的业务逻辑,只会有一次正确顺序下的成功提交。(子事务屏障详情参考分布式事务最经典的七种解决方案的子事务屏障环节)

我们把处理函数调整为:

@app.post("/api/TransOutSaga")
def trans_out_saga():
  with barrier.AutoCursor(conn_new()) as cursor:
    def busi_callback(c):
      saga_adjust_balance(c, out_uid, -30)
    barrier_from_req(request).call(cursor, busi_callback)
  return {"dtm_result": "SUCCESS"}

这里的 barrier_from_req(request).call(cursor, busi_callback)调用会使用子事务屏障技术,保证 busi_callback 回调函数仅被提交一次

您可以尝试多次调用这个 TransIn 服务,仅有一次余额调整。

5、处理回滚

假如银行将金额准备转入用户 2 时,发现用户 2 的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败

@app.post("/api/TransInSaga")
def trans_in_saga():
  return {"dtm_result": "FAILURE"}

我们给出事务失败交互的时序图:

这里有一点,TransIn 的正向操作什么都没有做,就返回了失败,此时调用 TransIn 的补偿操作,会不会导致反向调整出错了呢?

不用担心,前面的子事务屏障技术,能够保证 TransIn 的错误如果发生在提交之前,则补偿为空操作;TransIn 的错误如果发生在提交之后,则补偿操作会将数据提交一次。

我们可以将返回错误的 TransIn 改成:

@app.post("/api/TransInSaga")
def trans_in_saga():
  with barrier.AutoCursor(conn_new()) as cursor:
    def busi_callback(c):
      saga_adjust_balance(c, in_uid, 30)
    barrier_from_req(request).call(cursor, busi_callback)
  return {"dtm_result": "FAILURE"}

最后的结果余额依旧会是对的,原理可以参考:分布式事务最经典的七种解决方案的子事务屏障环节

6、小结

在这篇文章里,我们介绍了 SAGA 的理论知识,也通过一个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。相信读者通过这边文章,对 SAGA 已经有了深入的理解。

文中使用的 dtm 是新开源的 Golang 分布式事务管理框架,功能强大,支持 TCC 、SAGA 、XA 、事务消息等事务模式,支持 Go 、python 、PHP 、node 、csharp 等语言的。同时提供了非常简单易用的接口。

以上就是利用 Python 轻松完成一个 Saga 分布式事务的详细内容,更多关于Python完成一个 Saga 分布式事务的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解分布式系统中如何用python实现Paxos

    一致性算法背景 1.Paxos一致性算法解决的问题:分布式系统中数据不能存在单个节点(主机)上,否则可能出现单点故障:多个节点(主机)需要保证具有相同的数据. 2.什么是一致性:一致性就是数据保持一致,在分布式系统中,可以理解为多个节点中数据的值是一致的. 3.一致性模型分类:一般分为强一致性和弱一致性,强一致性保证系统改变提交以后立即改变集群的状态.常见模型包括:Paxos,Raft(muti-paxos),ZAB(muti-paxos): 弱一致性也叫最终一致性,系统不保证改变提交以后立即改

  • Python分布式进程中你会遇到的问题解析

    小惊大怪 你是不是在用Python3或者在windows系统上编程?最重要的是你对进程和线程不是很清楚?那么恭喜你,在python分布式进程中,会有坑等着你去挖...(hahahaha,此处允许我吓唬一下你)开玩笑的啦,不过,如果你知道序列中不支持匿名函数,那这个坑就和你say byebye了.好了话不多数,直接进入正题. 分布式进程 正如大家所知道的Process比Thread更稳定,而且Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上.Python的m

  • Python搭建Spark分布式集群环境

    前言 Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象.Spark 最大的特点就是快,可比 Hadoop MapReduce 的处理速度快 100 倍.本文没有使用一台电脑上构建多个虚拟机的方法来模拟集群,而是使用三台电脑来搭建一个小型分布式集群环境安装. 本教程采用Spark2.0以上版本(比如Spark2.0.2.Spark2.1.0等)搭建集群,同样适用于搭建Spark1.6.2集群. 安装Hadoop并搭建好Hadoop集群环境 Spark分布式集群的安装

  • python django框架中使用FastDFS分布式文件系统的安装方法

    一.安装FastDFS 1-1:执行docker命令安装 # 安装tracker docker run -dti --network=host --name tracker -v /var/fdfs/tracker:/var/fdfs youkou1/fastdfs tracker # 安装storage docker run -dti --network=host --name storage -e TRACKER_SERVER=IP地址:22122 -v /var/fdfs/storage:

  • 带你用Python实现Saga 分布式事务的方法

    目录 分布式事务 SAGA SAGA实践 处理网络异常 处理回滚 小结 银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决. 分布式事务 分布式事务在分布式环境下,为了满足可用性.性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论: 基本业务可用性( Basic Availability ) 柔性状态( Soft state ) 最终一致性( Eve

  • Python多进程入门、分布式进程数据共享实例详解

    本文实例讲述了Python多进程入门.分布式进程数据共享.分享给大家供大家参考,具体如下: python多进程入门 https://docs.python.org/3/library/multiprocessing.html 1.先来个简单的 # coding: utf-8 from multiprocessing import Process # 定义函数 def addUser(): print("addUser") if __name__ == "__main__&qu

  • 讲解如何利用 Python完成 Saga 分布式事务

    目录 1.分布式事务 2.SAGA 3.SAGA 实践 4.处理网络异常 5.处理回滚 6.小结 银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决. 1.分布式事务 分布式事务在分布式环境下,为了满足可用性.性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论: 基本业务可用性( Basic Availability ) 柔性状态( Soft sta

  • 如何用C#实现SAGA分布式事务

    目录 背景 成功的 SAGA 异常的 SAGA 子事务屏障 写在最后 背景 银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决. 市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步. 下面就基于这个框架来实践一下银行转账的例子. 前置工作 dotnet add packa

  • 用python完成一个分布式事务TCC

    前言: 什么是分布式事务?银行跨行转账业务是一个典型分布式事务场景,假设A需要跨行转账给B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的ACID,只能够通过分布式事务来解决. 分布式事务就是指事务的发起者.资源及资源管理器和事务协调者分别位于分布式系统的不同节点之上.在上述转账的业务中,用户A-100操作和用户B+100操作不是位于同一个节点上.本质上来说,分布式事务就是为了保证在分布式场景下,数据操作的正确执行. 什么是TCC分布式事务,TCC是Try.Confirm.Ca

  • 关于MySQL与Golan分布式事务经典的七种解决方案

    目录 1.基础理论 1.1 事务 1.2 分布式事务 2.分布式事务的解决方案 2.1 两阶段提交/XA 2.2 SAGA 2.3 TCC 2.4 本地消息表 2.5 事务消息 2.6 最大努力通知 2.7 AT事务模式 3.异常处理 3.1 异常情况 3.2 子事务屏障 3.3 子事务屏障原理 3.4 子事务屏障小结 4.分布式事务实践 4.1 一个SAGA事务 4.2 处理网络异常 4.3 处理回滚 5.总结 前言: 随着业务的快速发展.业务复杂度越来越高,几乎每个公司的系统都会从单体走向分

  • 利用python将pdf输出为txt的实例讲解

    一个礼拜前一个同学问我这个事情,由于之前在参加华为的比赛,所以赛后看了一下,据说需要用到pdfminer这个包.于是安装了一下,安装过程很简单: sudo pip install pdfminer; 中间也没有任何的报错.至于如何调用,本人也没有很好的研究过pdfminer这个库,于是开始了百度-- 官方文档:http://www.unixuser.org/~euske/python/pdfminer/index.html 完全使用python编写. (适用于2.4或更新版本) 解析,分析,并转

  • 利用Python读取txt文档的方法讲解

    在G:/PythonPractise文件夹下新建一个名为record.txt的文本文档,写入如下图所示四行内容并保存. 打开python3的idle,开始写代码. 方法一代码和运行结果如下: 如上面运行结果所示,上面的结果是省略end=的写法,等价于end="\n"(回车); 下面的结果是end=""(空字符串)的写法,等价于end="\r"(换行) 方法二代码和运行结果如下: 方法三代码结果如下: 比较三种方法,方法一先将该路径下的文件返回成一

  • 利用Python代码实现数据可视化的5种方法详解

    前言 数据科学家并不逊色于艺术家.他们用数据可视化的方式绘画,试图展现数据内隐藏的模式或表达对数据的见解.更有趣的是,一旦接触到任何可视化的内容.数据时,人类会有更强烈的知觉.认知和交流. 数据可视化是数据科学家工作中的重要组成部分.在项目的早期阶段,你通常会进行探索性数据分析(Exploratory Data Analysis,EDA)以获取对数据的一些理解.创建可视化方法确实有助于使事情变得更加清晰易懂,特别是对于大型.高维数据集.在项目结束时,以清晰.简洁和引人注目的方式展现最终结果是非常

  • Java分布式事务管理框架之Seata

    目录 Seata介绍 三大组件 实现原理 四种事务模式 搭建seata服务端 单机版安装 集群安装 Seata介绍 Seata:Simple Extensible Autonomous Transaction Architecture,简易可扩展的自治式分布式事务管理框架,其前身是fescar.是一种简单分布式事务的解决方案.Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务.Seata 将为用户提供了 AT.TCC.SAGA 和 XA 事务模式,为用户打造一

  • EntityFramework 6.x学习之多个上下文迁移实现分布式事务详解

    前言 自从项目上了.NET Core平台用上了EntityFramework Core就再没碰过EntityFramework 6.x版本,目前而言EntityFramework 6.x是用的最多,无论是找工作而言还是提升自身技术而言皆自身收益,同时呢,大多数时间除了工作之外,还留有一小部分时间在写EntityFramework 6.x和EntityFramework Core的书籍,所以将EntityFramework 6.x相当于是从零学起,EntityFramework 6.x又添加了许多

随机推荐