PHP队列场景以及实现代码实例详解

为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上),但是一个业务场景可能会同时处理两个表的操作。在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可能存在部分节点提交失败的情况,即事务的ACID特性需要在各个不同的数据库实例中保证。比如更新db1库的A表时,必须同步更新db2库的B表,两个更新形成一个事务,要么都成功,要么都失败。

那么我们如何利用mysql实现分布式数据库的事务呢?

mysql是从5.0开始支持分布式事务

这里先声明两个概念:

资源管理器(resource manager):用来管理系统资源,是通向事务资源的途径。数据库就是一种资源管理器。资源管理还应该具有管理事务提交或回滚的能力。
事务管理器(transaction manager):事务管理器是分布式事务的核心管理者。事务管理器与每个资源管理器(resource
manager)进行通信,协调并完成事务的处理。事务的各个分支由唯一命名进行标识。
mysql在执行分布式事务(外部XA)的时候,mysql服务器相当于xa事务资源管理器,与mysql链接的客户端相当于事务管理器。

分布式事务原理:分段式提交
分布式事务通常采用2PC协议,全称Two Phase Commitment Protocol。该协议主要为了解决在分布式数据库场景下,所有节点间数据一致性的问题。分布式事务通过2PC协议将提交分成两个阶段:

prepare;commit/rollback

阶段一为准备(prepare)阶段。即所有的参与者准备执行事务并锁住需要的资源。参与者ready时,向transaction manager报告已准备就绪。
阶段二为提交阶段(commit)。当transaction manager确认所有参与者都ready后,向所有参与者发送commit命令。

事务协调者transaction manager
因为XA 事务是基于两阶段提交协议的,所以需要有一个事务协调者(transaction manager)来保证所有的事务参与者都完成了准备工作(第一阶段)。如果事务协调者(transaction manager)收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了(第二阶段)。MySQL 在这个XA事务中扮演的是参与者的角色,而不是事务协调者(transaction manager)。

Mysql的XA事务分为外部XA和内部XA

外部XA用于跨多MySQL实例的分布式事务,需要应用层作为协调者,通俗的说就是比如我们在PHP中写代码,那么PHP书写的逻辑就是协调者。应用层负责决定提交还是回滚,崩溃时的悬挂事务。MySQL数据库外部XA可以用在分布式数据库代理层,实现对MySQL数据库的分布式事务支持,例如开源的代理工具:网易的DDB,淘宝的TDDL等等。

内部XA事务用于同一实例下跨多引擎事务,由Binlog作为协调者,比如在一个存储引擎提交时,需要将提交信息写入二进制日志,这就是一个分布式内部XA事务,只不过二进制日志的参与者是MySQL本身。Binlog作为内部XA的协调者,在binlog中出现的内部xid,在crash recover时,由binlog负责提交。(这是因为,binlog不进行prepare,只进行commit,因此在binlog中出现的内部xid,一定能够保证其在底层各存储引擎中已经完成prepare)。

mysql xa事务的语法

1、首先要确保mysql开启XA事务支持

SHOW VARIABLES LIKE '%xa%'

如果innodb_support_xa的值是ON就说明mysql已经开启对XA事务的支持了。 如果不是就执行:

SET innodb_support_xa = ON

主要有:

XA START 'any_unique_id'; // 'any_unique_id' 是用户给的,全局唯一在一台mysql中开启一个XA事务
XA END 'any_unique_id '; //标识XA事务的操作结束
XA PREPARE 'any_unique_id'; //告知mysql 准备提交这个xa事务
XA COMMIT 'any_unique_id'; //告知mysql提交这个xa事务
XA ROLLBACK 'any_unique_id'; //告知mysql回滚这个xa事务
XA RECOVER;//查看本机mysql目前有哪些xa事务处于prepare状态

XA事务恢复
如果执行分布式事务的mysql crash了,mysql 按照如下逻辑进行恢复:
a. 如果这个xa事务commit了,那么什么也不用做
b. 如果这个xa事务还没有prepare,那么直接回滚它
c. 如果这个xa事务prepare了,还没commit, 那么把它恢复到prepare的状态,由用户去决定commit或rollback
当mysql crash后重新启动之后,执行“XA RECOVER;”查看当前处于prepare状态的xa事务,然后commit或rollback它们。

使用限制
a. XA事务和本地事务以及锁表操作是互斥的
开启了xa事务就无法使用本地事务和锁表操作

mysql> xa start 't1xa';
Query OK, 0 rows affected (0.04 sec)
mysql> begin;
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state
mysql> lock table t1 read;
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state

开启了本地事务就无法使用xa事务

mysql> begin;
Query OK, 0 rows affected (0.00 sec)
mysql> xa start 'rrrr';
ERROR 1400 (XAE09): XAER_OUTSIDE: Some work is done outside global transaction

b. xa start 之后必须xa end, 否则不能执行xa commitxa rollback

所以如果在执行xa事务过程中有语句出错了,你也需要先xa end一下,然后才能xarollback

注意事项

a. mysql只是提供了xa事务的接口,分布式事务中的mysql实例之间是互相独立的不感知的。 所以用户必须自己实现分布式事务的调度器
b. xa事务有一些使用上的bug, 参考http://www.mysqlops.com/2012/02/24/mysql-xa-optimize.html
主要是
“MySQL数据库的主备数据库的同步,通过Binlog的复制完成。而Binlog是MySQL数据库内部XA事务的协调者,并且MySQL数据库为binlog做了优化——binlog不写prepare日志,只写commit日志。
所有的参与节点prepare完成,在进行xa commit前crash。crash recover如果选择commit此事务。由于binlog在prepare阶段未写,因此主库中看来,此分布式事务最终提交了,但是此事务的操作并未 写到binlog中,因此也就未能成功复制到备库,从而导致主备库数据不一致的情况出现。
而crash recover如果选rollback, 那么就会出现全局不一致(该分布式事务对应的节点,部分已经提交,无法回滚,而部分节点回滚。最终导致同一分布式事务,在各参与节点,最终状态不一致)”

参考的那篇blog中给出的办法是修改mysql代码,这个无法在DBScale中使用。 所以可选的替代方案是不使用
主从复制进行备份,而是直接使用xa事务实现同步写来作为备份。

php+mysql实现分布式事务案例

保证数据表是innodb的

//db_finance库下
CREATE TABLE `t_user_account` (
 `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
 `username` varchar(255) NOT NULL DEFAULT '' COMMENT '用户名',
 `money` int(11) NOT NULL DEFAULT '0' COMMENT '账户金额',
 PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
//db_order库下
CREATE TABLE `t_user_orders` (
 `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
 `username` varchar(255) NOT NULL DEFAULT '',
 `money` int(11) NOT NULL DEFAULT '0' COMMENT '订单扣款金额',
 PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8;

php代码

$username = '公众号PHP开源社区';
$order_money = 100;

$addOrder_success = addOrder($username,$order_money);
$upAccount_success = updateAccount($username,$order_money);

if($addOrder_success['state'] =="yes" && $upAccount_success['state']=="yes"){
  commitdb($addOrder_success['xa']);
  commitdb1($upAccount_success['xa']);
}else{
  rollbackdb($addOrder_success['xa']);
  rollbackdb1($upAccount_success['xa']);
}
die;
function addOrder ($username, $order_money){

  $xa = uniqid("");

  $sql_xa = "XA START '$xa'";
  $db = Yii::app()->dborder_readonly;
  $db->createCommand($sql_xa)->execute();

  $insert_sql = "INSERT INTO t_user_orders (`username`,`money`) VALUES ($username,$order_money)";
  $id = $db->createCommand($insert_sql)->execute();

  $db->createCommand("XA END '$xa'")->execute();
  if ($id) {

    $db->createCommand("XA PREPARE '$xa'")->execute();
    return ['state' => 'yes', 'xa' => $xa];
  }else {
    return ['state' => 'no', 'xa' => $xa];
  }

}

function updateAccount($username, $order_money){
  $xa = uniqid("");
  $sql_xa = "XA START '$xa'";
  $db = Yii::app()->db_finance;
  $db->createCommand($sql_xa)->execute();

  $sql = "update t_user_account set money=money-".$order_money." where username='$username'";

  $id = $db->createCommand($sql)->execute();

  $db->createCommand("XA END '$xa'")->execute();
  if ($id) {
    $db->createCommand("XA PREPARE '$xa'")->execute();
    return ['state' => 'yes', 'xa' => $xa];
  }else {
    return ['state' => 'no', 'xa' => $xa];
  }

}

//提交事务!
function commitdb($xa){

  $db = Yii::app()->dborder_readonly;
  return $db->createCommand("XA COMMIT '$xa'")->execute();
}

//回滚事务
function rollbackdb($xa){

  $db = Yii::app()->dborder_readonly;
  return $db->createCommand("XA COMMIT '$xa'")->execute();
}

//提交事务!
function commitdb1($xa){

  $db = Yii::app()->db_finance;
  return $db->createCommand("XA COMMIT '$xa'")->execute();

}
//回滚事务
function rollbackdb1($xa){
  $db = Yii::app()->db_finance;
  return $db->createCommand("XA ROLLBACK '$xa'")->execute();

到此这篇关于PHP队列场景以及实现代码实例详解的文章就介绍到这了,更多相关PHP队列场景以及实现内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解PHP队列的实现

    队列是一种特殊的线性表,它只允许在表的前端,可以称之为front,进行删除操作:而在表的后端,可以称之为rear进行插入操作.队列和堆栈一样,是一种操作受限制的线性表,和堆栈不同之处在于:队列是遵循"先进先出"原则,而堆栈遵循的是"先进后出"原则.队列进行插入操作的端称为队尾,进行删除操作的称为队头,只允许在队尾进行插入操作,在队头进行删除操作. 队列的数据元素又称为队列元素,在队尾中插入一个元素称为入队,在队头删除一个元素称为出队.具体实现参考代码: <?p

  • php基于Redis消息队列实现的消息推送的方法

    基本知识点 重点用到了以下命令实现我们的消息推送 brpop 阻塞模式 从队列右边获取值之后删除 brpoplpush 从队列A的右边取值之后删除,从左侧放置到队列B中 逻辑分析 在普通的任务脚本中写入push_queue队列要发送消息的目标,并为目标设置一个要推送的内容,永不过期 RedisPushQueue中brpoplpush处理,处理后的值放到temp_queue,主要防止程序崩溃造成推送失败 RedisAutoDeleteTempqueueItems处理temp_queue,这里用到了

  • PHP Beanstalkd消息队列的安装与使用方法实例详解

    本文实例讲述了PHP Beanstalkd消息队列的安装与使用方法.分享给大家供大家参考,具体如下: 一.Beanstalkd是什么? Beanstalkd是一个高性能,轻量级的分布式内存队列 二.Beanstalkd特性 1.支持优先级(支持任务插队) 2.延迟(实现定时任务) 3.持久化(定时把内存中的数据刷到binlog日志) 4.预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理) 5.任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列

  • PHP7生产环境队列Beanstalkd用法详解

    应用场景 为什么要用呢,有什么好处?这应该放在最开头说,一件东西你只有了解它是干什么的,适合干什么,才能更好的与自己的项目相结合,用到哪里学到哪里,学了不用等于不会,我们平时就应该多考虑一些这样的问题:自己做个什么项目功能能跟 xx 技术相结合呢?这个 xx 技术放在这种业务场景下行不行呢?而不是 "学了这个 xx 技术能干嘛呢,公司现在也没有用这个的呀,学了也没用啊",带着这样心情去学习 xx 技术,肯定很痛苦. 队列大家都知道是将一些耗时的操作先不去做,先埋点,再异步去处理,这样对

  • PHP如何通过带尾指针的链表实现'队列'

    这篇文章是展示通过 PHP 语言实现一种带 尾指针 的链表,然后通过链表来实现队列,其中链表的头元素 head 是用于列队 出队 的,它的时间复杂度 O(1) ,若在 head 的基础上实现链表尾部 入队 时间度为 O(n),为了降低入队操作的时间复杂度,可以给链表维护一个带有尾指针的变量 tail ,这样每次入队的时候直接操作 tail ,出队的时候直接操作 head ,这样可以使得 入队 和 出队 时间复杂度都是 O(1). 1.output_queue_by_liked_list.php

  • PHP+RabbitMQ实现消息队列的完整代码

    前言 为什么使用RabbitMq而不是ActiveMq或者RocketMq? 首先,从业务上来讲,我并不要求消息的100%接受率,并且,我需要结合php开发,RabbitMq相较RocketMq,延迟较低(微妙级).至于ActiveMq,貌似问题较多.RabbitMq对各种语言的支持较好,所以选择RabbitMq. 先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异. php扩展地址: http://pecl.php.net/package/amq

  • PHP队列场景以及实现代码实例详解

    为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上),但是一个业务场景可能会同时处理两个表的操作.在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可能存在部分节点提交失败的情况,即事务的ACID特性需要在各个不同的数据库实例中保证.比如更新db1库的A表时,必须同步更新db2库的B表,两个更新形成一个事务,要么都成功,要么都失败. 那么我们如何利用mysql实现分布式数据库的事务呢? mysql是从5.0开始支持分布式事务 这里先声

  • Android开心消消乐代码实例详解

    突然想要在android上写一个消消乐的代码,在此之前没有系统地学过java的面向对象,也没有任何android相关知识,不过还是会一点C++.8月初开始搭建环境,在这上面花了相当多的时间,然后看了一些视频和电子书,对android有了一个大概的了解,感觉差不多了的时候就开始写了. 疯狂地查阅各种资料,反反复复了好几天后,也算是写出了个成品.原计划有很多地方还是可以继续写下去的,比如UI设计,比如动画特效,时间设计,关卡设计,以及与数据库的连接,如果可以的话还能写个联网功能,当然因为写到后期内心

  • JAVA类变量及类方法代码实例详解

    这篇文章主要介绍了JAVA类变量及类方法代码实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 类变量(static) 类变量是该类的所有对象共享的变量,任何一个该类的对象去访问它时,取到的都是相同的值,同样任何一个该类的对象去修改它时,修改的也是同一个变量. public class C { public static void main(String[] args){ Child ch1 = new Child(12,"小小"

  • JavaWeb Refresh响应头代码实例详解

    这篇文章主要介绍了JavaWeb Refresh响应头代码实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.Refresh响应头:可以理解为定时的重定向,在指定的时间后发生页面的跳转 二.代码示例: package cn.xxx.Servlet; import java.io.IOException; import javax.servlet.ServletException; import javax.servlet.http.Ht

  • php命令行模式代码实例详解

    php全集行模式,即php-cli,官方文档中称为: CLI SAPI(Server Application Programming Interface,服务端应用编程端口).听着挺复杂.其实是因为php原本为服务器端的脚本语言,所以引申出这个叫法. 与服务端模式的不同 服务端模式主要有两种工作方式: 作为web server的模式方式或作为一个cgi可执行程序. 前者,比如作为apach中的一个模块(如:php5apache2.dll); 后者作为可执行程序,如php-cig. 现在的替代者为

  • php自动加载代码实例详解

    1. 让我们为 PHP 创建枚举,提供一些代码示例 如果我们的代码需要对枚举常量和值进行更多验证,该怎么办? 根据使用情况,我通常会使用类似以下的简单内容: abstract class DaysOfWeek { const Sunday = 0; const Monday = 1; // etc. } $today = DaysOfWeek::Sunday; 这是一个扩展的示例,可以更好地服务于更广泛的案例: abstract class BasicEnum { private static

  • laravel下trait的使用代码实例详解

    前言 今天在整理laravel的练习项目时,发现自己的代码结构中有很多重复的代码.于是搜索了一下laravel框架的代码复用机制.知道了Trait的存在,于是学习使用了一下. 关于Trait的理解 Trait是PHP5.4引入的新概念,定义方式和class的定义方式类似.但是并不具备class的完整性.Trait看上去更像是一个class的一部分.它使不相关的两个class能够具有类似的行为. Trait的简单使用 新建一个Trait 本人由于数据库操作上使用了eloquent模型,在多表查询时

  • PHP的重载使用魔术方法代码实例详解

    摘录PHP官网对PHP重载的解释: PHP所提供的"重载"(overloading)是指动态地"创建"类属性和方法.我们是通过魔术方法(magic methods)来实现的. 当调用当前环境下未定义或不可见的类属性或方法时,重载方法会被调用.本节后面将使用"不可访问属性(inaccessible properties)"和"不可访问方法(inaccessible methods)"来称呼这些未定义或不可见的类属性或方法. 所有

  • python字符串,元组,列表,字典互转代码实例详解

    python字符串,元组,列表,字典互相转换直接给大家上代码实例 #-*-coding:utf-8-*- #1.字典 dict = {'name': 'Zara', 'age': 7, 'class': 'First'} #字典转为字符串,返回:<type 'str'> {'age': 7, 'name': 'Zara', 'class': 'First'} print type(str(dict)), str(dict) #字典可以转为元组,返回:('age', 'name', 'class

  • ActiveMQ消息签收机制代码实例详解

    这篇文章主要介绍了ActiveMQ消息签收机制代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 消费者客户端成功接收一条消息的标志是:这条消息被签收. 消费者客户端成功接收一条消息一般包括三个阶段: 1.消费者接收消息,也即从MessageConsumer的receive方法返回 2.消费者处理消息 3.消息被签收 其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的

随机推荐