63

阿里开源分布式事务组件 seata :demo 环境搭建以及运行流程简析

 4 years ago
source link: https://www.tuicool.com/articles/fYz6rmV
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

案例设计

seata 官方给出了一系列 demo 样例,不过我在用的过程中发现总有这个那个的问题,所以自己维护了一份基于 dubbo 的 demo 在 github 上,适配的 seata 版本是 0.8.0。

案例的设计直接参考官方 quick start给出的案例:

uM3632r.jpg!web

整个案例分为三个服务,分别是存储服务、订单服务和账户服务,这些服务通过 dubbo 进行发布和调用,内部调用逻辑如上面图所示。  

整个 demo 的工程样例如下所示:

2UVbAvZ.jpg!web

undo_log 表

这个案例除了在数据库需要建立业务表以外,还要额外建立一张 undo_log 表,这个表的主要作用是记录事务的前置镜像和后置镜像。

全局事务进行到提交阶段,则删除该表对应的记录,全局事务如果需要回滚,则会利用这个表里记录的镜像数据,恢复数据。

undo_log 表里的数据实际上是“朝生夕死”的,数据不需要在表里存活太久。表结构如下所示:

CREATE TABLE `undo_log` (

`id` bigint(20) NOT NULL AUTO_INCREMENT,

`branch_id` bigint(20) NOT NULL,

`xid` varchar(100) NOT NULL,

`context` varchar(128) NOT NULL,

`rollback_info` longblob NOT NULL,

`log_status` int(11) NOT NULL,

`log_created` datetime NOT NULL,

`log_modified` datetime NOT NULL,

`ext` varchar(100) DEFAULT NULL,

PRIMARY KEY (`id`),

UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

服务逻辑

每个服务都对应了一个 starter 类,这个类主要用来在 spring 环境下,将该服务启动,并通过 dubbo 发布出去,以账户服务为例:

/**

* The type Dubbo account service starter.

*/

public class DubboAccountServiceStarter {

/**

* 2. Account service is ready . A buyer register an account: U100001 on my e-commerce platform

*

* @param args the input arguments

*/

public static void main(String[] args) {

ClassPathXmlApplicationContext accountContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-account-service.xml"});

accountContext.getBean("service");

JdbcTemplate accountJdbcTemplate = (JdbcTemplate) accountContext.getBean("jdbcTemplate");

accountJdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");

accountJdbcTemplate.update("insert into account_tbl(user_id, money) values ('U100001', 999)");


new ApplicationKeeper(accountContext).keep();

}

}

首先通过   ClassPathXmlApplicationContext   读取 dubbo-account-service.xml 这个 spring 配置文件并启动 spring 容器环境,并通过 spring 的 jdbc template 对账户表的数据进行初始化。  

dubbo-account-service.xml 配置文件中进行了各类 bean 的配置,包括 dubbo 与 spring 结合时的标准配置:

<bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">

<constructor-arg ref="accountDataSource" />

</bean>


<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">

<property name="dataSource" ref="accountDataSourceProxy" />

</bean>


<dubbo:application name="dubbo-demo-account-service" />

<dubbo:registry address="zookeeper://localhost:2181" />

<dubbo:protocol name="dubbo" port="20881" />

<dubbo:service interface="io.seata.samples.dubbo.service.AccountService" ref="service" timeout="10000"/>


<bean id="service" class="io.seata.samples.dubbo.service.impl.AccountServiceImpl">

<property name="jdbcTemplate" ref="jdbcTemplate"/>

</bean>


<bean class="io.seata.spring.annotation.GlobalTransactionScanner">

<constructor-arg value="dubbo-demo-account-service"/>

<constructor-arg value="my_test_tx_group"/>

</bean>

这份配置里主要有两个需要引起注意的关键点

  1. jdbcTemplate 这个 bean 所依赖的数据源 bean,是一个类名为 io.seata.rm.datasource.DataSourceProxy 的数据源类,通过它的名字可以很明显地看出这是一个代理模式的应用,因为 seata 为完成全局事务的逻辑,需要在普通的 sql 操作前后添加一些逻辑,比如说 sql 执行前对 sql 进行语法解析,生成前置镜像,sql 执行后生成后置镜像,通过代理的方式,可以方便地对 connection,statement 等进行代理包装,在调用的时候进行拦截,加入自己的逻辑。

  2. 配置文件中还有一个 io.seata.spring.annotation.GlobalTransactionScanner 类型的 bean,这个 bean 是支撑 seata 能在 spring 环境中通过注解的方式来划定事务边界的基础。在 spring 容器启动时,会扫描 @GlobalTransactional   注解是否存在,这个注解标识了全局事务的开始和结束,也就是我们常说的“事务的边界”

业务逻辑

业务逻辑的具体详情在 BusinessServiceImpl   类中可以看到:

@Override

@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")

public void purchase(String userId, String commodityCode, int orderCount) {

LOGGER.info("purchase begin ... xid: " + RootContext.getXID());

storageService.deduct(commodityCode, orderCount);

orderService.create(userId, commodityCode, orderCount);

// throw new RuntimeException("xxx");

}

先调用存储服务,减少库存,然后调用订单服务,新建订单。这两个动作属于一个整体的事务,任何一个动作失败,都需要撤销所有的操作。

这个方法也有两个需要注意的点:

  1. 该方法上声明了 @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx") 这样的注解,用于让上文提到的 GlobalTransactionScanner 扫描的时候发现这是一个全局事务。

  2. 方法的最后有一行代码抛出了 RuntimeException,这主要是为了模仿全局事务的失败,并让 seata 走全局事务回滚逻辑。

事务扫描与边界定义

上文提到的 GlobalTransactionScanner 类,会在 spring 容器启动的时候,也被初始化。

在它的 afterPropertiesSet 方法被调用时,会触发 seata client 的初始化

@Override

public void afterPropertiesSet() {

if (disableGlobalTransaction) {

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Global transaction is disabled.");

}

return;

}

initClient();


}

关于 seata client 的初始化的细节,可以看看我写的另外一篇文章 《阿里开源分布式事务组件 seata : seata client 通信层解析》  

初始化客户端做的事情主要是建立与 seata server 的连接,并注册 TM 和 RM。接下来,在 wrapIfNecessary 方法里,实现对注解的扫描,并对添加了注解的方法添加 interceptor。  

这篇文章里我们暂时不讨论 TCC 模式,只讨论 AT 模式,也暂不讨论全局事务锁 GlobalLock 的实现,先忽略这些有关的逻辑,只关注事务处理逻辑。

Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);

Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

if (!existsAnnotation(new Class[] {serviceInterface})

&& !existsAnnotation(interfacesIfJdk)) {

return bean;

}

if (interceptor == null) {

interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);

}

在这里,interceptor 的实现是 GlobalTransactionalInterceptor,也就是说,以上文的案例为例子,当 BusinessServiceImpl 的 purchase 方法被调用的时候,实际上这个方法会被拦截器拦截,执行拦截器里的逻辑:

@Override

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {

Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);

Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);


final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);

final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);

if (globalTransactionalAnnotation != null) {

return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);

} else if (globalLockAnnotation != null) {

return handleGlobalLock(methodInvocation);

} else {

return methodInvocation.proceed();

}

}


private Object handleGlobalTransaction(final MethodInvocation methodInvocation,

final GlobalTransactional globalTrxAnno) throws Throwable {

try {

return transactionalTemplate.execute(new TransactionalExecutor() {

@Override

public Object execute() throws Throwable {

return methodInvocation.proceed();

}


public String name() {

String name = globalTrxAnno.name();

if (!StringUtils.isNullOrEmpty(name)) {

return name;

}

return formatMethod(methodInvocation.getMethod());

}


@Override

public TransactionInfo getTransactionInfo() {

TransactionInfo transactionInfo = new TransactionInfo();

transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());

transactionInfo.setName(name());

Set<RollbackRule> rollbackRules = new LinkedHashSet<>();

for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (String rbRule : globalTrxAnno.rollbackForClassName()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

for (String rbRule : globalTrxAnno.noRollbackForClassName()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

transactionInfo.setRollbackRules(rollbackRules);

return transactionInfo;

}

});

} catch (TransactionalExecutor.ExecutionException e) {

TransactionalExecutor.Code code = e.getCode();

switch (code) {

case RollbackDone:

throw e.getOriginalException();

case BeginFailure:

failureHandler.onBeginFailure(e.getTransaction(), e.getCause());

throw e.getCause();

case CommitFailure:

failureHandler.onCommitFailure(e.getTransaction(), e.getCause());

throw e.getCause();

case RollbackFailure:

failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());

throw e.getCause();

default:

throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

}

}

}

在执行 handleGlobalTransaction 方法时,实际上采用模板模式,委托给了 TransactionalTemplate 类去执行标准的事务处理流程。如下所示:

/**

* Execute object.

*

* @param business the business

* @return the object

* @throws TransactionalExecutor.ExecutionException the execution exception

*/

public Object execute(TransactionalExecutor business) throws Throwable {

// 1. get or create a transaction

GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();


// 1.1 get transactionInfo

TransactionInfo txInfo = business.getTransactionInfo();

if (txInfo == null) {

throw new ShouldNeverHappenException("transactionInfo does not exist");

}

try {


// 2. begin transaction

beginTransaction(txInfo, tx);


Object rs = null;

try {


// Do Your Business

rs = business.execute();


} catch (Throwable ex) {


// 3.the needed business exception to rollback.

completeTransactionAfterThrowing(txInfo,tx,ex);

throw ex;

}


// 4. everything is fine, commit.

commitTransaction(tx);


return rs;

} finally {

//5. clear

triggerAfterCompletion();

cleanUp();

}

}

事务处理逻辑实际上是一种模板,将事务相关的处理逻辑放在 try 块里,发现异常后执行回滚,正常执行则执行提交。  

在这里有个需要注意的地方是,seata 不把提交这个动作放在 try 块里,因为在 seata 里,全局事务的提交实际上是可以异步执行的。  

因为全局事务如果进行到提交这一阶段,那么意味着各个分支事务已经执行过本地提交,全局事务的提交阶段仅仅是删除 undo_log 里的记录,这个记录删除或者不删除,实际上不会改变全局事务已经正常完成的事实。所以它可以用程序异步去做,或者以人工介入的方式去做,所以 seata 认为,全局事务提交失败,不需要执行回滚流程。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK