15

分布式事务太繁琐?官方推荐 Atomikos,5 分钟帮你搞定

 3 years ago
source link: https://xie.infoq.cn/article/bb452190e896053c735fd7126
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

互联网应用架构:专注编程教学,架构,JAVA,Python,微服务,机器学习等领域,欢迎关注,一起学习。

IvYR3ym.jpg!mobile

前言

最近有个项目,里面涉及到多个数据源的操作。按照以前的做法,会采用MQ来做最终一致性,但是又觉得繁琐了些,而且项目的体量其实也不大,想来想去,最终采用Atomikos来实现。

XA是啥

在做Atomikos之前,我们先来了解一下什么是XA。XA是由X/Open组织提出的分布式事务的一种协议(或者称之为分布式架构)。它主要定义了两部分的管理器,全局事务管理器及资源管理器。在XA的设计理念中,把不同资源纳入到一个事务管理器进行统一管理,例如数据库资源,消息中间件资源等,从而进行全部资源的事务提交或者取消,目前主流的数据库,消息中间件都支持XA协议。

zqaeYrM.jpg!mobile

JTA又是啥

上面讲完XA协议,我们来聊聊JTA,JTA叫做Java Transaction API,它是XA协议的JAVA实现。目前在JAVA里面,关于JTA的定义主要是两部分

1、事务管理器接口-----javax.transaction.TransactionManager

2、资源管理器接口-----javax.transaction.xa.XAResource

在一般应用中采用JTA接口实现事务,需要一个外置的JTA容器来管理这些事务,像JBoss、WebLogic这类Java EE应用服务器(Tomcat本身并不提供JTA实现)。今天我们要讲的是Atomikos,它是一个独立实现了JTA的框架,无需依赖这类容器,就能够在我们的应用中运行JTA事务。

接下来我们直接进入到主题,在一个微服务应用中,针对多数据源的时候如何实现分布式事务。

BJviErM.jpg!mobile

基础包引入

<?xml version="1.0"?>
<!-- Maven build descriptor for the boots-atomikos module (distributed transaction demo). -->
<project
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.boots</groupId>
        <artifactId>boots</artifactId>
        <version>3.0.0.RELEASE</version>
    </parent>
    <artifactId>boots-atomikos</artifactId>
    <name>boots-atomikos</name>
    <description>分布式事务</description>
    <dependencies>

        <!-- jta-atomikos: distributed (JTA/XA) transaction management -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

        <!-- Shared components: swagger, request/response models, unified exception handling -->
        <dependency>
            <groupId>com.boots</groupId>
            <artifactId>module-boots-api</artifactId>
            <!-- ${parent.version} is a deprecated expression; project.parent.version is the supported form -->
            <version>${project.parent.version}</version>
        </dependency>

        <!-- Spring Boot / MyBatis integration -->
        <!-- NOTE(review): MyBatis-Plus guidance advises against declaring mybatis and
             mybatis-spring-boot-starter next to mybatis-plus-boot-starter; confirm these two are needed -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.4</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>

        <!-- Alibaba Druid database connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.21</version>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- MyBatis-Plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.2</version>
        </dependency>

        <!-- p6spy SQL proxy -->
        <dependency>
            <groupId>p6spy</groupId>
            <artifactId>p6spy</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- Encryption of sensitive configuration values -->
        <dependency>
            <groupId>com.github.ulisesbocchio</groupId>
            <artifactId>jasypt-spring-boot-starter</artifactId>
            <version>3.0.2</version>
        </dependency>

        <!-- Test dependencies -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>
</project>


配置第一个数据源

/**
* All rights Reserved, Designed By 林溪
* Copyright: Copyright(C) 2016-2020
*/

package com.boots.atomikos.common.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.boots.atomikos.common.constants.AtomikosConstant;
import com.boots.atomikos.common.data.FirstDbData;
import com.boots.atomikos.common.utils.JasyptUtils;
import com.mysql.cj.jdbc.MysqlXADataSource;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * Configuration of the first data source and the MyBatis session factory bound to it.
 *
 * <p>Mapper interfaces under {@link AtomikosConstant#FIRST_DAO} are wired to the session factory
 * named {@link AtomikosConstant#FIRST_SESSIONFACTORY}. Both beans declared here are marked
 * {@code @Primary}.
 *
 * @author 林溪
 */
@Configuration
@MapperScan(basePackages = AtomikosConstant.FIRST_DAO, sqlSessionFactoryRef = AtomikosConstant.FIRST_SESSIONFACTORY)
@Slf4j
public class FirstDataSourceConfig {

    /** Divisor used to turn the millisecond-based setting into the seconds Atomikos expects. */
    private static final int MILLIS_PER_SECOND = 1000;

    @Autowired
    private FirstDbData firstDbData;

    /**
     * Builds the first data source: a MySQL XA data source wrapped in an Atomikos pooling bean.
     *
     * <p>The password handed to the driver is the result of
     * {@code JasyptUtils.decryptMsg(jasyptPassword, firstPassword)}.
     *
     * @return the Atomikos data source registered under {@link AtomikosConstant#FIRST_DATASOURCE}
     */
    @Bean(AtomikosConstant.FIRST_DATASOURCE)
    @Primary
    public DataSource firstDataSource() {
        final MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(firstDbData.getFirstUrl());
        mysqlXaDataSource.setPassword(
                JasyptUtils.decryptMsg(firstDbData.getJasyptPassword(), firstDbData.getFirstPassword()));
        mysqlXaDataSource.setUser(firstDbData.getFirstUsername());

        final AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName(AtomikosConstant.FIRST_DATASOURCE);
        // The former setPoolSize(maxPoolPreparedStatementPerConnectionSize) call was dropped: it fed a
        // statement-cache size into the pool size and was overwritten by the two calls below anyway.
        xaDataSource.setMinPoolSize(firstDbData.getMinIdle());
        xaDataSource.setMaxPoolSize(firstDbData.getMaxActive());
        // maxIdleTime is a duration in seconds; it used to receive minIdle (a connection count).
        // The idle-eviction time is the only duration-valued setting read in this class.
        xaDataSource.setMaxIdleTime(firstDbData.getMinEvictableIdleTimeMillis() / MILLIS_PER_SECOND);
        // NOTE(review): Atomikos documents maxLifetime in seconds while this getter is named in
        // milliseconds. Left unchanged because converting would shorten connection lifetime 1000x;
        // confirm the intended lifetime before adjusting.
        xaDataSource.setMaxLifetime(firstDbData.getMinEvictableIdleTimeMillis());
        xaDataSource.setConcurrentConnectionValidation(true);
        xaDataSource.setTestQuery("select 1 from dual");
        log.info("初始化第一数据库成功");
        return xaDataSource;
    }

    /**
     * Creates the MyBatis {@link SqlSessionFactory} for the first data source.
     *
     * <p>Checked exceptions raised while resolving mapper resources or building the factory are
     * rethrown unchanged via {@code @SneakyThrows}.
     *
     * @param firstDataSource the data source qualified as {@link AtomikosConstant#FIRST_DATASOURCE}
     * @return the session factory produced by the MyBatis-Plus factory bean
     */
    @Primary
    @Bean(AtomikosConstant.FIRST_SESSIONFACTORY)
    @SneakyThrows(Exception.class)
    public SqlSessionFactory firstSqlSessionFactory(@Qualifier(AtomikosConstant.FIRST_DATASOURCE) DataSource firstDataSource) {
        final MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(firstDataSource);
        // Package scanned for type aliases (the model classes).
        bean.setTypeAliasesPackage(AtomikosConstant.FIRST_MODELS);
        // Location of the mapper XML file(s).
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(AtomikosConstant.FIRST_MAPPER));
        return bean.getObject();
    }

}


配置第二个数据源

/**
* All rights Reserved, Designed By 林溪
* Copyright: Copyright(C) 2016-2020
*/

package com.boots.atomikos.common.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.boots.atomikos.common.constants.AtomikosConstant;
import com.boots.atomikos.common.data.SecondDbData;
import com.boots.atomikos.common.utils.JasyptUtils;
import com.mysql.cj.jdbc.MysqlXADataSource;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * Configuration of the second data source and the MyBatis session factory bound to it.
 *
 * <p>Mapper interfaces under {@link AtomikosConstant#SECOND_DAO} are wired to the session factory
 * named {@link AtomikosConstant#SECOND_SESSIONFACTORY}.
 *
 * @author 林溪
 */
@Configuration
@MapperScan(basePackages = AtomikosConstant.SECOND_DAO, sqlSessionFactoryRef = AtomikosConstant.SECOND_SESSIONFACTORY)
@Slf4j
public class SecondDataSourceConfig {

    /** Divisor used to turn the millisecond-based setting into the seconds Atomikos expects. */
    private static final int MILLIS_PER_SECOND = 1000;

    @Autowired
    private SecondDbData secondDbData;

    /**
     * Builds the second data source: a MySQL XA data source wrapped in an Atomikos pooling bean.
     *
     * <p>The password handed to the driver is the result of
     * {@code JasyptUtils.decryptMsg(jasyptPassword, secondPassword)}.
     *
     * @return the Atomikos data source registered under {@link AtomikosConstant#SECOND_DATASOURCE}
     */
    @Bean(AtomikosConstant.SECOND_DATASOURCE)
    public DataSource secondDataSource() {
        // MySQL's XA-capable data source (the original author notes support for MySQL 5.x and 8.x).
        final MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(secondDbData.getSecondUrl());
        mysqlXaDataSource.setPassword(
                JasyptUtils.decryptMsg(secondDbData.getJasyptPassword(), secondDbData.getSecondPassword()));
        mysqlXaDataSource.setUser(secondDbData.getSecondUsername());

        final AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName(AtomikosConstant.SECOND_DATASOURCE);
        // The former setPoolSize(maxPoolPreparedStatementPerConnectionSize) call was dropped: it fed a
        // statement-cache size into the pool size and was overwritten by the two calls below anyway.
        xaDataSource.setMinPoolSize(secondDbData.getMinIdle());
        xaDataSource.setMaxPoolSize(secondDbData.getMaxActive());
        // maxIdleTime is a duration in seconds; it used to receive minIdle (a connection count).
        // The idle-eviction time is the only duration-valued setting read in this class.
        xaDataSource.setMaxIdleTime(secondDbData.getMinEvictableIdleTimeMillis() / MILLIS_PER_SECOND);
        // NOTE(review): Atomikos documents maxLifetime in seconds while this getter is named in
        // milliseconds. Left unchanged because converting would shorten connection lifetime 1000x;
        // confirm the intended lifetime before adjusting.
        xaDataSource.setMaxLifetime(secondDbData.getMinEvictableIdleTimeMillis());
        xaDataSource.setConcurrentConnectionValidation(true);
        xaDataSource.setTestQuery("select 1 from dual");
        log.info("初始化第二数据库成功");
        return xaDataSource;
    }

    /**
     * Creates the MyBatis {@link SqlSessionFactory} for the second data source.
     *
     * <p>Checked exceptions raised while resolving mapper resources or building the factory are
     * rethrown unchanged via {@code @SneakyThrows}.
     *
     * @param secondDataSource the data source qualified as {@link AtomikosConstant#SECOND_DATASOURCE}
     * @return the session factory produced by the MyBatis-Plus factory bean
     */
    @Bean(AtomikosConstant.SECOND_SESSIONFACTORY)
    @SneakyThrows(Exception.class)
    public SqlSessionFactory secondSqlSessionFactory(@Qualifier(AtomikosConstant.SECOND_DATASOURCE) DataSource secondDataSource) {
        final MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(secondDataSource);
        // Package scanned for type aliases (the model classes).
        bean.setTypeAliasesPackage(AtomikosConstant.SECOND_MODELS);
        // Location of the mapper XML file(s).
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(AtomikosConstant.SECOND_MAPPER));
        return bean.getObject();
    }

}


配置数据源管理器

/**
* All rights Reserved, Designed By 林溪
* Copyright: Copyright(C) 2016-2020
*/

package com.boots.atomikos.common.config;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;

import lombok.SneakyThrows;

/**
 * Declares the Atomikos-backed JTA beans: the {@link UserTransaction}, the JTA
 * {@link TransactionManager}, and the Spring {@link PlatformTransactionManager} that combines them.
 *
 * @author 林溪
 */
@Configuration
@EnableTransactionManagement
public class AtomikosConfig {

    /**
     * Value passed to {@code setTransactionTimeout}, which the JTA API defines in seconds.
     * NOTE(review): 20000 seconds is unusually long and may have been meant as milliseconds; the
     * original value is kept, confirm the intended timeout.
     */
    private static final int TRANSACTION_TIMEOUT_SECONDS = 20000;

    /**
     * Creates the Atomikos {@link UserTransaction} with the configured timeout.
     *
     * <p>The checked exception declared by {@code setTransactionTimeout} is rethrown unchanged via
     * {@code @SneakyThrows}.
     *
     * @return the user transaction bean named {@code userTransaction}
     */
    @Bean(name = "userTransaction")
    @SneakyThrows(Exception.class)
    public UserTransaction userTransaction() {
        final UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(TRANSACTION_TIMEOUT_SECONDS);
        return userTransactionImp;
    }

    /**
     * Creates the Atomikos JTA transaction manager.
     *
     * <p>{@code init} and {@code close} are declared as lifecycle callbacks so the transaction
     * service is started with the application context and released when it shuts down. Force
     * shutdown is disabled.
     *
     * @return the transaction manager bean named {@code atomikosTransactionManager}
     */
    @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
    public TransactionManager atomikosTransactionManager() {
        final UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    /**
     * Creates the Spring transaction manager used for {@code @Transactional} processing.
     *
     * <p>The injected beans are used directly instead of re-invoking the {@code @Bean} methods, so
     * the wiring does not depend on configuration-class method proxying.
     *
     * @param atomikosTransactionManager the JTA transaction manager bean
     * @param userTransaction the JTA user transaction bean
     * @return a {@link JtaTransactionManager} built from the two injected beans
     */
    @Bean(name = "transactionManager")
    public PlatformTransactionManager transactionManager(@Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) {
        return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
    }

}


配置常量

/**
* All rights Reserved, Designed By 林溪
* Copyright: Copyright(C) 2016-2020
*/

package com.boots.atomikos.common.constants;

/**
 * Bean names, package names and mapper locations shared by the two data source configurations.
 *
 * <p>All members are compile-time string constants so they can be used inside annotations.
 * The class is a pure constants holder and cannot be instantiated or extended.
 *
 * @author 林溪
 */
public final class AtomikosConstant {

    /* ---------------- first database ---------------- */

    /** Bean name (and Atomikos unique resource name) of the first data source. */
    public static final String FIRST_DATASOURCE = "firstDataSource";

    /** Bean name of the first SqlSessionFactory. */
    public static final String FIRST_SESSIONFACTORY = "firstSessionFactory";

    /** Package holding the mapper interfaces of the first database. */
    public static final String FIRST_DAO = "com.boots.atomikos.business.afuser.dao";

    /** Package holding the model classes of the first database. */
    public static final String FIRST_MODELS = "com.boots.atomikos.business.afuser.model";

    /** Resource location of the mapper XML of the first database. */
    public static final String FIRST_MAPPER = "classpath:mappers/AfUserMapper.xml";

    /* ---------------- second database ---------------- */

    /** Bean name (and Atomikos unique resource name) of the second data source. */
    public static final String SECOND_DATASOURCE = "secondDataSource";

    /** Bean name of the second SqlSessionFactory. */
    public static final String SECOND_SESSIONFACTORY = "secondSessionFactory";

    /** Package holding the mapper interfaces of the second database. */
    public static final String SECOND_DAO = "com.boots.atomikos.business.afcustomer.dao";

    /** Package holding the model classes of the second database. */
    public static final String SECOND_MODELS = "com.boots.atomikos.business.afcustomer.model";

    /** Resource location of the mapper XML of the second database. */
    public static final String SECOND_MAPPER = "classpath:mappers/AfCustomerMapper.xml";

    /** Not instantiable: this class only carries constants. */
    private AtomikosConstant() {
    }

}


配置信息

###### Basic application settings ######
## Application name
spring.application.name: boots-atomikos
## Jackson time zone and date format; numbers are written as strings (to avoid precision loss)
spring.jackson.timeZone: GMT+8
spring.jackson.dateFormat: yyyy-MM-dd HH:mm:ss
spring.jackson.generator.writeNumbersAsStrings: true
##### First data source #####
## NOTE(review): Atomikos' MySQL guidance mentions pinGlobalTxToPhysicalConnection=true for XA use; confirm whether these URLs need it
first.datasource.url: jdbc:mysql://127.0.0.1:3306/atomikos_first?autoReconnect=true&useSSL=false&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true
first.datasource.username: root
## Presumably the encrypted value that the data source configuration decrypts via JasyptUtils; verify
first.datasource.password: yiOtQ2YkCWwOvRNmLI4eaPG/fx/q3AIB20JFFz87T96+udBorAm0tNxI2YKfFdeA
##### Second data source #####
second.datasource.url: jdbc:mysql://127.0.0.1:3306/atomikos_second?autoReconnect=true&useSSL=false&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true
second.datasource.username: root
## Presumably the encrypted value that the data source configuration decrypts via JasyptUtils; verify
second.datasource.password: yiOtQ2YkCWwOvRNmLI4eaPG/fx/q3AIB20JFFz87T96+udBorAm0tNxI2YKfFdeA

运行测试

我们定义了一个接口并实现该接口,其中定义了一个test方法,根据不同情况手动抛出异常,运行后可以直接看到数据并没有被插入到数据库中。

/**
* All rights Reserved, Designed By 林溪开源
* Copyright: Copyright(C) 2016-2020
* Company 林溪开源 Ltd.
*/

package com.boots.atomikos.business.afcustomer.service.impl;

import javax.transaction.Transactional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.boots.atomikos.business.afcustomer.dao.IAfCustomerDao;
import com.boots.atomikos.business.afcustomer.model.AfCustomer;
import com.boots.atomikos.business.afcustomer.service.IAfCustomerService;
import com.boots.atomikos.business.afuser.dao.IAfUserDao;
import com.boots.atomikos.business.afuser.model.AfUser;
import com.module.boots.exception.CommonRuntimeException;

/**
 * Service implementation for the customer table; also hosts the cross-data-source rollback demo.
 *
 * @author 林溪
 */
@Service
public class AfCustomerServiceImpl extends ServiceImpl<IAfCustomerDao, AfCustomer> implements IAfCustomerService {

    @Autowired
    private IAfCustomerDao afCustomerDao;

    @Autowired
    private IAfUserDao afUserDao;

    /**
     * Inserts a customer and then a user inside one transaction that rolls back on
     * {@link CommonRuntimeException}.
     *
     * <p>When the customer insert returns a value greater than zero the method throws
     * {@link CommonRuntimeException}, so the user insert is only reached when the customer insert
     * returned zero or less.
     */
    @Override
    @Transactional(rollbackOn = CommonRuntimeException.class)
    public void test() {
        final AfCustomer customer = AfCustomer.builder()
                .customerName("客户1")
                .build();
        final AfUser user = AfUser.builder()
                .userName("用户1")
                .build();

        final int customerRows = afCustomerDao.insert(customer);
        final boolean customerInserted = customerRows > 0;
        if (customerInserted) {
            // Deliberate failure after the first insert so the rollback can be observed.
            throw new CommonRuntimeException("新增失败");
        }

        afUserDao.insert(user);
    }

}


总结

实验结果测试没问题,这里就不贴出来,有兴趣的同学可以通过以下获取源码

https://gitee.com/lemeno/boots

--END--

作者: @互联网应用架构

原创作品,抄袭必究

如需要源码,转发,关注后私信我

部分图片或代码来源网络,如侵权请联系删除,谢谢!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK