20

.NET Core开发实战(第28课:工作单元模式(UnitOfWork):管理好你的事务)--学习笔...

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzAwNTMxMzg1MA%3D%3D&%3Bmid=2654077848&%3Bidx=3&%3Bsn=d4bad088df72594080f15889720dd0e5
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

28 | 工作单元模式(UnitOfWork):管理好你的事务

工作单元模式有如下几个特性:

1、使用同一上下文

2、跟踪实体的状态

3、保障事务一致性

我们对实体的操作,最终的状态都是应该如实保存到我们的存储中,进行持久化

接下来看一下代码

为了实现工作单元模式,这里定义了一个工作单元的接口

public interface IUnitOfWork : IDisposable
{
    Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
    Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default);
}

这两个方法的区别是:一个是返回的 int 是指我们影响的数据条数,另外一个返回 bool 表示我们保存是否成功,本质上这两个方法达到的效果是相同的

另外还定义了一个事务管理的接口

public interface ITransaction
{
    // 获取当前事务
    IDbContextTransaction GetCurrentTransaction();

    // 判断当前事务是否开启
    bool HasActiveTransaction { get; }

    // 开启事务
    Task<IDbContextTransaction> BeginTransactionAsync();

    // 提交事务
    Task CommitTransactionAsync(IDbContextTransaction transaction);

    // 事务回滚
    void RollbackTransaction();
}

在实现上我们是借助 EF 来实现工作单元模式的

看一下 EFContext 的定义

/// <summary>
/// DbContext 是 EF 的基类,然后实现了 UnitOfWork 的接口和事务的接口
/// </summary>
public class EFContext : DbContext, IUnitOfWork, ITransaction
{
    protected IMediator _mediator;
    ICapPublisher _capBus;

    // 后面的章节会详细讲到这两个参数
    public EFContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options)
    {
        _mediator = mediator;
        _capBus = capBus;
    }

    #region IUnitOfWork

    public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
    {
        var result = await base.SaveChangesAsync(cancellationToken);
        //await _mediator.DispatchDomainEventsAsync(this);
        return true;
    }

    //// 可以看到这个方法实际上与上面的方法是相同的,所以这个方法可以不实现
    //public override Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)
    //{
    //    return base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
    //}

    #endregion

    #region ITransaction

    private IDbContextTransaction _currentTransaction;// 把当前的事务用一个字段存储

    public IDbContextTransaction GetCurrentTransaction() => _currentTransaction;// 获取当前的事务就是返回存储的私有对象

    public bool HasActiveTransaction => _currentTransaction != null;// 事务是否开启是判断当前这个事务是否为空
    
    /// <summary>
    /// 开启事务
    /// </summary>
    /// <returns></returns>
    public Task<IDbContextTransaction> BeginTransactionAsync()
    {
        if (_currentTransaction != null) return null;
        _currentTransaction = Database.BeginTransaction(_capBus, autoCommit: false);
        return Task.FromResult(_currentTransaction);
    }

    /// <summary>
    /// 提交事务
    /// </summary>
    /// <param name="transaction">当前事务</param>
    /// <returns></returns>
    public async Task CommitTransactionAsync(IDbContextTransaction transaction)
    {
        if (transaction == null) throw new ArgumentNullException(nameof(transaction));
        if (transaction != _currentTransaction) throw new InvalidOperationException($"Transaction {transaction.TransactionId} is not current");

        try
        {
            await SaveChangesAsync();// 将当前所有的变更都保存到数据库
            transaction.Commit();
        }
        catch
        {
            RollbackTransaction();
            throw;
        }
        finally
        {
            if (_currentTransaction != null)
            {
                // 最终需要把当前事务进行释放,并且置为空
                // 这样就可以多次的开启事务和提交事务
                _currentTransaction.Dispose();
                _currentTransaction = null;
            }
        }
    }

    /// <summary>
    /// 回滚
    /// </summary>
    public void RollbackTransaction()
    {
        try
        {
            _currentTransaction?.Rollback();
        }
        finally
        {
            if (_currentTransaction != null)
            {
                _currentTransaction.Dispose();
                _currentTransaction = null;
            }
        }
    }

    #endregion
}

另外一个我们还是需要关注的一点就是如何管理我们的事务

这里有一个类 TransactionBehavior,这个类是用来注入我们的事务的管理过程的,具体它是怎么工作的在后续的章节会讲到,这里先关注它的实现过程

public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFContext
{
    ILogger _logger;
    TDbContext _dbContext;
    ICapPublisher _capBus;
    public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger)
    {
        _dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));
        _capBus = capBus ?? throw new ArgumentNullException(nameof(capBus));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }


    public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
    {
        var response = default(TResponse);
        var typeName = request.GetGenericTypeName();

        try
        {
            // 首先判断当前是否有开启事务
            if (_dbContext.HasActiveTransaction)
            {
                return await next();
            }

            // 定义了一个数据库操作执行的策略,比如说可以在里面嵌入一些重试的逻辑,这里创建了一个默认的策略
            var strategy = _dbContext.Database.CreateExecutionStrategy();

            await strategy.ExecuteAsync(async () =>
            {
                Guid transactionId;
                using (var transaction = await _dbContext.BeginTransactionAsync())
                using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
                {
                    _logger.LogInformation("----- 开始事务 {TransactionId} ({@Command})", transaction.TransactionId, typeName, request);

                    response = await next();// next 实际上是指我们的后续操作,这里的模式有点像之前讲的中间件模式

                    _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);


                    await _dbContext.CommitTransactionAsync(transaction);

                    transactionId = transaction.TransactionId;
                }
            });

            return response;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);

            throw;
        }
    }
}

回过头来看一下我们的 EFContext,EFContext 实现 IUnitOfWork,工作单元模式的核心,它实现了事务的管理和工作单元模式,我们就可以借助 EFContext 来实现我们的仓储层

ARZRF3J.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK