fixed transaction bug published x.3.1.06

This commit is contained in:
xuejiaming 2021-10-20 11:08:44 +08:00
parent 4b69cd6b76
commit 6539e1e74a
30 changed files with 590 additions and 371 deletions

View File

@ -200,7 +200,8 @@ or
{
o.CreateShardingTableOnStart = true;//create sharding table
o.EnsureCreatedWithOutShardingTable = true;//create data source with out sharding table
})
}) .AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection))
.AddDefaultDataSource("ds0", "Data Source=localhost;Initial Catalog=ShardingCoreDB1;Integrated Security=True;")
.AddShardingTableRoute(o =>
{
@ -389,6 +390,8 @@ or
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection))
.AddDefaultDataSource("ds0","Data Source=localhost;Initial Catalog=ShardingCoreDBxx0;Integrated Security=True;")
.AddShardingDataSource(sp =>
{

View File

@ -1,8 +1,8 @@
:start
::定义版本
set EFCORE2=2.3.1.05
set EFCORE3=3.3.1.05
set EFCORE5=5.3.1.05
set EFCORE2=2.3.1.06
set EFCORE3=3.3.1.06
set EFCORE5=5.3.1.06
::删除所有bin与obj下的文件
@echo off

View File

@ -32,6 +32,8 @@ namespace Sample.BulkConsole
o.EnsureCreatedWithOutShardingTable = true;
o.AutoTrackEntity = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("ds0", "Data Source=localhost;Initial Catalog=MyOrderSharding;Integrated Security=True;")
.AddShardingTableRoute(op=> {
op.AddShardingTableRoute<OrderVirtualRoute>();

View File

@ -26,6 +26,8 @@ namespace Sample.Migrations
o.EnsureCreatedWithOutShardingTable = false;
o.AutoTrackEntity = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection))
.AddDefaultDataSource("ds0",
"Data Source=localhost;Initial Catalog=ShardingCoreDBMigration;Integrated Security=True;")
.AddShardingTableRoute(o =>

View File

@ -41,6 +41,8 @@ namespace Sample.Migrations
o.EnsureCreatedWithOutShardingTable = false;
o.AutoTrackEntity = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection))
.AddDefaultDataSource("ds0",
"Data Source=localhost;Initial Catalog=ShardingCoreDBMigration;Integrated Security=True;")
.AddShardingTableRoute(o =>

View File

@ -73,11 +73,17 @@ namespace Sample.SqlServer
}
}
virtualDbContext.AddRange(userMods);
virtualDbContext.AddRange(SysTests);
virtualDbContext.AddRange(userSalaries);
virtualDbContext.SaveChanges();
using (var tran = virtualDbContext.Database.BeginTransaction())
{
virtualDbContext.AddRange(userMods);
virtualDbContext.AddRange(SysTests);
virtualDbContext.AddRange(userSalaries);
virtualDbContext.SaveChanges();
tran.Commit();
}
}
}
}

View File

@ -43,8 +43,8 @@ namespace Sample.SqlServer
o.ParallelQueryTimeOut=TimeSpan.FromSeconds(10);
})
//.AddShardingQuery((conStr, builder) => builder.UseSqlServer(conStr).UseLoggerFactory(efLogger))//无需添加.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking) 并发查询系统会自动添加NoTracking
//.AddShardingTransaction((connection, builder) =>
// builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("ds0",
"Data Source=localhost;Initial Catalog=ShardingCoreDB1;Integrated Security=True;")
.AddShardingTableRoute(o =>

View File

@ -44,6 +44,8 @@ namespace Sample.SqlServerShardingDataSource
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("ds0","Data Source=localhost;Initial Catalog=ShardingCoreDBxx0;Integrated Security=True;")
.AddShardingDataSource(sp =>
{

View File

@ -41,6 +41,8 @@ namespace Samples.AutoByDate.SqlServer
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection))
.AddDefaultDataSource("ds0",
"Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True;")
.AddShardingTableRoute(o =>

View File

@ -219,10 +219,13 @@ namespace ShardingCore
}
public static DbContextOptionsBuilder UseSharding<TShardingDbContext>(this DbContextOptionsBuilder optionsBuilder) where TShardingDbContext : DbContext, IShardingDbContext
{
return optionsBuilder.UseShardingWrapMark().ReplaceService<IDbSetSource, ShardingDbSetSource>()
return optionsBuilder.UseShardingWrapMark()
.ReplaceService<IDbSetSource, ShardingDbSetSource>()
.ReplaceService<IQueryCompiler, ShardingQueryCompiler>()
.ReplaceService<IDbContextTransactionManager, ShardingRelationalTransactionManager<TShardingDbContext>>()
.ReplaceService<IRelationalTransactionFactory, ShardingRelationalTransactionFactory<TShardingDbContext>>();
}
public static DbContextOptionsBuilder UseShardingWrapMark(this DbContextOptionsBuilder optionsBuilder)
{

View File

@ -31,7 +31,7 @@ namespace ShardingCore.DIExtensions
}
public ShardingDefaultDataSourceBuilder<TShardingDbContext> Begin(Action<ShardingCoreBeginOptions> shardingCoreBeginOptionsConfigure)
public ShardingTransactionBuilder<TShardingDbContext> Begin(Action<ShardingCoreBeginOptions> shardingCoreBeginOptionsConfigure)
{
var shardingCoreBeginOptions = new ShardingCoreBeginOptions();
shardingCoreBeginOptionsConfigure?.Invoke(shardingCoreBeginOptions);
@ -48,7 +48,7 @@ namespace ShardingCore.DIExtensions
ShardingConfigOption.CreateShardingTableOnStart = shardingCoreBeginOptions.CreateShardingTableOnStart;
ShardingConfigOption.IgnoreCreateTableError = shardingCoreBeginOptions.IgnoreCreateTableError;
return new ShardingDefaultDataSourceBuilder<TShardingDbContext>(this);
return new ShardingTransactionBuilder<TShardingDbContext>(this);
//return new ShardingQueryBuilder<TShardingDbContext>(this);
}
//public ShardingCoreConfigBuilder<TShardingDbContext, TActualDbContext> AddDefaultDataSource(string dataSourceName, string connectionString)

View File

@ -39,6 +39,7 @@ namespace ShardingCore.DIExtensions
//添加创建TActualDbContext 的DbContextOptionsBuilder创建者
var config = new ShardingDbContextOptionsBuilderConfig<TShardingDbContext>(
_shardingCoreConfigBuilder.ShardingConfigOption.SameConnectionConfigure,
_shardingCoreConfigBuilder.ShardingConfigOption.DefaultQueryConfigure);
services
.AddSingleton<IShardingDbContextOptionsBuilderConfig<TShardingDbContext>,

View File

@ -1,32 +1,32 @@
//using System;
//using System.Collections.Generic;
//using System.Data.Common;
//using System.Text;
//using Microsoft.EntityFrameworkCore;
//using ShardingCore.Sharding.Abstractions;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
//namespace ShardingCore.DIExtensions
//{
// /*
// * @Author: xjm
// * @Description:
// * @Date: 2021/9/19 21:13:43
// * @Ver: 1.0
// * @Email: 326308290@qq.com
// */
// public class ShardingTransactionBuilder<TShardingDbContext>
// where TShardingDbContext : DbContext, IShardingDbContext
// {
// private readonly ShardingCoreConfigBuilder<TShardingDbContext> _shardingCoreConfigBuilder;
namespace ShardingCore.DIExtensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/19 21:13:43
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingTransactionBuilder<TShardingDbContext>
where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly ShardingCoreConfigBuilder<TShardingDbContext> _shardingCoreConfigBuilder;
// public ShardingTransactionBuilder(ShardingCoreConfigBuilder<TShardingDbContext> shardingCoreConfigBuilder)
// {
// _shardingCoreConfigBuilder = shardingCoreConfigBuilder;
// }
// public ShardingDefaultDataSourceBuilder<TShardingDbContext> AddShardingTransaction(Action<DbConnection, DbContextOptionsBuilder> transactionConfigure)
// {
// _shardingCoreConfigBuilder.ShardingConfigOption.UseShardingTransaction(transactionConfigure);
// return new ShardingDefaultDataSourceBuilder<TShardingDbContext>(_shardingCoreConfigBuilder);
// }
// }
//}
public ShardingTransactionBuilder(ShardingCoreConfigBuilder<TShardingDbContext> shardingCoreConfigBuilder)
{
_shardingCoreConfigBuilder = shardingCoreConfigBuilder;
}
public ShardingDefaultDataSourceBuilder<TShardingDbContext> AddShardingTransaction(Action<DbConnection, DbContextOptionsBuilder> transactionConfigure)
{
_shardingCoreConfigBuilder.ShardingConfigOption.UseShardingTransaction(transactionConfigure);
return new ShardingDefaultDataSourceBuilder<TShardingDbContext>(_shardingCoreConfigBuilder);
}
}
}

View File

@ -24,43 +24,41 @@ namespace ShardingCore.EFCores
private readonly ISupportShardingTransaction _supportShardingTransaction;
private bool supportShardingTransaction => _supportShardingTransaction != null;
#if !EFCORE2
public ShardingRelationalTransaction(DbContext dbContext, IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned) : base(connection, transaction, transactionId, logger, transactionOwned)
public ShardingRelationalTransaction(ISupportShardingTransaction supportShardingTransaction, IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned) : base(connection, transaction, transactionId, logger, transactionOwned)
{
if (dbContext is ISupportShardingTransaction supportShardingTransaction)
{
_supportShardingTransaction = supportShardingTransaction;
}
_supportShardingTransaction = supportShardingTransaction;
}
#endif
#if EFCORE2
public ShardingRelationalTransaction(DbContext dbContext, IRelationalConnection connection, DbTransaction transaction,IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned) : base(connection, transaction, logger, transactionOwned)
public ShardingRelationalTransaction(ISupportShardingTransaction supportShardingTransaction, IRelationalConnection connection, DbTransaction transaction,IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned) : base(connection, transaction, logger, transactionOwned)
{
if (dbContext is ISupportShardingTransaction supportShardingTransaction)
{
_supportShardingTransaction = supportShardingTransaction;
}
_supportShardingTransaction = supportShardingTransaction;
}
#endif
protected override void ClearTransaction()
{
base.ClearTransaction();
_supportShardingTransaction.UseShardingTransaction(null);
}
//protected override void ClearTransaction()
//{
// if (_canClear)
// {
// base.ClearTransaction();
// _supportShardingTransaction.NotifyShardingTransaction(null);
// }
//}
public override void Commit()
{
base.Commit();
_supportShardingTransaction?.Commit();
_supportShardingTransaction.NotifyShardingTransaction();
}
public override void Rollback()
{
base.Rollback();
_supportShardingTransaction?.Rollback();
_supportShardingTransaction.NotifyShardingTransaction();
}
#if !EFCORE2
@ -72,6 +70,7 @@ namespace ShardingCore.EFCores
{
await _supportShardingTransaction.RollbackAsync(cancellationToken);
}
_supportShardingTransaction.NotifyShardingTransaction();
}
public override async Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
@ -81,12 +80,7 @@ namespace ShardingCore.EFCores
{
await _supportShardingTransaction.CommitAsync(cancellationToken);
}
}
protected override async Task ClearTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
await base.ClearTransactionAsync(cancellationToken);
_supportShardingTransaction.UseShardingTransaction(null);
_supportShardingTransaction.NotifyShardingTransaction();
}
#endif
}

View File

@ -1,236 +1,240 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Storage.Internal;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
//using System;
//using System.Collections.Generic;
//using System.Data;
//using System.Data.Common;
//using System.Text;
//using System.Threading;
//using System.Threading.Tasks;
//using Microsoft.EntityFrameworkCore;
//using Microsoft.EntityFrameworkCore.Internal;
//using Microsoft.EntityFrameworkCore.Query.Internal;
//using Microsoft.EntityFrameworkCore.Storage;
//using Microsoft.EntityFrameworkCore.Storage.Internal;
//using Microsoft.Extensions.DependencyInjection;
//using ShardingCore.Extensions;
//using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.EFCores
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/5 15:41:20
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingRelationalConnection : IRelationalConnection
{
private readonly IRelationalConnection _relationalConnection;
private readonly ISupportShardingTransaction _supportShardingTransaction;
//namespace ShardingCore.EFCores
//{
// /*
// * @Author: xjm
// * @Description:
// * @Date: 2021/9/5 15:41:20
// * @Ver: 1.0
// * @Email: 326308290@qq.com
// */
// public class ShardingRelationalConnection : IRelationalConnection
// {
// private readonly IRelationalConnection _relationalConnection;
// private readonly ISupportShardingTransaction _supportShardingTransaction;
#if !EFCORE2
public ShardingRelationalConnection(IRelationalConnection _relationalConnection, DbTransaction transaction)
{
this._relationalConnection = _relationalConnection;
if (Context is ISupportShardingTransaction supportShardingTransaction)
{
_supportShardingTransaction = supportShardingTransaction;
}
}
//#if !EFCORE2
// public ShardingRelationalConnection(IRelationalConnection _relationalConnection, DbTransaction transaction)
// {
// this._relationalConnection = _relationalConnection;
// if (Context is ISupportShardingTransaction supportShardingTransaction)
// {
// _supportShardingTransaction = supportShardingTransaction;
// }
// }
#endif
#if EFCORE2
private readonly Type _dbContextType;
public ShardingRelationalConnection(IRelationalConnection _relationalConnection,DbTransaction transaction,Type dbContextType)
{
this._relationalConnection = _relationalConnection;
_dbContextType = dbContextType;
if (Context is ISupportShardingTransaction supportShardingTransaction)
{
_supportShardingTransaction = supportShardingTransaction;
}
}
#endif
//#endif
//#if EFCORE2
// private readonly Type _dbContextType;
// public ShardingRelationalConnection(IRelationalConnection _relationalConnection,DbTransaction transaction,Type dbContextType)
// {
// this._relationalConnection = _relationalConnection;
// _dbContextType = dbContextType;
// if (Context is ISupportShardingTransaction supportShardingTransaction)
// {
// _supportShardingTransaction = supportShardingTransaction;
// }
// }
//#endif
public void ResetState()
{
_relationalConnection.ResetState();
}
// public void ResetState()
// {
// _relationalConnection.ResetState();
// }
#if !EFCORE2
public Task ResetStateAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.ResetStateAsync(cancellationToken);
}
//#if !EFCORE2
// public Task ResetStateAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// return _relationalConnection.ResetStateAsync(cancellationToken);
// }
#endif
//#endif
public IDbContextTransaction BeginTransaction()
{
var dbContextTransaction = _relationalConnection.BeginTransaction();
_supportShardingTransaction?.UseShardingTransaction(dbContextTransaction);
return dbContextTransaction;
}
// public IDbContextTransaction BeginTransaction()
// {
// var dbContextTransaction = _relationalConnection.BeginTransaction();
// _supportShardingTransaction?.NotifyShardingTransaction(dbContextTransaction);
// return dbContextTransaction;
// }
public async Task<IDbContextTransaction> BeginTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.BeginTransactionAsync(cancellationToken);
_supportShardingTransaction?.UseShardingTransaction(dbContextTransaction);
return dbContextTransaction;
}
// public async Task<IDbContextTransaction> BeginTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// var dbContextTransaction = await _relationalConnection.BeginTransactionAsync(cancellationToken);
// _supportShardingTransaction?.NotifyShardingTransaction(dbContextTransaction);
// return dbContextTransaction;
// }
public void CommitTransaction()
{
_relationalConnection.CommitTransaction();
}
// public void CommitTransaction()
// {
// _relationalConnection.CommitTransaction();
// }
public void RollbackTransaction()
{
_relationalConnection.RollbackTransaction();
}
#if EFCORE5
public IDbContextTransaction UseTransaction(DbTransaction transaction, Guid transactionId)
{
var dbContextTransaction = _relationalConnection.UseTransaction(transaction, transactionId);
_supportShardingTransaction?.UseShardingTransaction(dbContextTransaction);
return dbContextTransaction;
}
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, Guid transactionId,
CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, transactionId, cancellationToken);
_supportShardingTransaction?.UseShardingTransaction(dbContextTransaction);
return dbContextTransaction;
}
// public void RollbackTransaction()
// {
// _relationalConnection.RollbackTransaction();
// }
//#if EFCORE5
// public IDbContextTransaction NotifyTransaction(DbTransaction transaction, Guid transactionId)
// {
// var dbContextTransaction = _relationalConnection.NotifyTransaction(transaction, transactionId);
// _supportShardingTransaction?.NotifyShardingTransaction(dbContextTransaction);
// return dbContextTransaction;
// }
// public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, Guid transactionId,
// CancellationToken cancellationToken = new CancellationToken())
// {
// var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, transactionId, cancellationToken);
// _supportShardingTransaction?.NotifyShardingTransaction(dbContextTransaction);
// return dbContextTransaction;
// }
public Task CommitTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.CommitTransactionAsync(cancellationToken);
}
public Task RollbackTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.RollbackTransactionAsync(cancellationToken);
}
#endif
// public Task CommitTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// return _relationalConnection.CommitTransactionAsync(cancellationToken);
// }
// public Task RollbackTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// return _relationalConnection.RollbackTransactionAsync(cancellationToken);
// }
//#endif
#if !EFCORE5
public bool IsMultipleActiveResultSetsEnabled => _relationalConnection.IsMultipleActiveResultSetsEnabled;
//#if !EFCORE5
// public bool IsMultipleActiveResultSetsEnabled => _relationalConnection.IsMultipleActiveResultSetsEnabled;
# endif
IDbContextTransaction IRelationalConnection.CurrentTransaction => _relationalConnection.CurrentTransaction;
//# endif
// IDbContextTransaction IRelationalConnection.CurrentTransaction => _relationalConnection.CurrentTransaction;
IDbContextTransaction IDbContextTransactionManager.CurrentTransaction => _relationalConnection.CurrentTransaction;
// IDbContextTransaction IDbContextTransactionManager.CurrentTransaction => _relationalConnection.CurrentTransaction;
public SemaphoreSlim Semaphore => _relationalConnection.Semaphore;
// public SemaphoreSlim Semaphore => _relationalConnection.Semaphore;
public bool Open(bool errorsExpected = false)
{
return _relationalConnection.Open(errorsExpected);
}
// public bool Open(bool errorsExpected = false)
// {
// return _relationalConnection.Open(errorsExpected);
// }
public Task<bool> OpenAsync(CancellationToken cancellationToken, bool errorsExpected = false)
{
return _relationalConnection.OpenAsync(cancellationToken, errorsExpected);
}
// public Task<bool> OpenAsync(CancellationToken cancellationToken, bool errorsExpected = false)
// {
// return _relationalConnection.OpenAsync(cancellationToken, errorsExpected);
// }
public bool Close()
{
return _relationalConnection.Close();
}
// public bool Close()
// {
// return _relationalConnection.Close();
// }
public DbConnection DbConnection => _relationalConnection.DbConnection;
// public DbConnection DbConnection => _relationalConnection.DbConnection;
public DbContext Context =>
#if !EFCORE2
_relationalConnection.Context;
#endif
#if EFCORE2
GetDbContext();
// public DbContext Context =>
//#if !EFCORE2
// _relationalConnection.Context;
//#endif
//#if EFCORE2
// GetDbContext();
private DbContext GetDbContext()
{
var namedConnectionStringResolver = ((RelationalConnectionDependencies)_relationalConnection.GetPropertyValue("Dependencies")).ConnectionStringResolver;
var serviceProvider = (IServiceProvider)namedConnectionStringResolver.GetPropertyValue("ApplicationServiceProvider");
var dbContext = (DbContext)serviceProvider.GetService(_dbContextType);
return dbContext;
}
// private DbContext GetDbContext()
// {
// var namedConnectionStringResolver = ((RelationalConnectionDependencies)_relationalConnection.GetPropertyValue("Dependencies")).ConnectionStringResolver;
// var serviceProvider = (IServiceProvider)namedConnectionStringResolver.GetPropertyValue("ApplicationServiceProvider");
// var dbContext = (DbContext)serviceProvider.GetService(_dbContextType);
// return dbContext;
// }
public void RegisterBufferable(IBufferable bufferable)
{
_relationalConnection.RegisterBufferable(bufferable);
}
// public void RegisterBufferable(IBufferable bufferable)
// {
// _relationalConnection.RegisterBufferable(bufferable);
// }
public Task RegisterBufferableAsync(IBufferable bufferable, CancellationToken cancellationToken)
{
return _relationalConnection.RegisterBufferableAsync(bufferable, cancellationToken);
}
#endif
public Guid ConnectionId => _relationalConnection.ConnectionId;
// public Task RegisterBufferableAsync(IBufferable bufferable, CancellationToken cancellationToken)
// {
// return _relationalConnection.RegisterBufferableAsync(bufferable, cancellationToken);
// }
//#endif
// public Guid ConnectionId => _relationalConnection.ConnectionId;
public int? CommandTimeout
{
get
{
return _relationalConnection.CommandTimeout;
}
set
{
_relationalConnection.CommandTimeout = value;
}
}
// public int? CommandTimeout
// {
// get
// {
// return _relationalConnection.CommandTimeout;
// }
// set
// {
// _relationalConnection.CommandTimeout = value;
// }
// }
public IDbContextTransaction BeginTransaction(IsolationLevel isolationLevel)
{
return _relationalConnection.BeginTransaction(isolationLevel);
}
// public IDbContextTransaction BeginTransaction(IsolationLevel isolationLevel)
// {
// var dbContextTransaction = _relationalConnection.BeginTransaction(isolationLevel);
// _supportShardingTransaction?.NotifyShardingTransaction(dbContextTransaction);
// return dbContextTransaction;
// }
public Task<IDbContextTransaction> BeginTransactionAsync(IsolationLevel isolationLevel,
CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.BeginTransactionAsync(isolationLevel, cancellationToken);
}
// public async Task<IDbContextTransaction> BeginTransactionAsync(IsolationLevel isolationLevel,
// CancellationToken cancellationToken = new CancellationToken())
// {
// var dbContextTransaction=await _relationalConnection.BeginTransactionAsync(isolationLevel, cancellationToken);
// _supportShardingTransaction?.NotifyShardingTransaction(dbContextTransaction);
// return dbContextTransaction;
// }
public IDbContextTransaction UseTransaction(DbTransaction transaction)
{
var dbContextTransaction = _relationalConnection.UseTransaction(transaction);
_supportShardingTransaction?.UseShardingTransaction(dbContextTransaction);
return dbContextTransaction;
}
// public IDbContextTransaction NotifyTransaction(DbTransaction transaction)
// {
// var dbContextTransaction = _relationalConnection.NotifyTransaction(transaction);
// _supportShardingTransaction?.NotifyShardingTransaction(dbContextTransaction);
// return dbContextTransaction;
// }
public void Dispose()
{
_relationalConnection.Dispose();
}
// public void Dispose()
// {
// _relationalConnection.Dispose();
// }
public string ConnectionString => _relationalConnection.ConnectionString;
#if !EFCORE2
// public string ConnectionString => _relationalConnection.ConnectionString;
//#if !EFCORE2
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, cancellationToken);
_supportShardingTransaction?.UseShardingTransaction(dbContextTransaction);
return dbContextTransaction;
// public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, CancellationToken cancellationToken = new CancellationToken())
// {
// var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, cancellationToken);
// _supportShardingTransaction?.NotifyShardingTransaction(dbContextTransaction);
// return dbContextTransaction;
}
// }
public Task<bool> CloseAsync()
{
return _relationalConnection.CloseAsync();
}
// public Task<bool> CloseAsync()
// {
// return _relationalConnection.CloseAsync();
// }
public ValueTask DisposeAsync()
{
return _relationalConnection.DisposeAsync();
}
#endif
}
}
// public ValueTask DisposeAsync()
// {
// return _relationalConnection.DisposeAsync();
// }
//#endif
// }
//}

View File

@ -6,6 +6,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.EFCores
@ -29,14 +30,14 @@ namespace ShardingCore.EFCores
public override RelationalTransaction Create(IRelationalConnection connection, DbTransaction transaction, Guid transactionId,
IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned)
{
var shardingRelationalConnection = new ShardingRelationalConnection(connection, transaction);
return new ShardingRelationalTransaction(shardingRelationalConnection.Context, shardingRelationalConnection, transaction, transactionId, logger, transactionOwned);
var supportShardingTransaction = connection.Context as ISupportShardingTransaction;
return new ShardingRelationalTransaction(supportShardingTransaction, connection, transaction, transactionId, logger, transactionOwned);
}
}
#endif
#if EFCORE2
public class ShardingRelationalTransactionFactory<TShardingDbContext> : RelationalTransactionFactory where TShardingDbContext:DbContext,IShardingDbContext
public class ShardingRelationalTransactionFactory<TShardingDbContext> : RelationalTransactionFactory where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly RelationalTransactionFactoryDependencies _dependencies;
public ShardingRelationalTransactionFactory(RelationalTransactionFactoryDependencies dependencies) : base(dependencies)
@ -47,10 +48,18 @@ namespace ShardingCore.EFCores
, IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger,
bool transactionOwned)
{
var shardingRelationalConnection = new ShardingRelationalConnection(connection, transaction, typeof(TShardingDbContext));
return new ShardingRelationalTransaction(shardingRelationalConnection.Context, shardingRelationalConnection, transaction, logger,
var supportShardingTransaction = GetDbContext(connection) as ISupportShardingTransaction;
return new ShardingRelationalTransaction(supportShardingTransaction, connection, transaction, logger,
transactionOwned);
}
private DbContext GetDbContext(IRelationalConnection connection)
{
var namedConnectionStringResolver = ((RelationalConnectionDependencies)connection.GetPropertyValue("Dependencies")).ConnectionStringResolver;
var serviceProvider = (IServiceProvider)namedConnectionStringResolver.GetPropertyValue("ApplicationServiceProvider");
var dbContext = (DbContext)serviceProvider.GetService(typeof(TShardingDbContext));
return dbContext;
}
}
#endif
}

View File

@ -0,0 +1,142 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.EFCores
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/10/20 10:08:42
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
/// <summary>
/// manage transaction
/// </summary>
public class ShardingRelationalTransactionManager<TShardingDbContext> : IRelationalTransactionManager where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly IRelationalConnection _relationalConnection;
private readonly ISupportShardingTransaction _supportShardingTransaction;
#if !EFCORE2
public ShardingRelationalTransactionManager(IRelationalConnection relationalConnection)
{
_relationalConnection = relationalConnection;
_supportShardingTransaction = relationalConnection.Context as ISupportShardingTransaction;
}
#endif
#if EFCORE2
public ShardingRelationalTransactionManager(IRelationalConnection relationalConnection)
{
_relationalConnection = relationalConnection;
_supportShardingTransaction = GetDbContext(relationalConnection) as ISupportShardingTransaction;
}
private DbContext GetDbContext(IRelationalConnection connection)
{
var namedConnectionStringResolver = ((RelationalConnectionDependencies)connection.GetPropertyValue("Dependencies")).ConnectionStringResolver;
var serviceProvider = (IServiceProvider)namedConnectionStringResolver.GetPropertyValue("ApplicationServiceProvider");
var dbContext = (DbContext)serviceProvider.GetService(typeof(TShardingDbContext));
return dbContext;
}
#endif
public void ResetState()
{
_relationalConnection.ResetState();
}
public IDbContextTransaction BeginTransaction()
{
return BeginTransaction(IsolationLevel.Unspecified);
}
public Task<IDbContextTransaction> BeginTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
return BeginTransactionAsync(IsolationLevel.Unspecified, cancellationToken);
}
public void CommitTransaction()
{
_relationalConnection.CommitTransaction();
}
public void RollbackTransaction()
{
_relationalConnection.RollbackTransaction();
}
public IDbContextTransaction CurrentTransaction => _relationalConnection.CurrentTransaction;
public IDbContextTransaction BeginTransaction(IsolationLevel isolationLevel)
{
var dbContextTransaction = _relationalConnection.BeginTransaction(isolationLevel);
_supportShardingTransaction?.NotifyShardingTransaction();
return dbContextTransaction;
}
public async Task<IDbContextTransaction> BeginTransactionAsync(IsolationLevel isolationLevel,
CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.BeginTransactionAsync(isolationLevel, cancellationToken);
_supportShardingTransaction?.NotifyShardingTransaction();
return dbContextTransaction;
}
public IDbContextTransaction UseTransaction(DbTransaction transaction)
{
var dbContextTransaction = _relationalConnection.UseTransaction(transaction);
_supportShardingTransaction?.NotifyShardingTransaction();
return dbContextTransaction;
}
#if !EFCORE2
public Task ResetStateAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.ResetStateAsync(cancellationToken);
}
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, cancellationToken);
_supportShardingTransaction?.NotifyShardingTransaction();
return dbContextTransaction;
}
#if !EFCORE3
public Task CommitTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.CommitTransactionAsync(cancellationToken);
}
public Task RollbackTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.RollbackTransactionAsync(cancellationToken);
}
public IDbContextTransaction UseTransaction(DbTransaction transaction, Guid transactionId)
{
var dbContextTransaction = _relationalConnection.UseTransaction(transaction, transactionId);
_supportShardingTransaction?.NotifyShardingTransaction();
return dbContextTransaction;
}
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, Guid transactionId,
CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, transactionId, cancellationToken);
_supportShardingTransaction?.NotifyShardingTransaction();
return dbContextTransaction;
}
#endif
#endif
}
}

View File

@ -17,5 +17,6 @@ namespace ShardingCore
public interface IShardingDbContextOptionsBuilderConfig<TShardingDbContext> where TShardingDbContext:DbContext,IShardingDbContext
{
DbContextOptionsBuilder UseDbContextOptionsBuilder(string connectionString, DbContextOptionsBuilder dbContextOptionsBuilder);
DbContextOptionsBuilder UseDbContextOptionsBuilder(DbConnection dbConnection, DbContextOptionsBuilder dbContextOptionsBuilder);
}
}

View File

@ -37,7 +37,7 @@ namespace ShardingCore.Sharding
{
_shardingDbContextExecutor =
(IShardingDbContextExecutor)Activator.CreateInstance(
typeof(ShardingDbContextExecutor<>).GetGenericType0(this.GetType()));
typeof(ShardingDbContextExecutor<>).GetGenericType0(this.GetType()),this);
}
}
/// <summary>
@ -560,9 +560,9 @@ namespace ShardingCore.Sharding
}
#endif
public void UseShardingTransaction(IDbContextTransaction wrapDbContextTransaction)
public void NotifyShardingTransaction()
{
_shardingDbContextExecutor.UseShardingTransaction(wrapDbContextTransaction);
_shardingDbContextExecutor.NotifyShardingTransaction();
}
public void Rollback()

View File

@ -20,7 +20,6 @@ namespace ShardingCore.Sharding.Abstractions
#endif
{
IDbContextTransaction CurrentTransaction { get; }
/// <summary>
/// 读写分离优先级
/// </summary>
@ -48,7 +47,7 @@ namespace ShardingCore.Sharding.Abstractions
CancellationToken cancellationToken = new CancellationToken());
int SaveChanges(bool acceptAllChangesOnSuccess);
void UseShardingTransaction(IDbContextTransaction wrapDbContextTransaction);
void NotifyShardingTransaction();

View File

@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.Abstractions
*/
public interface ISupportShardingTransaction
{
void UseShardingTransaction(IDbContextTransaction wrapDbContextTransaction);
void NotifyShardingTransaction();
void Rollback();
void Commit();
#if !EFCORE2

View File

@ -27,7 +27,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
*/
public class DataSourceDbContext<TShardingDbContext> : IDataSourceDbContext where TShardingDbContext : DbContext, IShardingDbContext
{
public bool IsDefault { get; }
public bool IsDefault { get; }
private readonly IShardingDbContextOptionsBuilderConfig<TShardingDbContext> _shardingDbContextOptionsBuilderConfig;
private readonly IShardingDbContextFactory<TShardingDbContext> _shardingDbContextFactory;
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
@ -40,41 +40,78 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
private ConcurrentDictionary<string, DbContext> _dataSourceDbContexts =
new ConcurrentDictionary<string, DbContext>();
private IDbContextTransaction _dbContextTransaction;
private bool _isBeginTransaction =false;
private IsolationLevel _isolationLevel = IsolationLevel.Unspecified;
private bool _isBeginTransaction => _shardingDbContext.Database.CurrentTransaction != null;
private readonly DbContext _shardingDbContext;
private IDbContextTransaction _shardingContextTransaction => _shardingDbContext?.Database?.CurrentTransaction;
private readonly DbContextOptionsBuilder _dbContextOptionsBuilder;
private readonly ILogger<DataSourceDbContext<TShardingDbContext>> _logger;
private DbContextOptions<TShardingDbContext> _dbContextOptions;
private object SLOCK = new object();
private IDbContextTransaction CurrentDbContextTransaction => IsDefault
? _shardingContextTransaction
: _dataSourceDbContexts.Values.FirstOrDefault(o => o.Database.CurrentTransaction != null)?.Database
?.CurrentTransaction;
/// <summary>
///
/// </summary>
/// <param name="dataSourceName"></param>
/// <param name="isDefault"></param>
/// <param name="shardingDbContext"></param>
/// <param name="shardingDbContextOptionsBuilderConfig"></param>
/// <param name="shardingDbContextFactory"></param>
/// <param name="actualConnectionStringManager"></param>
public DataSourceDbContext(string dataSourceName,
bool isDefault,
bool isBeginTransaction,
DbContext shardingDbContext,
IShardingDbContextOptionsBuilderConfig<TShardingDbContext> shardingDbContextOptionsBuilderConfig,
IShardingDbContextFactory<TShardingDbContext> shardingDbContextFactory,
ActualConnectionStringManager<TShardingDbContext> actualConnectionStringManager)
{
DataSourceName = dataSourceName;
IsDefault = isDefault;
_isBeginTransaction = isBeginTransaction;
_shardingDbContext = shardingDbContext;
_shardingDbContextOptionsBuilderConfig = shardingDbContextOptionsBuilderConfig;
_shardingDbContextFactory = shardingDbContextFactory;
_actualConnectionStringManager = actualConnectionStringManager;
_logger = ShardingContainer.GetService<ILogger<DataSourceDbContext<TShardingDbContext>>>();
_dbContextOptionsBuilder = CreateDbContextOptionBuilder();
InitDbContextOptionsBuilder();
}
private void InitDbContextOptionsBuilder()
}
/// <summary>
/// 不支持并发后期发现直接报错而不是用lock
/// </summary>
/// <returns></returns>
private DbContextOptions<TShardingDbContext> CreateShareDbContextOptionsBuilder()
{
var connectionString = _actualConnectionStringManager.GetConnectionString(DataSourceName, true);
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString, _dbContextOptionsBuilder);
if (_dbContextOptions != null)
{
return _dbContextOptions;
}
lock (SLOCK)
{
if (_dbContextOptions != null)
{
return _dbContextOptions;
}
var dbContextOptionsBuilder = CreateDbContextOptionBuilder();
if (IsDefault)
{
var dbConnection = _shardingDbContext.Database.GetDbConnection();
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(dbConnection, dbContextOptionsBuilder);
}
else
{
var connectionString = _actualConnectionStringManager.GetConnectionString(DataSourceName, true);
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString, dbContextOptionsBuilder);
}
_dbContextOptions = dbContextOptionsBuilder.Options;
return _dbContextOptions;
}
}
public static DbContextOptionsBuilder<TShardingDbContext> CreateDbContextOptionBuilder()
@ -84,7 +121,11 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
return (DbContextOptionsBuilder<TShardingDbContext>)Activator.CreateInstance(type);
}
/// <summary>
/// 不支持并发后期发现直接报错而不是用lock
/// </summary>
/// <param name="routeTail"></param>
/// <returns></returns>
public DbContext CreateDbContext(IRouteTail routeTail)
{
if (routeTail.IsMultiEntityQuery())
@ -95,87 +136,79 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
var cacheKey = routeTail.GetRouteTailIdentity();
if (!_dataSourceDbContexts.TryGetValue(cacheKey, out var dbContext))
{
dbContext = _shardingDbContextFactory.Create(_dbContextOptionsBuilder.Options, routeTail);
dbContext = _shardingDbContextFactory.Create(CreateShareDbContextOptionsBuilder(), routeTail);
_dataSourceDbContexts.TryAdd(cacheKey, dbContext);
ShardingDbTransaction(dbContext);
ShardingDbTransaction();
}
return dbContext;
}
private void ShardingDbTransaction(DbContext dbContext)
private void ShardingDbTransaction()
{
if (_isBeginTransaction)
{
if (_dbContextTransaction != null)
{
dbContext.Database.UseTransaction(_dbContextTransaction.GetDbTransaction());
}
else
{
_dbContextTransaction = dbContext.Database.BeginTransaction();
}
}
}
public void UseTransaction(IDbContextTransaction dbContextTransaction)
{
if (dbContextTransaction == null)
{
ClearTransaction();
}
else
{
ResSetTransaction(dbContextTransaction);
BeginAnyTransaction();
JoinCurrentTransaction();
}
}
/// <summary>
/// 重新设置当前的事务
/// </summary>
/// <param name="dbContextTransaction"></param>
private void ResSetTransaction(IDbContextTransaction dbContextTransaction)
{
if (dbContextTransaction == null)
throw new ArgumentNullException(nameof(dbContextTransaction));
if (IsDefault)
{
_dbContextTransaction = dbContextTransaction;
}
else
{
_isolationLevel = dbContextTransaction.GetDbTransaction().IsolationLevel;
_isBeginTransaction = true;
if (!_dataSourceDbContexts.IsEmpty)
{
_dbContextTransaction = _dataSourceDbContexts.First().Value.Database.BeginTransaction(_isolationLevel);
}
}
}
/// <summary>
/// 加入到当前事务
/// </summary>
private void JoinCurrentTransaction()
{
//如果当前的dbcontext有的话
if (_dbContextTransaction != null)
if (CurrentDbContextTransaction != null)
{
var dbTransaction = CurrentDbContextTransaction.GetDbTransaction();
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
if (dataSourceDbContext.Value.Database.CurrentTransaction == null)
dataSourceDbContext.Value.Database.UseTransaction(_dbContextTransaction.GetDbTransaction());
dataSourceDbContext.Value.Database.UseTransaction(dbTransaction);
}
}
}
private void BeginAnyTransaction()
{
if (_isBeginTransaction)
{
if (!IsDefault)
{
if (!_dataSourceDbContexts.IsEmpty)
{
var isolationLevel = _shardingContextTransaction.GetDbTransaction().IsolationLevel;
var firstTransaction = _dataSourceDbContexts.Values
.FirstOrDefault(o => o.Database.CurrentTransaction != null)
?.Database?.CurrentTransaction;
if (firstTransaction == null)
{
_ = _dataSourceDbContexts.First().Value.Database
.BeginTransaction(isolationLevel);
}
}
}
}
}
public void NotifyTransaction()
{
if (!_isBeginTransaction)
{
ClearTransaction();
}
else
{
BeginAnyTransaction();
JoinCurrentTransaction();
}
}
private void ClearTransaction()
{
_dbContextTransaction = null;
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
if (dataSourceDbContext.Value.Database.CurrentTransaction != null)
dataSourceDbContext.Value.Database.UseTransaction(null);
}
_isBeginTransaction = false;
_isolationLevel = IsolationLevel.Unspecified;
}
/// <summary>
@ -211,11 +244,11 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
return;
try
{
_dbContextTransaction.Rollback();
CurrentDbContextTransaction?.Rollback();
}
catch(Exception e)
catch (Exception e)
{
_logger.LogError(e,"rollback error.");
_logger.LogError(e, "rollback error.");
}
}
@ -225,7 +258,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
return;
try
{
_dbContextTransaction.Commit();
CurrentDbContextTransaction?.Commit();
}
catch (Exception e)
{
@ -241,7 +274,8 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
return;
try
{
await _dbContextTransaction.RollbackAsync(cancellationToken);
if (CurrentDbContextTransaction != null)
await CurrentDbContextTransaction.RollbackAsync(cancellationToken);
}
catch (Exception e)
{
@ -256,7 +290,8 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
return;
try
{
await _dbContextTransaction.CommitAsync(cancellationToken);
if (CurrentDbContextTransaction != null)
await CurrentDbContextTransaction.CommitAsync(cancellationToken);
}
catch (Exception e)
{

View File

@ -25,7 +25,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
{
bool IsDefault { get; }
DbContext CreateDbContext(IRouteTail routeTail);
void UseTransaction(IDbContextTransaction dbContextTransaction);
void NotifyTransaction();
int SaveChanges(bool acceptAllChangesOnSuccess);

View File

@ -31,6 +31,8 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
/// <typeparam name="TShardingDbContext"></typeparam>
public class ShardingDbContextExecutor<TShardingDbContext> : IShardingDbContextExecutor where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly DbContext _shardingDbContext;
//private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>> _dbContextCaches = new ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>>();
private readonly ConcurrentDictionary<string, IDataSourceDbContext> _dbContextCaches = new ConcurrentDictionary<string, IDataSourceDbContext>();
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
@ -40,8 +42,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
private readonly IRouteTailFactory _routeTailFactory;
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
public IDbContextTransaction CurrentTransaction { get; private set; }
private bool IsBeginTransaction => CurrentTransaction != null;
private bool IsBeginTransaction => _shardingDbContext.Database.CurrentTransaction != null;
public int ReadWriteSeparationPriority
{
@ -57,8 +58,9 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
public ShardingDbContextExecutor()
public ShardingDbContextExecutor(DbContext shardingDbContext)
{
_shardingDbContext = shardingDbContext;
_virtualDataSource = ShardingContainer.GetService<IVirtualDataSource<TShardingDbContext>>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<TShardingDbContext>>();
_shardingDbContextFactory = ShardingContainer.GetService<IShardingDbContextFactory<TShardingDbContext>>();
@ -71,7 +73,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
private IDataSourceDbContext GetDataSourceDbContext(string dataSourceName)
{
return _dbContextCaches.GetOrAdd(dataSourceName, dsname => new DataSourceDbContext<TShardingDbContext>(dataSourceName, _virtualDataSource.IsDefault(dataSourceName), IsBeginTransaction, _shardingDbContextOptionsBuilderConfig, _shardingDbContextFactory, _actualConnectionStringManager));
return _dbContextCaches.GetOrAdd(dataSourceName, dsname => new DataSourceDbContext<TShardingDbContext>(dataSourceName, _virtualDataSource.IsDefault(dataSourceName), _shardingDbContext, _shardingDbContextOptionsBuilderConfig, _shardingDbContextFactory, _actualConnectionStringManager));
}
@ -133,23 +135,12 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
return i;
}
public void UseShardingTransaction(IDbContextTransaction wrapDbContextTransaction)
public void NotifyShardingTransaction()
{
if (IsBeginTransaction)
foreach (var dbContextCache in _dbContextCaches)
{
if (wrapDbContextTransaction != null)
throw new ShardingCoreException("db transaction is already begin");
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.UseTransaction(null);
}
dbContextCache.Value.NotifyTransaction();
}
else
{
BeginTransaction(wrapDbContextTransaction);
}
CurrentTransaction = wrapDbContextTransaction;
}
public void Rollback()
@ -169,11 +160,11 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
}
private void BeginTransaction(IDbContextTransaction wrapDbContextTransaction)
private void BeginTransaction()
{
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.UseTransaction(wrapDbContextTransaction);
dbContextCache.Value.NotifyTransaction();
}
}

View File

@ -16,13 +16,22 @@ namespace ShardingCore.Sharding
*/
public class ShardingDbContextOptionsBuilderConfig<TShardingDbContext> : IShardingDbContextOptionsBuilderConfig<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
{
public ShardingDbContextOptionsBuilderConfig(Action<string,DbContextOptionsBuilder> defaultQueryDbContextOptionsCreator)
public ShardingDbContextOptionsBuilderConfig(Action<DbConnection, DbContextOptionsBuilder> sameConnectionDbContextOptionsCreator, Action<string,DbContextOptionsBuilder> defaultQueryDbContextOptionsCreator)
{
SameConnectionDbContextOptionsCreator = sameConnectionDbContextOptionsCreator;
DefaultQueryDbContextOptionsCreator = defaultQueryDbContextOptionsCreator;
}
public Action<DbConnection, DbContextOptionsBuilder> SameConnectionDbContextOptionsCreator { get; }
public Action<string,DbContextOptionsBuilder> DefaultQueryDbContextOptionsCreator { get; }
public Type ShardingDbContextType => typeof(TShardingDbContext);
public DbContextOptionsBuilder UseDbContextOptionsBuilder(DbConnection dbConnection, DbContextOptionsBuilder dbContextOptionsBuilder)
{
SameConnectionDbContextOptionsCreator(dbConnection, dbContextOptionsBuilder);
dbContextOptionsBuilder.UseInnerDbContextSharding<TShardingDbContext>();
return dbContextOptionsBuilder;
}
public DbContextOptionsBuilder UseDbContextOptionsBuilder(string connectionString,DbContextOptionsBuilder dbContextOptionsBuilder)
{
DefaultQueryDbContextOptionsCreator(connectionString,dbContextOptionsBuilder);

View File

@ -65,7 +65,7 @@
// }
// else
// {
// dbContext.Database.UseTransaction(dbContextTransaction.GetDbTransaction());
// dbContext.Database.NotifyTransaction(dbContextTransaction.GetDbTransaction());
// }
// }

View File

@ -25,7 +25,7 @@ namespace ShardingCore
private readonly Dictionary<Type, Type> _virtualDataSourceRoutes = new Dictionary<Type, Type>();
private readonly Dictionary<Type, Type> _virtualTableRoutes = new Dictionary<Type, Type>();
//public Action<DbConnection, DbContextOptionsBuilder> SameConnectionConfigure { get; private set; }
public Action<DbConnection, DbContextOptionsBuilder> SameConnectionConfigure { get; private set; }
public Action<string, DbContextOptionsBuilder> DefaultQueryConfigure { get; private set; }
public Func<IServiceProvider, IDictionary<string, string>> DataSourcesConfigure { get; private set; }
@ -34,10 +34,10 @@ namespace ShardingCore
{
DefaultQueryConfigure = queryConfigure ?? throw new ArgumentNullException(nameof(queryConfigure));
}
//public void UseShardingTransaction(Action<DbConnection, DbContextOptionsBuilder> transactionConfigure)
//{
// SameConnectionConfigure = transactionConfigure ?? throw new ArgumentNullException(nameof(transactionConfigure));
//}
public void UseShardingTransaction(Action<DbConnection, DbContextOptionsBuilder> transactionConfigure)
{
SameConnectionConfigure = transactionConfigure ?? throw new ArgumentNullException(nameof(transactionConfigure));
}
public void AddShardingDataSource(Func<IServiceProvider, IDictionary<string, string>> dataSourcesConfigure)
{

View File

@ -55,6 +55,8 @@ namespace ShardingCore.Test50
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("ds0",hostBuilderContext.Configuration.GetSection("SqlServer")["ConnectionString"])
.AddShardingTableRoute(op =>
{
@ -129,10 +131,16 @@ namespace ShardingCore.Test50
}
}
await virtualDbContext.AddRangeAsync(userMods);
await virtualDbContext.AddRangeAsync(userSalaries);
using (var tran = virtualDbContext.Database.BeginTransaction())
{
await virtualDbContext.AddRangeAsync(userMods);
await virtualDbContext.AddRangeAsync(userSalaries);
await virtualDbContext.SaveChangesAsync();
await virtualDbContext.SaveChangesAsync();
var b = 0;
var c = 1 / b;
tran.Commit();
}
}
}
}

View File

@ -52,6 +52,8 @@ namespace ShardingCore.Test50_2x
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("ds0", hostBuilderContext.Configuration.GetSection("SqlServer")["ConnectionString"])
.AddShardingTableRoute(op =>
{

View File

@ -48,6 +48,8 @@ namespace ShardingCore.Test50_3x
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("ds0", hostBuilderContext.Configuration.GetSection("SqlServer")["ConnectionString"])
.AddShardingTableRoute(op =>
{