2

(WebFlux)003、多数据源R2dbc事务失效分析 - 编号94530

 1 year ago
source link: https://www.cnblogs.com/lifacheng/p/16631515.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

最近项目持续改造,然后把SpringMVC换成了SpringWebflux,然后把Mybatis换成了R2dbc。中间没有遇到什么问题,一切都那么的美滋滋,直到最近一个新需求的出现,打破了往日的宁静。

在对需求分析了一番后,需要引入新的数据源,那就是MongoDb。然后出现了MongoDb、Mysql两种数据源,然后原来好好的事物操作就芭比Q(完蛋)了。细细来分析一下原因与解决方法。

题外话:在本地测试的时候强烈建议用虚拟机+Docker来安装MySql与MongoDb,不然Mac直连docker真的麻烦啊!!~

SpringBoot 版本号: 2.6.10, (本文基于已经会在项目中使用R2DBC与MongoDb)

二、武松打虎

2.1 单独solo Mysql

我们创建了一个测试库r2dbc_test,里面有一个user表。

# 创建测试库
create database r2dbc_test;

# 创建表
create table r2dbc_test.user(
    id int auto_increment primary key ,
    name varchar(12)
);

2.1.1 项目引入R2dbc

略..给出链接,如果感兴趣可以看看,Spring Data R2DBC,(实在太多,这个时间点懒得写了,后面有时间再补一下),

2.1.2 测试代码

创建表对结构对应实体类:user

@Data
@Table("user")
@NoArgsConstructor
@AllArgsConstructor
public class User implements Persistable<Integer> {
    @Id
    private Integer id;
    private String name;

    @Override
    public boolean isNew() {
        return true;
    }
}

这里面有个坑点,那就是为什么实现org.springframework.data.domain.Persistable这个接口呢,先卖个关子,看完Repository后在描述哈。

Repository如下代码所示。

/**
 * <br>User Repository</br>
 *
 * @author [email protected]
 * @since 2022/8/26
 */
@Repository
public interface UserR2dbcRepository extends R2dbcRepository<User, Integer> {

}

我们直接使用了Spring提供好的org.springframework.data.r2dbc.repository.R2dbcRepository,里面有一些基础的实现类。我们在测试的时候使用了org.springframework.data.repository.reactive.ReactiveCrudRepository#save()方法,这个方法会去判断这个实体对象是不是new object,如果不是,则会去Update。而判断的方法则是org.springframework.data.domain.Persistable#isNew()方法。所以这就是我们为啥要实现这个接口。

接着写一个简单测试的Controller,代码如下所示。

@RestController
@EnableR2dbcRepositories
public class TransactionController {
    @Autowired
    private UserR2dbcRepository repository;
    @Autowired
    private TransactionalOperator operator;

    // 根据seed当做初始ID,初始化数据库对象, 便于测试
    @RequestMapping("/r2dbc/init")
    public Flux<User> init(Integer seed) {
        Flux<User> userFlux = Flux.range(seed, 5).map(id -> new User(id,"name" + id))
                .flatMap(repository::save);
        return userFlux;
    }

    // 先删除一条记录, 然后在添加一条记录
    @RequestMapping("/r2dbc/delete")
    public Mono<User> delete(Integer id1, Integer id2) {
        Mono<Void> id1Mono = repository.deleteById(id1);
        Mono<User> id2Mono = repository.save(new User(id2, "name" + id2));
        return id1Mono.then(id2Mono).as(operator::transactional);
    }
}

不要纠结没有service啥的哈,我们仅仅为了测试哈。两个方法

  • 方法一:init, 用seed当做起始Id, 然后在数据库生成数据存储起来
  • 方法二:delete, 先删处一条数据,然后在插入一条已存在的数据,通过数据库异常来回滚数据。

我们调用init方法,生成数据id=1和id=100以后的数据,如下图所示。

生成测试数据

为了查看我们是不是插入成功,我们查一下数据库看看。结果如下图。

查询数据库测试数据

数据看起来是没问题的哈,是我们想要的,从1-5, 100-105

2.1.3 测试事务

数据已经准备好了,我们来进行事务测试,看看现在只有R2DBC的时候,事务是否生效。

我们来删除id=1,然后保存id=100的情况试一下看看。结果如图所示。

删除事物操作

通过日志,我们看到结果的确是我们想要的,当id2=100的时候,抛出了Dulicate entry异常, 那我们在查询一下数据库,看看数据库的数据是否有删除掉。

结果还是用图展示。

发生删除异常

我们通过查看数据库的查询记录,发现id=1数据没有删除。那也说明了事务是生效的,在正常情况下,发生异常不会提交事务。

2.2 引入MongoDb

略...感兴趣的老哥参考Spring Data MongoDb引入MongoDB

2.2.1 开启MongoDb事务

官方文档中有这样一句话:

Unless you specify a MongoTransactionManager within your application context, transaction support is DISABLED. You can use setSessionSynchronization(ALWAYS) to participate in ongoing non-native MongoDB transactions.

需要手动指定MongoTransactionManager,否则不可用。 引入事务,参考文档,需要如下代码。

@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  
  return new MongoTransactionManager(dbFactory);
}

我们按照文档指示,在项目中添加了如下代码。因为我们用的是Webflux,所以我们创建的是Reactive的。

@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {
    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }
}

这样,我们MongoDB的事物也搞定了,直接美滋滋,上手开干CRUD。

2.2.2 再来一次----测试数据删除

我们引入了新的数据源,本该美滋滋的,但是,问题也来了。我们在来进行一次数据删除操作。这次删除,我们修改一下Id,删除id=2和添加id=102的。测试如下图所示。

删除Id=2和添加Id=102的数据

我们再一次看到了同样的情况,抛出了异常Duplicate entry,是我们预期的结果。那我们接着看看数据库的数据。如下图所示。

添加Mongo后删除数据

这个时候我们在查询数据,发现id=2的数据已经被删除了。这次事务没有回滚! 真是F了个K,啥情况呢?我们得一探究竟。

三、智取谜底

我们带着问题来找原因,现在事务失效了,项目能起来,没有报错。那么最有的可能那就是TransactionalOperator失效了,TransactionalOperator是Spring帮我们初始化的,我们要找问题,那就得要看看这个TransactionalOperator是如何初始化的了

3.1 看源码找原因

3.1.1 从根本入手

我们直接从TransactionalOperator代码进入,发现其需要传入ReactiveTransactionManager,部分代码如下。

final class TransactionalOperatorImpl implements TransactionalOperator {

	private final ReactiveTransactionManager transactionManager;
	private final TransactionDefinition transactionDefinition;

	/**
	 * Construct a new TransactionTemplate using the given transaction manager,
	 * taking its default settings from the given transaction definition.
	 * @param transactionManager the transaction management strategy to be used
	 * @param transactionDefinition the transaction definition to copy the
	 * default settings from. Local properties can still be set to change values.
	 */
	TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
		
		this.transactionManager = transactionManager;
		this.transactionDefinition = transactionDefinition;
	}
}

按照一般逻辑来说,事务是放在TransactionManager中来管理的,这个符合我们的预期,我们接着看看TransactionManager的实现类有哪些。经过查看,发现有R2dbcTransactionManager实现。如下图所示。

![TransactionManager实现类]](https://img2022.cnblogs.com/blog/1495071/202208/1495071-20220827211141944-495282422.jpg)

3.1.2 按照猜想继续

我们找到了R2dbcTransactionManager,那我们就有两个思路。

1、查看其实现方式,有哪些需要我们关注的,哪些因素是可能造成事务不生效。

2、启动方式。因为R2dbcTransactionManager初始化是交由SpringBoot实现,那会不会有什么特别之处。

3.1.2.1 思路1

我们打开R2dbcTransactionManager代码,发现其实现没有特别之处。部分代码如下。

public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {

	@Nullable
	private ConnectionFactory connectionFactory;
	/**
	 * Create a new {@code R2dbcTransactionManager} instance.
	 * A ConnectionFactory has to be set to be able to use it.
	 * @see #setConnectionFactory
	 */
	public R2dbcTransactionManager() {}
	/**
	 * Create a new {@code R2dbcTransactionManager} instance.
	 * @param connectionFactory the R2DBC ConnectionFactory to manage transactions for
	 */
	public R2dbcTransactionManager(ConnectionFactory connectionFactory) {
		this();
		setConnectionFactory(connectionFactory);
		afterPropertiesSet();
	}
}

可以看到,无参初始化可以不需要ConnectionFactory,也可以传入ConnectionFactory进行初始化。 也没有什么特别之处。

3.1.2.2 思路2

我们看完其实现,并没有特别之处,那就看它初始化有什么特别的地方。Double Shift 来一波,我们看到了有AutoConfiguration,来让我们瞧一瞧。

R2dbcTransactionManagerAutoConfiguration

我们点进去瞧一瞧,便发现了端倪,嘴上一句 原来如此 蹦了出来。部分代码如下。

public class R2dbcTransactionManagerAutoConfiguration {
	@Bean
	@ConditionalOnMissingBean(ReactiveTransactionManager.class)
	public R2dbcTransactionManager connectionFactoryTransactionManager(ConnectionFactory connectionFactory) {
		return new R2dbcTransactionManager(connectionFactory);
	}
}

我们看到,其初始化的时候,采用了ConditionalOnMissingBean,只有在没有ReactiveTransactionManager的时候才会初始化。但是我们在初始化MongoDB事务的时候,已经初始化过ReactiveTransactionManager了啊!赶紧看看ReactiveMongoTransactionManager。

打开ReactiveMongoTransactionManager代码,果然如此。代码如下。

public class ReactiveMongoTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
	// ...略
}

AbstractReactiveTransactionManager这个不就是ReactiveTransactionManager嘛, 已经初始化过一次了,所以导致R2dbcTransactionManager无法进行初始化,所以TransactionalOperatorImpl里面传入的不是R2dbcTransactionManager,那肯定对mysql无法失误操作了啊。

3.1.3 怎么办?

至此,我们已经找到原因了,但是,这也紧紧是猜想。我们还是得分2步骤来啊!!

  • 1、针对问题,提出具体的解决方案,并实现
  • 2、针对实现的方案进行验证
3.1.3.1 解决方案

我们知道事务没有实现的原因是R2dbcTransactionManager没有初始化,然后再TransactionalOperatorImpl种注入的不是R2dbcTransactionManager,那么我们就自己动手初始化Bean。

我们创建2个对象,分别为MongoConfig和R2dbcConfig,代码如下所示。

R2dbcConfig:

/**
 * <br>r2dbc 配置</br>
 *
 * @author [email protected]
 * @since 2022/8/27
 */
@EnableR2dbcRepositories
@Configuration
public class R2dbcConfig {
  
    @Bean("r2dbcTransactionManager")
    public R2dbcTransactionManager transactionManager(ConnectionFactory pool) {
        return new R2dbcTransactionManager(pool);
    }

    @Bean("r2dbcTransactionalOperator")
    public TransactionalOperator transactionalOperator(R2dbcTransactionManager transactionManager){
        return TransactionalOperator.create(transactionManager);
    }
}

MongoConfig:

/**
 * <br>mongo transaction manager</br>
 *
 * @author [email protected]
 * @since 2022/8/27
 */
@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {

    @Bean("mongoTransactionManager")
    public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Bean("mongoTransactionalOperator")
    public TransactionalOperator transactionalOperator(ReactiveMongoTransactionManager transactionManager){
        return TransactionalOperator.create(transactionManager);
    }
}

我们通过别名的方式,创建两个TransactionalOperator,这样就可以解决R2bdc无法自动创建TransactionManager的问题。

3.1.3.2 验证

我们在Controller中的TransactionalOperator指定名称。代码如下所示。

@RestController
public class TransactionController {
    @Autowired
    private UserR2dbcRepository repository;
    
    @Autowired
    @Qualifier("r2dbcTransactionalOperator") // 在这指定使用哪个operator
    private TransactionalOperator operator;
		// ... 略
}

指定了具体的名称,我们就可以接着在来测试一次。这次我们删除Id=3,然后添加id=103的数据试试看。测试过程如下图。

删除Id=3,添加Id=103数据

还是和我们刚一下,出现了Duplicate entry的问题。我们要关注事物是否回滚。

接下来就是激动人心的时刻,我们直接查库,看看事务是否回滚了。结果如下图所示。

验证结果

哇喔!棒!我们看到,数据库查询出来的结果中还是包含了Id=3的数据,那完全说明了事务回滚了!

至此我们的问题算是完全解决了,舒坦!(心里长舒一口气,解决问题就这么简单?)

3.2 偷鸡

看了这么多,我们都是手动,一步步验证结果的,哪有没有快捷的方式呢?说到这,那肯定是有的。

在使用R2dbc的时候,我们其实是没有添加日志的。我们可以打开日志。可以看到操作是记录了完整的日志。我们添加日志配置(log配置文件自己添加一下)。

logging.level.org.springframework.r2dbc=debug

3.2.1 再次验证

添加完日志,我们在执行一下删除id=3,添加id=104的操作,看看日志记录了什么。贴出来测试结果。

1495071-20220827211259305-22753168.jpg

我们可以看到,日志中清晰的记录着,创建事务,回滚事务!完全验证了我们的操作方案是对的,NO爬不浪~!

上述的所有操作,都可以通过日志验证,我就不一步步验证,大家可以自己试验一下~

在使用新东西的时候,还是要多实验,验证结果!

遇到问题,不要慌,一步步来,就是干!

如有问题,欢迎指正,交流。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK