1

使用强大的DBPack处理分布式事务(PHP使用教程) - Scott Lewis

 1 year ago
source link: https://www.cnblogs.com/DKSL/p/16433523.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

主流的分布式事务的处理方案

近些年,随着微服务的广泛使用,业务对系统的分布式事务处理能力的要求越来越高。

早期的基于XA协议的二阶段提交方案,将分布式事务的处理放在数据库驱动层,实现了对业务的无侵入,但是对数据的锁定时间很长,性能较低。

现在主流的TCC事务方案和SAGA事务方案,都是基于业务补偿机制,虽然没有全局锁,性能很高,但是一定程度上入侵了业务逻辑,增加了业务开发人员的开发时间和系统维护成本。

新兴的AT事务解决方案,例如SeataSeata-golang,通过数据源代理层的资源管理器RM记录SQL回滚日志,跟随本地事务一起提交,大幅减少了数据的锁定时间,性能好且对业务几乎没有侵入。其缺点是支持的语言比较单一,例如Seata只支持Java语言类型的微服务,Seata-golang只支持Go语言类型的微服务。

为了突破AT事务对业务编程语言的限制,现在业界正在往DB Mesh的方向发展,通过将事务中间件部署在SideCar的方式,达到任何编程语言都能使用分布式事务中间件的效果。

DBPack是一个处理分布式事务的数据库代理,其能够拦截MySQL流量,生成对应的事务回滚镜像,通过与ETCD协调完成分布式事务,性能很高,且对业务没有入侵,能够自动补偿SQL操作,支持接入任何编程语言。DBPack还支持TCC事务模式,能够自动补偿HTTP请求。目前其demo已经有Java、Go、Python和PHP,TCC的sample也已经在路上了,demo示例可以关注dbpack-samples

最新版DBPack不仅支持预处理的sql语句,还支持text类型的sql。DBPack最新版还兼容了php8的pdo_mysql扩展。Mysql 客户端在给用户发送 sql 执行结果时,如果执行没有异常,发送的第一个包为 OKPacket,该包中有一个标志位可以标识 sql 请求是否在一个事务中。如下图所示

434643-20220701095312787-1715141151.png

这个包的内容为:

07 00 00 // 前 3 个字节表示 payload 的长度为 7 个字节
01 // sequence 响应的序号,前 4 个字节一起构成了 OKPacket 的 header
00 // 标识 payload 为 OKPacket
00 // affected row
00 // last insert id
03 00 // 状态标志位
00 00 // warning 数量

dbpack 之前的版本将标志位设置为 0,java、golang、.net core、php 8.0 之前的 mysql driver 都能正确协调事务,php 8.0 的 pdo driver 会对标志位进行校验,所以 php 8.0 以上版本在使用 dbpack 协调分布式事务时,会抛出 transaction not active 异常。最新版本已经修复了这个问题。

下图是具体的DBPack事务流程图。

434643-20220701095250755-1885591527.png

其事务流程简要描述如下:

  1. 客户端向聚合层服务的DBPack代理发起HTTP请求。
  2. DBPack生成全局唯一的XID,存储到ETCD中。注意请求的地址和端口指向DBPack,并不直接指向实际API。
  3. 如果开启全局事务成功(如果失败则直接结束事务),聚合层服务就可以通过HTTP header(X-Dbpack-Xid)拿到XID了。此时,聚合服务调用服务1并传递XID。
  4. 服务1拿到XID,通过DBPack代理,注册分支事务(生成BranchID等信息,并存储到ETCD)。
  5. 服务1的分支事务注册成功后,生成本地事务的回滚镜像,随着本地事务一起commit。
  6. 服务2进行与服务1相同的步骤4和5。
  7. 聚合层服务根据服务1和服务2的结果,决议是全局事务提交还是回滚,如果是成功,则返回HTTP 200给DBPack(除200以外的状态码都会被DBPack认为是失败)。DBPack更新ETCD中的全局事务状态为全局提交中或回滚中。
  8. 服务1和服务2的DBPack,通过ETCD的watch机制,得知本地的分支事务是该提交还是回滚(如果是提交,则删除回滚日志;如果是回滚,则执行通过回滚日志回滚到事务前镜像)。
  9. 所有的分支事务提交或回滚完成后,聚合层服务的DBPack的协程会检测到事务已经完成,将从ETCD删除XID和BranchID等事务信息。

本文将以PHP语言为例,详细介绍如何使用PHP对接DBPack完成分布式事务。实际使用其他语言时,对接过程也是类似的。

使用PHP对接DBPack实现分布式事务

  • 业务数据库为mysql数据库
  • 业务数据表为innodb类型
  • 业务数据表必须有主键

Step0: 安装ETCD

ETCD_VER=v3.5.3

# choose either URL
GOOGLE_URL=https://storage.googleapis.com/etcd
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}

rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test

curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz

/tmp/etcd-download-test/etcd --version
/tmp/etcd-download-test/etcdctl version
/tmp/etcd-download-test/etcdutl version

Step1: 在业务数据库中创建undo_log表

undo_log表用于存储本地事务的回滚镜像。

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_unionkey` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Step2: 编写配置文件,对接DBPack

# 更新distributed_transaction.etcd_config.endpoints
# 更新listeners配置项,调整为实际聚合层服务的地址和端口
# 更新filters配置项,配置聚合层服务的API endpoint
vim /path/to/your/aggregation-service/config-aggregation.yaml

# 更新distributed_transaction.etcd_config.endpoints
# 更新listeners配置项,配置业务数据库信息,包括dbpack代理的端口
# 更新data_source_cluster.dsn
vim /path/to/your/business-service/config-service.yaml

Step3: 运行DBPack

git clone [email protected]:cectc/dbpack.git

cd dbpack
# build on local env
make build-local
# build on production env
make build

./dist/dbpack start --config /path/to/your/config-aggregation.yaml

./dist/dbpack start --config /path/to/your/config-service.yaml

Step4: 配置vhost,监听php项目端口

以Nginx为例,配置如下

server {
    listen 3001; # 暴露的服务端口
    index index.php index.html;
    root /var/www/code/; # 业务代码根目录

    location / {
        try_files $uri /index.php?$args;
    }

    location ~ \.php$ {
        fastcgi_split_path_info ^(.+\.php)(/.+)$;
        fastcgi_pass order-svc-app:9000; # php-fpm 端口
        fastcgi_index index.php;
        include fastcgi_params;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        fastcgi_param PATH_INFO $fastcgi_path_info;
    }
}

Step5: 编写应用程序

aggregation service example

<?php

class AggregationSvc
{

    public function CreateSo(string $xid, bool $rollback): bool
    {
        $createSoSuccess = $this->createSoRequest($xid);
        if (!$createSoSuccess) {
            return false;
        }
        $allocateInventorySuccess = $this->allocateInventoryRequest($xid);
        if (!$allocateInventorySuccess) {
            return false;
        }
        if ($rollback) {
            return false;
        }
        return true;
    }

    // private function createSoRequest(string $xid) ...
    // private function allocateInventoryRequest(string $xid) ...
}

$reqPath = strtok($_SERVER["REQUEST_URI"], '?');
$reaHeaders = getallheaders();

$xid = $reaHeaders['X-Dbpack-Xid'] ?? '';

if (empty($xid)) {
    die('xid is not provided!');
}

$aggregationSvc = new AggregationSvc();

if ($_SERVER['REQUEST_METHOD'] === 'POST') {
    switch ($reqPath) {
        case '/v1/order/create':
            if ($aggregationSvc->CreateOrder($xid, false)) {
                responseOK();
            } else {
                responseError();
            }
        case '/v1/order/create2':
            if ($aggregationSvc->CreateSo($xid, true)) {
                responseOK();
            } else {
                responseError();
            }
            break;
        default:
            die('api not found');
    }
}

function responseOK() {
    http_response_code(200);
    echo json_encode([
        'success' => true,
        'message' => 'success',
    ]);
}

function responseError() {
    http_response_code(400);
    echo json_encode([
        'success' => false,
        'message' => 'fail',
    ]);
}

order service example

<?php

class OrderDB
{
    private PDO $_connection;
    private static OrderDB $_instance;
    private string $_host = 'dbpack-order';
    private int $_port = 13308;
    private string $_username = 'dksl';
    private string $_password = '123456';
    private string $_database = 'order';

    const insertSoMaster = "INSERT /*+ XID('%s') */ INTO order.so_master (sysno, so_id, buyer_user_sysno, seller_company_code, 
		receive_division_sysno, receive_address, receive_zip, receive_contact, receive_contact_phone, stock_sysno, 
        payment_type, so_amt, status, order_date, appid, memo) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,now(),?,?)";

    const insertSoItem = "INSERT /*+ XID('%s') */ INTO order.so_item(sysno, so_sysno, product_sysno, product_name, cost_price, 
		original_price, deal_price, quantity) VALUES (?,?,?,?,?,?,?,?)";

    public static function getInstance(): OrderDB
    {
        if (empty(self::$_instance)) {
            self::$_instance = new self();
        }
        return self::$_instance;
    }

    private function __construct()
    {
        try {
            $this->_connection = new PDO(
                "mysql:host=$this->_host;port=$this->_port;dbname=$this->_database;charset=utf8",
                $this->_username,
                $this->_password,
                [
                    PDO::ATTR_PERSISTENT => true,
                    PDO::ATTR_EMULATE_PREPARES => false, // to let DBPack handle prepread sql
                ]
            );
        } catch (PDOException $e) {
            die($e->getMessage());
        }
    }

    private function __clone()
    {
    }

    public function getConnection(): PDO
    {
        return $this->_connection;
    }

    public function createSo(string $xid, array $soMasters): bool
    {
        $this->getConnection()->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        try {
            $this->getConnection()->beginTransaction();
            foreach ($soMasters as $master) {
                if (!$this->insertSo($xid, $master)) {
                    throw new PDOException("failed to insert soMaster");
                }
            }
            $this->getConnection()->commit();
        } catch (PDOException $e) {
            $this->getConnection()->rollBack();
            return false;
        }
        return true;
    }

    private function insertSo(string $xid, array $soMaster): bool
    {
        // insert into so_master, so_item ...
    }
}

$reqPath = strtok($_SERVER["REQUEST_URI"], '?');
$reqHeaders = getallheaders();

$xid = $reqHeaders['Xid'] ?? '';

if (empty($xid)) {
    die('xid is not provided!');
}

if ($_SERVER['REQUEST_METHOD'] === 'POST') {
    if ($reqPath === '/createSo') {
        $reqBody = file_get_contents('php://input');
        $soMasters = json_decode($reqBody, true);

        $orderDB = OrderDB::getInstance();
        $result = $orderDB->createSo($xid, $soMasters);

        if ($result) {
            responseOK();
        } else {
            responseError();
        }
    }
}

function responseOK() {
    http_response_code(200);
    echo json_encode([
        'success' => true,
        'message' => 'success',
    ]);
}

function responseError() {
    http_response_code(400);
    echo json_encode([
        'success' => false,
        'message' => 'fail',
    ]);
}

Step6: 访问聚合层业务接口

curl -X{HTTP Method} http://localhost:{DBPack监听的聚合层服务端口}/{聚合层服务的API endpoint}
  • 无论是使用mysqli驱动、pdo_mysql驱动,还是通过mysql_connect()连接数据库(<=php5.4),在start transaction;开始之后,后续的业务操作必须在同一个数据库连接上进行。
  • DBPack通过xid(全局事务唯一ID)在事务上下文中传播,业务数据库执行的业务SQL语句中,需要加入xid注释,这样DBPack才能根据xid处理对应的事务。例如insert /*+ XID('%s') */ into xx ...;

卜贺贺。就职于日本楽天Rakuten CNTD,任Application Engineer,熟悉AT事务、Seata-golang和DBPack。GitHub:https://github.com/bohehe


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK