
ShardingSphere Integration with Narayana for XA Distributed Transaction Support (4)

 3 years ago
source link: https://www.infoq.cn/article/TcTXQWMYzxWHmBIu2CBZ

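Apache ShardingSphere is an ecosystem of open-source distributed database middleware solutions. It consists of three products, JDBC, Proxy, and Sidecar (planned), which are independent of one another yet can be deployed together. All of them provide standardized data sharding, distributed transactions, and database governance, and they fit a wide range of scenarios such as homogeneous Java applications, heterogeneous languages, and cloud-native deployments.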

ShardingSphere became an Apache Software Foundation top-level project on April 16, 2020.

A brief introduction to Narayana

Narayana (https://narayana.io/) is an XA distributed transaction solution provided by the JBoss team.

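It has the following characteristics:

  • It is a standards-based JTA implementation.

  • The TransactionManager (TM) is fully decentralized: it is embedded in the application and needs no separate deployment.

  • Transaction logs can be stored in a database, which supports transaction recovery in cluster mode.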

How ShardingTransactionManager initializes XATransactionDataSource

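ShardingSphere exposes its XA support through a full set of SPI interfaces. During initialization it first initializes the TransactionManager according to the transaction type. Let's start with org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager. The code is as follows: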

private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();

private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();

@Override
public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
    for (ResourceDataSource each : resourceDataSources) {
        cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager));
    }
    // Initialize Narayana
    xaTransactionManager.init();
}


  • First, each configured DataSource is converted into an XATransactionDataSource. The relevant call is new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager). Stepping into it, the code is as follows:

public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {
    this.databaseType = databaseType;
    this.resourceName = resourceName;
    this.dataSource = dataSource;
    if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
        // Key point 1: build and return the XADataSource
        xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
        this.xaTransactionManager = xaTransactionManager;
        // Key point 2: register the resource for recovery
        xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);
    }
}


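  • Let's focus on XADataSourceFactory.build(databaseType, dataSource). The name alone tells us that it should return the XADataSource defined by the JTA specification. In ShardingSphere you can guess what much of the code does just from its naming style, and that is what elegant code looks like (a little bragging). Enough talk, let's step into the method.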

public final class XADataSourceFactory {

    public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
        return new DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);
    }
}


  • Here again we find an SPI-defined class, XADataSourceDefinitionFactory, which loads a different dialect depending on the database type. Next we step into the swap method.

public XADataSource swap(final DataSource dataSource) {
    XADataSource result = createXADataSource();
    setProperties(result, getDatabaseAccessConfiguration(dataSource));
    return result;
}


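  • This is straightforward: the first step creates the XADataSource, the second sets its properties (database connection details, username, password, and so on), and then it is returned.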
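The two steps of swap can be sketched with plain reflection. This is only a minimal illustration and not ShardingSphere's DataSourceSwapper: the classes DemoXADataSource and SwapSketch, the helper setProperties, and the property names url, user, and password are all hypothetical.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a vendor XADataSource; only the setters matter here.
class DemoXADataSource {
    private String url;
    private String user;
    private String password;

    public void setUrl(String url) { this.url = url; }
    public void setUser(String user) { this.user = user; }
    public void setPassword(String password) { this.password = password; }

    public String getUrl() { return url; }
    public String getUser() { return user; }
    public String getPassword() { return password; }
}

class SwapSketch {

    // Step 2 of swap: copy each connection property onto the target through its setter.
    static void setProperties(Object target, Map<String, String> props) throws Exception {
        for (Map.Entry<String, String> entry : props.entrySet()) {
            String name = entry.getKey();
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            Method method = target.getClass().getMethod(setter, String.class);
            method.invoke(target, entry.getValue());
        }
    }

    // Step 1 creates the instance, step 2 fills it in, then it is returned.
    static DemoXADataSource swap(Map<String, String> props) throws Exception {
        DemoXADataSource result = new DemoXADataSource();
        setProperties(result, props);
        return result;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("url", "jdbc:mysql://localhost:3306/demo");
        props.put("user", "root");
        props.put("password", "secret");
        DemoXADataSource ds = swap(props);
        System.out.println(ds.getUrl() + " as " + ds.getUser());
    }
}
```

Setting properties through setters means the same swap logic can serve any vendor class, as long as the dialect knows which class to instantiate and which property names it expects.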

The Narayana initialization process in detail


We first step into org.apache.shardingsphere.transaction.xa.narayana.manager.NarayanaXATransactionManager:

public final class NarayanaXATransactionManager implements XATransactionManager {

    // Load the TransactionManager
    private final TransactionManager transactionManager = jtaPropertyManager.getJTAEnvironmentBean().getTransactionManager();

    // Obtain the transaction recovery module
    private final XARecoveryModule xaRecoveryModule = XARecoveryModule.getRegisteredXARecoveryModule();

    private final RecoveryManagerService recoveryManagerService = new RecoveryManagerService();

    @Override
    public void init() {
        RecoveryManager.delayRecoveryManagerThread();
        recoveryManagerService.create();
        // Start transaction recovery
        recoveryManagerService.start();
    }

    @Override
    public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
        xaRecoveryModule.addXAResourceRecoveryHelper(new DataSourceXAResourceRecoveryHelper(xaDataSource));
    }

    @Override
    public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
        xaRecoveryModule.removeXAResourceRecoveryHelper(new DataSourceXAResourceRecoveryHelper(xaDataSource));
    }

    @SneakyThrows({SystemException.class, RollbackException.class})
    @Override
    public void enlistResource(final SingleXAResource singleXAResource) {
        transactionManager.getTransaction().enlistResource(singleXAResource.getDelegate());
    }

    @Override
    public TransactionManager getTransactionManager() {
        return transactionManager;
    }

    @Override
    public void close() throws Exception {
        recoveryManagerService.stop();
        recoveryManagerService.destroy();
    }
}


  • First, look at jtaPropertyManager.getJTAEnvironmentBean().getTransactionManager(), which obtains the TransactionManager. This is the core of Narayana's initialization. Step into com.arjuna.common.internal.util.propertyservice.BeanPopulator.getNamedInstance():

private static <T> T getNamedInstance(Class<T> beanClass, String name, Properties properties) throws RuntimeException {
    StringBuilder sb = new StringBuilder().append(beanClass.getName());
    if (name != null)
        sb.append(":").append(name);
    String key = sb.toString();
    // we don't mind sometimes instantiating the bean multiple times,
    // as long as the duplicates never escape into the outside world.
    if(!beanInstances.containsKey(key)) {
        T bean = null;
        try {
            // Instantiate the bean class (JTAEnvironmentBean in this case)
            bean = beanClass.newInstance();
            if (properties != null) {
                configureFromProperties(bean, name, properties);
            } else {
                // Initialize from the default property configuration
                Properties defaultProperties = PropertiesFactory.getDefaultProperties();
                configureFromProperties(bean, name, defaultProperties);
            }
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        beanInstances.putIfAbsent(key, bean);
    }
    return (T) beanInstances.get(key);
}

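The caching in getNamedInstance follows a common pattern: build a key from the class name and an optional instance name, create the bean only when the key is missing, and let putIfAbsent decide which instance wins if two threads race. The following is a minimal sketch of that pattern only. DemoEnvironmentBean, BeanCacheSketch, and keyFor are hypothetical names, and the property configuration step is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical bean with a no-argument constructor.
class DemoEnvironmentBean {
    private String nodeIdentifier = "default";

    public String getNodeIdentifier() { return nodeIdentifier; }
    public void setNodeIdentifier(String nodeIdentifier) { this.nodeIdentifier = nodeIdentifier; }
}

class BeanCacheSketch {

    private static final ConcurrentMap<String, Object> INSTANCES = new ConcurrentHashMap<>();

    // The cache key is the class name, optionally followed by ":" and the instance name.
    static String keyFor(Class<?> beanClass, String name) {
        return name == null ? beanClass.getName() : beanClass.getName() + ":" + name;
    }

    static <T> T getNamedInstance(Class<T> beanClass, String name) {
        String key = keyFor(beanClass, name);
        if (!INSTANCES.containsKey(key)) {
            T bean;
            try {
                bean = beanClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
            // Two threads may both reach this line; only the first bean is kept,
            // and the duplicate is never handed to a caller.
            INSTANCES.putIfAbsent(key, bean);
        }
        return beanClass.cast(INSTANCES.get(key));
    }

    public static void main(String[] args) {
        DemoEnvironmentBean first = getNamedInstance(DemoEnvironmentBean.class, null);
        DemoEnvironmentBean second = getNamedInstance(DemoEnvironmentBean.class, null);
        System.out.println(first == second);
    }
}
```

Because callers always read the bean back from the map, every caller sees the same instance per key, which is why a setter call made after initialization is visible everywhere.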

  • The line to focus on is Properties defaultProperties = PropertiesFactory.getDefaultProperties();. It eventually reaches com.arjuna.common.util.propertyservice.AbstractPropertiesFactory.getPropertiesFromFile():

public Properties getPropertiesFromFile(String propertyFileName, ClassLoader classLoader) {
    String propertiesSourceUri = null;
    try
    {
        // The file name is jbossts-properties.xml.
        // This is the point where the search path is applied - user.dir (pwd), user.home, java.home, classpath
        propertiesSourceUri = com.arjuna.common.util.propertyservice.FileLocator.locateFile(propertyFileName, classLoader);
    }
    catch(FileNotFoundException fileNotFoundException)
    {
        // try falling back to a default file built into the .jar
        // Note the default- prefix on the name, to avoid finding it from the .jar at the previous stage
        // in cases where the .jar comes before the etc dir on the classpath.
        URL url = AbstractPropertiesFactory.class.getResource("/default-"+propertyFileName);
        if(url == null) {
            commonLogger.i18NLogger.warn_could_not_find_config_file(url);
        } else {
            propertiesSourceUri = url.toString();
        }
    }
    catch (IOException e)
    {
        throw new RuntimeException("invalid property file "+propertiesSourceUri, e);
    }
    Properties properties = null;
    try {
        if (propertiesSourceUri != null) {
            // Load the configuration file
            properties = loadFromFile(propertiesSourceUri);
        }
        // Overlay the system properties
        properties = applySystemProperties(properties);
    } catch(Exception e) {
        throw new RuntimeException("unable to load properties from "+propertiesSourceUri, e);
    }
    return properties;
}


  • The file loaded is jbossts-properties.xml, and the search path priority is user.dir > user.home > java.home > classpath. System properties are then overlaid on top, and the result is returned.
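The "load the file, then overlay system properties" step can be sketched with the standard java.util.Properties API alone. This is a minimal sketch under stated assumptions and not Narayana's implementation: PropertiesOverlaySketch, loadFromXml, applyOverrides, and SAMPLE_XML are hypothetical names, the XML is read from a string instead of a located file, and the overlay is simplified so that every override key wins.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class PropertiesOverlaySketch {

    // A tiny document in the XML form of java.util.Properties (illustrative values).
    static final String SAMPLE_XML =
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
            + "<!DOCTYPE properties SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n"
            + "<properties>\n"
            + "    <entry key=\"CoreEnvironmentBean.nodeIdentifier\">1</entry>\n"
            + "    <entry key=\"RecoveryEnvironmentBean.recoveryPort\">4712</entry>\n"
            + "</properties>\n";

    // Parse the XML properties format into a Properties object.
    static Properties loadFromXml(String xml) throws IOException {
        Properties result = new Properties();
        try (InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))) {
            result.loadFromXML(in);
        }
        return result;
    }

    // Copy the file values first, then let the overrides replace matching keys.
    static Properties applyOverrides(Properties fileProps, Properties overrides) {
        Properties result = new Properties();
        if (fileProps != null) {
            result.putAll(fileProps);
        }
        result.putAll(overrides);
        return result;
    }

    public static void main(String[] args) throws IOException {
        Properties fileProps = loadFromXml(SAMPLE_XML);
        // In the real flow the overrides are the JVM system properties.
        Properties merged = applyOverrides(fileProps, System.getProperties());
        System.out.println(merged.getProperty("RecoveryEnvironmentBean.recoveryPort"));
    }
}
```

With this ordering, starting the JVM with -DRecoveryEnvironmentBean.recoveryPort=5000 would replace the value read from the file in this sketch.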

A reference jbossts-properties.xml looks like this:

<properties>
    <entry key="CoordinatorEnvironmentBean.commitOnePhase">YES</entry>
    <entry key="ObjectStoreEnvironmentBean.objectStoreType">com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore</entry>
    <entry key="ObjectStoreEnvironmentBean.jdbcAccess">com.arjuna.ats.internal.arjuna.objectstore.jdbc.accessors.DynamicDataSourceJDBCAccess;ClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource;DatabaseName=jbossts;ServerName=172.25.4.62;PortNumber=3306;User=j_jbossts;Password=9MfNHoRncCi8</entry>
    <entry key="ObjectStoreEnvironmentBean.tablePrefix">Action</entry>
    <entry key="ObjectStoreEnvironmentBean.dropTable">true</entry>
    <entry key="ObjectStoreEnvironmentBean.stateStore.objectStoreType">com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore</entry>
    <entry key="ObjectStoreEnvironmentBean.stateStore.jdbcAccess">com.arjuna.ats.internal.arjuna.objectstore.jdbc.accessors.DynamicDataSourceJDBCAccess;ClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource;DatabaseName=jbossts;ServerName=172.25.4.62;PortNumber=3306;User=j_jbossts;Password=9MfNHoRncCi8</entry>
    <entry key="ObjectStoreEnvironmentBean.stateStore.tablePrefix">stateStore</entry>
    <entry key="ObjectStoreEnvironmentBean.stateStore.dropTable">true</entry>
    <entry key="ObjectStoreEnvironmentBean.communicationStore.objectStoreType">com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore</entry>
    <entry key="ObjectStoreEnvironmentBean.communicationStore.jdbcAccess">com.arjuna.ats.internal.arjuna.objectstore.jdbc.accessors.DynamicDataSourceJDBCAccess;ClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource;DatabaseName=jbossts;ServerName=172.25.4.62;PortNumber=3306;User=j_jbossts;Password=9MfNHoRncCi8</entry>
    <entry key="ObjectStoreEnvironmentBean.communicationStore.tablePrefix">Communication</entry>
    <entry key="ObjectStoreEnvironmentBean.communicationStore.dropTable">true</entry>
    <entry key="ObjectStoreEnvironmentBean.transactionSync">ON</entry>
    <entry key="CoreEnvironmentBean.nodeIdentifier">1</entry>
    <entry key="JTAEnvironmentBean.xaRecoveryNodes">1</entry>
    <entry key="JTAEnvironmentBean.xaResourceOrphanFilterClassNames">
        com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter
        com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter
        com.arjuna.ats.internal.jta.recovery.arjunacore.JTAActionStatusServiceXAResourceOrphanFilter
    </entry>
    <entry key="CoreEnvironmentBean.socketProcessIdPort">0</entry>
    <entry key="RecoveryEnvironmentBean.recoveryModuleClassNames">
        com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule
        com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule
    </entry>
    <entry key="RecoveryEnvironmentBean.expiryScannerClassNames">
        com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner
    </entry>
    <entry key="RecoveryEnvironmentBean.recoveryPort">4712</entry>
    <entry key="RecoveryEnvironmentBean.recoveryAddress"></entry>
    <entry key="RecoveryEnvironmentBean.transactionStatusManagerPort">0</entry>
    <entry key="RecoveryEnvironmentBean.transactionStatusManagerAddress"></entry>
    <entry key="RecoveryEnvironmentBean.recoveryListener">NO</entry>
    <entry key="RecoveryEnvironmentBean.recoveryBackoffPeriod">1</entry>
</properties>


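It is treated as the XML form of a standard java.util.Properties file and is loaded on demand. Entry names take the form ClassName.propertyName. The configuration classes are the entity classes whose names end in Bean, most of them in the com.arjuna.ats.arjuna.common package (JTAEnvironmentBean is in com.arjuna.ats.jta.common).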

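  • Once the file is loaded it is cached and is not read again until the JVM restarts. Changes to the properties file take effect only after a restart.

  • After the properties are loaded, the EnvironmentBean is inspected. For each field, if the properties contain a matching key, the field's setter is called with the value from the properties, or with the value from the system properties if that differs.

  • The bean is then returned to the caller, who can override values further by calling the setter methods.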
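The population step described in the list above can be sketched as follows: for each setter on the bean, look up the key ClassName.propertyName and, if it is present, convert the value and call the setter. This is a simplified sketch under stated assumptions, not Narayana's BeanPopulator. DemoRecoveryBean and BeanPopulatorSketch are hypothetical, and only int and String properties are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical environment bean with built-in defaults.
class DemoRecoveryBean {
    private int recoveryPort = 4712;
    private String recoveryAddress = "localhost";

    public int getRecoveryPort() { return recoveryPort; }
    public void setRecoveryPort(int recoveryPort) { this.recoveryPort = recoveryPort; }
    public String getRecoveryAddress() { return recoveryAddress; }
    public void setRecoveryAddress(String recoveryAddress) { this.recoveryAddress = recoveryAddress; }
}

class BeanPopulatorSketch {

    // For every single-argument setter, look for "<SimpleClassName>.<propertyName>".
    static void configure(Object bean, Properties props) throws Exception {
        String prefix = bean.getClass().getSimpleName() + ".";
        for (Method method : bean.getClass().getMethods()) {
            String methodName = method.getName();
            if (!methodName.startsWith("set") || methodName.length() <= 3
                    || method.getParameterCount() != 1) {
                continue;
            }
            String property = Character.toLowerCase(methodName.charAt(3)) + methodName.substring(4);
            String value = props.getProperty(prefix + property);
            if (value == null) {
                continue; // no matching key: keep the bean's default
            }
            Class<?> type = method.getParameterTypes()[0];
            if (type == int.class) {
                method.invoke(bean, Integer.parseInt(value.trim()));
            } else if (type == String.class) {
                method.invoke(bean, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("DemoRecoveryBean.recoveryPort", "5000");
        DemoRecoveryBean bean = new DemoRecoveryBean();
        configure(bean, props);
        // The caller can still override a value after population.
        bean.setRecoveryAddress("127.0.0.1");
        System.out.println(bean.getRecoveryPort() + " " + bean.getRecoveryAddress());
    }
}
```

Fields without a matching key keep their defaults, so a properties file only needs to list the values that differ from them.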

