This commit is contained in:
xuejiaming 2022-10-20 22:50:07 +08:00
parent c94add19ed
commit 2c8b1d0b57
23 changed files with 262 additions and 394 deletions

View File

@ -28,7 +28,7 @@ public static class MyShardingExtension
var shardingRuntimeContext = shardingDbContext.GetShardingRuntimeContext();
var entityType = typeof(TEntity);
var routeTailFactory = shardingRuntimeContext.GetRouteTailFactory();
var virtualDataSource = shardingDbContext.GetVirtualDataSource();
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
var dataSourceRouteManager = shardingRuntimeContext.GetDataSourceRouteManager();
var tableRouteManager =shardingRuntimeContext.GetTableRouteManager();
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();

View File

@ -58,6 +58,7 @@ namespace Sample.MySql.Controllers
public IQueryable<SysTest> GetAll()
{
var shardingRouteManager = _shardingRuntimeContext.GetShardingRouteManager();
// var shardingTableCreator = _shardingRuntimeContext.GetShardingTableCreator();
// var tableRouteManager = _shardingRuntimeContext.GetTableRouteManager();
// //系统的时间分片都会实现 ITailAppendable 如果不是系统的自定义的转成你自己的对象即可

View File

@ -17,6 +17,7 @@ using Volo.Abp.Domain.Entities;
using Volo.Abp.EntityFrameworkCore;
using Volo.Abp.Reflection;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Exceptions;
using ShardingCore.Sharding;
namespace Samples.AbpSharding
@ -36,24 +37,6 @@ namespace Samples.AbpSharding
}
}
/// <summary>
/// 读写分离优先级
/// </summary>
public int ReadWriteSeparationPriority
{
get => _shardingDbContextExecutor.ReadWriteSeparationPriority;
set => _shardingDbContextExecutor.ReadWriteSeparationPriority = value;
}
/// <summary>
/// 是否使用读写分离
/// </summary>
public bool ReadWriteSeparation
{
get => _shardingDbContextExecutor.ReadWriteSeparation;
set => _shardingDbContextExecutor.ReadWriteSeparation = value;
}
/// <summary>
/// 是否是真正的执行者
/// </summary>
@ -93,6 +76,11 @@ namespace Samples.AbpSharding
return dbContext;
}
public IShardingDbContextExecutor GetShardingExecutor()
{
return _shardingDbContextExecutor;
}
private void CheckAndSetShardingKeyThatSupportAutoCreate<TEntity>(TEntity entity) where TEntity : class
{
@ -541,39 +529,5 @@ namespace Samples.AbpSharding
await base.DisposeAsync();
}
}
public Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _shardingDbContextExecutor.RollbackAsync(cancellationToken);
}
public Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _shardingDbContextExecutor.CommitAsync(cancellationToken);
}
public void NotifyShardingTransaction()
{
_shardingDbContextExecutor.NotifyShardingTransaction();
}
public void Rollback()
{
_shardingDbContextExecutor.Rollback();
}
public void Commit()
{
_shardingDbContextExecutor.Commit();
}
public IVirtualDataSource GetVirtualDataSource()
{
return _shardingDbContextExecutor.GetVirtualDataSource();
}
public IDictionary<string, IDataSourceDbContext> GetCurrentDbContexts()
{
return _shardingDbContextExecutor.GetCurrentDbContexts();
}
}
}

View File

@ -45,24 +45,6 @@ namespace Samples.AbpSharding
_shardingDbContextExecutor = new ShardingDbContextExecutor(this);
}
}
/// <summary>
/// 读写分离优先级
/// </summary>
public int ReadWriteSeparationPriority
{
get => _shardingDbContextExecutor.ReadWriteSeparationPriority;
set => _shardingDbContextExecutor.ReadWriteSeparationPriority = value;
}
/// <summary>
/// 是否使用读写分离
/// </summary>
public bool ReadWriteSeparation
{
get => _shardingDbContextExecutor.ReadWriteSeparation;
set => _shardingDbContextExecutor.ReadWriteSeparation = value;
}
/// <summary>
/// 是否是真正的执行者
/// </summary>
@ -521,34 +503,9 @@ namespace Samples.AbpSharding
return dbContext;
}
public virtual IVirtualDataSource GetVirtualDataSource()
public IShardingDbContextExecutor GetShardingExecutor()
{
return _shardingDbContextExecutor.GetVirtualDataSource();
}
public virtual Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _shardingDbContextExecutor.RollbackAsync(cancellationToken);
}
public virtual Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _shardingDbContextExecutor.CommitAsync(cancellationToken);
}
public virtual void NotifyShardingTransaction()
{
_shardingDbContextExecutor.NotifyShardingTransaction();
}
public virtual void Rollback()
{
_shardingDbContextExecutor.Rollback();
}
public virtual void Commit()
{
_shardingDbContextExecutor.Commit();
return _shardingDbContextExecutor;
}
#endregion
@ -647,10 +604,5 @@ namespace Samples.AbpSharding
}
#endregion
public IDictionary<string, IDataSourceDbContext> GetCurrentDbContexts()
{
return _shardingDbContextExecutor.GetCurrentDbContexts();
}
}
}

View File

@ -28,7 +28,7 @@ namespace ShardingCore.EFCores.ChangeTrackers
{
if (_dbContext is IShardingDbContext shardingDbContext)
{
return shardingDbContext.GetCurrentDbContexts().Any(o =>
return shardingDbContext.GetShardingExecutor().GetCurrentDbContexts().Any(o =>
o.Value.GetCurrentContexts().Any(r => r.Value.ChangeTracker.HasChanges()));
}
@ -39,7 +39,7 @@ namespace ShardingCore.EFCores.ChangeTrackers
{
if (_dbContext is IShardingDbContext shardingDbContext)
{
return shardingDbContext.GetCurrentDbContexts().SelectMany(o =>
return shardingDbContext.GetShardingExecutor().GetCurrentDbContexts().SelectMany(o =>
o.Value.GetCurrentContexts().SelectMany(cd => cd.Value.ChangeTracker.Entries()));
}
@ -50,7 +50,7 @@ namespace ShardingCore.EFCores.ChangeTrackers
{
if (_dbContext is IShardingDbContext shardingDbContext)
{
return shardingDbContext.GetCurrentDbContexts().SelectMany(o =>
return shardingDbContext.GetShardingExecutor().GetCurrentDbContexts().SelectMany(o =>
o.Value.GetCurrentContexts().SelectMany(cd => cd.Value.ChangeTracker.Entries<TEntity>()));
}
@ -79,7 +79,7 @@ namespace ShardingCore.EFCores.ChangeTrackers
private void Do(Action<ChangeTracker> action)
{
var dataSourceDbContexts = ((IShardingDbContext)_dbContext).GetCurrentDbContexts();
var dataSourceDbContexts = ((IShardingDbContext)_dbContext).GetShardingExecutor().GetCurrentDbContexts();
foreach (var dataSourceDbContext in dataSourceDbContexts)
{
var currentContexts = dataSourceDbContext.Value.GetCurrentContexts();

View File

@ -26,6 +26,7 @@ namespace ShardingCore.EFCores
public class ShardingRelationalTransaction : RelationalTransaction
{
private readonly IShardingDbContext _shardingDbContext;
private readonly IShardingDbContextExecutor _shardingDbContextExecutor;
#if NET6_0
public ShardingRelationalTransaction(IShardingDbContext shardingDbContext, IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned, ISqlGenerationHelper sqlGenerationHelper) : base(connection, transaction, transactionId, logger, transactionOwned, sqlGenerationHelper)
{
@ -43,6 +44,9 @@ namespace ShardingCore.EFCores
_shardingDbContext = shardingDbContext ??
throw new ShardingCoreInvalidOperationException(
$"should implement {nameof(IShardingDbContext)}");
_shardingDbContextExecutor = shardingDbContext.GetShardingExecutor() ??
throw new ShardingCoreInvalidOperationException(
$"{shardingDbContext.GetType()} cant get {nameof(IShardingDbContextExecutor)} from {nameof(shardingDbContext.GetShardingExecutor)}");
}
#endif
@ -65,15 +69,15 @@ namespace ShardingCore.EFCores
public override void Commit()
{
base.Commit();
_shardingDbContext.Commit();
_shardingDbContext.NotifyShardingTransaction();
_shardingDbContextExecutor.Commit();
_shardingDbContextExecutor.NotifyShardingTransaction();
}
public override void Rollback()
{
base.Rollback();
_shardingDbContext.Rollback();
_shardingDbContext.NotifyShardingTransaction();
_shardingDbContextExecutor.Rollback();
_shardingDbContextExecutor.NotifyShardingTransaction();
}
#if !NETCOREAPP2_0
@ -81,29 +85,53 @@ namespace ShardingCore.EFCores
{
await base.RollbackAsync(cancellationToken);
await _shardingDbContext.RollbackAsync(cancellationToken);
_shardingDbContext.NotifyShardingTransaction();
await _shardingDbContextExecutor.RollbackAsync(cancellationToken);
_shardingDbContextExecutor.NotifyShardingTransaction();
}
public override async Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
{
await base.CommitAsync(cancellationToken);
await _shardingDbContext.CommitAsync(cancellationToken);
_shardingDbContext.NotifyShardingTransaction();
await _shardingDbContextExecutor.CommitAsync(cancellationToken);
_shardingDbContextExecutor.NotifyShardingTransaction();
}
#if !NETCOREAPP3_0&&!NETSTANDARD2_0
// public override void CreateSavepoint(string name)
// {
// base.CreateSavepoint(name);
// _shardingDbContext.CreateSavepoint(name);
// }
//
// public override async Task CreateSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken())
// {
// await base.CreateSavepointAsync(name, cancellationToken);
// await _shardingDbContext.CreateSavepointAsync(name,cancellationToken);
// }
public override void CreateSavepoint(string name)
{
base.CreateSavepoint(name);
_shardingDbContextExecutor.CreateSavepoint(name);
}
public override async Task CreateSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken())
{
await base.CreateSavepointAsync(name, cancellationToken);
await _shardingDbContextExecutor.CreateSavepointAsync(name,cancellationToken);
}
public override void RollbackToSavepoint(string name)
{
base.RollbackToSavepoint(name);
_shardingDbContextExecutor.RollbackToSavepoint(name);
}
public override async Task RollbackToSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken())
{
await base.RollbackToSavepointAsync(name, cancellationToken);
await _shardingDbContextExecutor.RollbackToSavepointAsync(name,cancellationToken);
}
public override void ReleaseSavepoint(string name)
{
base.ReleaseSavepoint(name);
_shardingDbContextExecutor.ReleaseSavepoint(name);
}
public override async Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken())
{
await base.ReleaseSavepointAsync(name, cancellationToken);
await _shardingDbContextExecutor.ReleaseSavepointAsync(name,cancellationToken);
}
#endif
#endif
}

View File

@ -21,9 +21,9 @@ namespace ShardingCore.EFCores
public override IEnumerator<TEntity> GetEnumerator()
{
if (_dbContext is IShardingDbContext currentDbContextDiscover)
if (_dbContext is IShardingDbContext shardingDbContext)
{
var dataSourceDbContexts = currentDbContextDiscover.GetCurrentDbContexts();
var dataSourceDbContexts = shardingDbContext.GetShardingExecutor().GetCurrentDbContexts();
var enumerators = dataSourceDbContexts.SelectMany(o => o.Value.GetCurrentContexts().Select(cd=>cd.Value.Set<TEntity>().Local.GetEnumerator()));
return new MultiEnumerator<TEntity>(enumerators);
}

View File

@ -27,11 +27,13 @@ namespace ShardingCore.EFCores
{
private readonly IRelationalConnection _relationalConnection;
private readonly IShardingDbContext _shardingDbContext;
private readonly IShardingDbContextExecutor _shardingDbContextExecutor;
#if !NETCOREAPP2_0
public ShardingRelationalTransactionManager(IRelationalConnection relationalConnection)
{
_relationalConnection = relationalConnection;
_shardingDbContext = relationalConnection.Context as IShardingDbContext??throw new ShardingCoreInvalidOperationException($"should implement {nameof(IShardingDbContext)}");
_shardingDbContextExecutor = _shardingDbContext.GetShardingExecutor();
}
#endif
@ -40,6 +42,7 @@ namespace ShardingCore.EFCores
{
_relationalConnection = relationalConnection;
_shardingDbContext = GetDbContext(relationalConnection) as IShardingDbContext??throw new ShardingCoreInvalidOperationException($"should implement {nameof(IShardingDbContext)}");
_shardingDbContextExecutor = _shardingDbContext.GetShardingExecutor();
}
private DbContext GetDbContext(IRelationalConnection connection)
{
@ -81,7 +84,7 @@ namespace ShardingCore.EFCores
public IDbContextTransaction BeginTransaction(IsolationLevel isolationLevel)
{
var dbContextTransaction = _relationalConnection.BeginTransaction(isolationLevel);
_shardingDbContext.NotifyShardingTransaction();
_shardingDbContextExecutor.NotifyShardingTransaction();
return dbContextTransaction;
}
@ -89,14 +92,14 @@ namespace ShardingCore.EFCores
CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.BeginTransactionAsync(isolationLevel, cancellationToken);
_shardingDbContext.NotifyShardingTransaction();
_shardingDbContextExecutor.NotifyShardingTransaction();
return dbContextTransaction;
}
public IDbContextTransaction UseTransaction(DbTransaction transaction)
{
var dbContextTransaction = _relationalConnection.UseTransaction(transaction);
_shardingDbContext.NotifyShardingTransaction();
_shardingDbContextExecutor.NotifyShardingTransaction();
return dbContextTransaction;
}
#if !NETCOREAPP2_0
@ -109,7 +112,7 @@ namespace ShardingCore.EFCores
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, cancellationToken);
_shardingDbContext.NotifyShardingTransaction();
_shardingDbContextExecutor.NotifyShardingTransaction();
return dbContextTransaction;
}
#if !NETCOREAPP3_0 && !NETSTANDARD2_0
@ -125,14 +128,14 @@ namespace ShardingCore.EFCores
public IDbContextTransaction UseTransaction(DbTransaction transaction, Guid transactionId)
{
var dbContextTransaction = _relationalConnection.UseTransaction(transaction, transactionId);
_shardingDbContext.NotifyShardingTransaction();
_shardingDbContextExecutor.NotifyShardingTransaction();
return dbContextTransaction;
}
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, Guid transactionId,
CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, transactionId, cancellationToken);
_shardingDbContext.NotifyShardingTransaction();
_shardingDbContextExecutor.NotifyShardingTransaction();
return dbContextTransaction;
}
#endif

View File

@ -21,7 +21,7 @@ namespace ShardingCore.Extensions
{
public static bool IsUseReadWriteSeparation(this IShardingDbContext shardingDbContext)
{
return shardingDbContext.GetVirtualDataSource().UseReadWriteSeparation;
return shardingDbContext.GetShardingExecutor().GetVirtualDataSource().UseReadWriteSeparation;
}
public static bool SupportUnionAllMerge(this IShardingDbContext shardingDbContext)

View File

@ -92,7 +92,7 @@ namespace ShardingCore.Extensions
var shardingRuntimeContext = shardingDbContext.GetShardingRuntimeContext();
var entityType = typeof(TEntity);
var routeTailFactory = shardingRuntimeContext.GetRouteTailFactory();
var virtualDataSource = shardingDbContext.GetVirtualDataSource();
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
var dataSourceRouteManager = shardingRuntimeContext.GetDataSourceRouteManager();
var tableRouteManager =shardingRuntimeContext.GetTableRouteManager();
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();

View File

@ -45,22 +45,23 @@ namespace ShardingCore.Extensions
/// <summary>
/// 设置读写分离
/// </summary>
/// <param name="supportShardingReadWrite"></param>
/// <param name="shardingDbContext"></param>
/// <param name="readOnly">是否是读数据源</param>
private static void SetReadWriteSeparation(this IShardingDbContext supportShardingReadWrite, bool readOnly)
private static void SetReadWriteSeparation(this IShardingDbContext shardingDbContext, bool readOnly)
{
var shardingRuntimeContext = ((DbContext)supportShardingReadWrite).GetShardingRuntimeContext();
var shardingRuntimeContext = ((DbContext)shardingDbContext).GetShardingRuntimeContext();
var shardingDbContextExecutor = shardingDbContext.GetShardingExecutor();
var shardingReadWriteManager = shardingRuntimeContext.GetService<IShardingReadWriteManager>();
var shardingReadWriteContext = shardingReadWriteManager.GetCurrent();
if (shardingReadWriteContext != null)
{
if (shardingReadWriteContext.DefaultPriority > supportShardingReadWrite.ReadWriteSeparationPriority)
if (shardingReadWriteContext.DefaultPriority > shardingDbContextExecutor.ReadWriteSeparationPriority)
{
supportShardingReadWrite.ReadWriteSeparationPriority = shardingReadWriteContext.DefaultPriority + 1;
shardingDbContextExecutor.ReadWriteSeparationPriority=shardingReadWriteContext.DefaultPriority + 1;
}
}
supportShardingReadWrite.ReadWriteSeparation = readOnly;
shardingDbContextExecutor.ReadWriteSeparation = readOnly;
}
public static void SetReadWriteSeparation(this ShardingReadWriteContext shardingReadWriteContext, int priority,
@ -80,21 +81,22 @@ namespace ShardingCore.Extensions
if (shardingDbContext.IsUseReadWriteSeparation())
{
var shardingRuntimeContext = ((DbContext)shardingDbContext).GetShardingRuntimeContext();
var shardingDbContextExecutor = shardingDbContext.GetShardingExecutor();
var shardingReadWriteManager = shardingRuntimeContext.GetService<IShardingReadWriteManager>();
var shardingReadWriteContext = shardingReadWriteManager.GetCurrent();
if (shardingReadWriteContext != null)
{
if (shardingReadWriteContext.DefaultPriority > shardingDbContext.ReadWriteSeparationPriority)
if (shardingReadWriteContext.DefaultPriority > shardingDbContextExecutor.ReadWriteSeparationPriority)
{
return shardingReadWriteContext.DefaultReadEnable;
}
else
{
return shardingDbContext.ReadWriteSeparation;
return shardingDbContextExecutor.ReadWriteSeparation;
}
}
return shardingDbContext.ReadWriteSeparation;
return shardingDbContextExecutor.ReadWriteSeparation;
}
return false;

View File

@ -12,6 +12,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Exceptions;
namespace ShardingCore.Sharding
{
@ -40,22 +41,6 @@ namespace ShardingCore.Sharding
IsExecutor = wrapOptionsExtension == null;
}
/// <summary>
/// 读写分离优先级
/// </summary>
public int ReadWriteSeparationPriority
{
get => ShardingDbContextExecutor.ReadWriteSeparationPriority;
set => ShardingDbContextExecutor.ReadWriteSeparationPriority = value;
}
/// <summary>
/// 是否使用读写分离
/// </summary>
public bool ReadWriteSeparation
{
get => ShardingDbContextExecutor.ReadWriteSeparation;
set => ShardingDbContextExecutor.ReadWriteSeparation = value;
}
/// <summary>
/// 是否是真正的执行者
@ -80,12 +65,11 @@ namespace ShardingCore.Sharding
return ShardingDbContextExecutor.CreateGenericDbContext(entity);
}
public IVirtualDataSource GetVirtualDataSource()
public IShardingDbContextExecutor GetShardingExecutor()
{
return ShardingDbContextExecutor.GetVirtualDataSource();
return ShardingDbContextExecutor;
}
public override EntityEntry Add(object entity)
{
if (IsExecutor)
@ -349,44 +333,6 @@ namespace ShardingCore.Sharding
}
}
//protected virtual void ApplyShardingConcepts()
//{
// foreach (var entry in ChangeTracker.Entries().ToList())
// {
// ApplyShardingConcepts(entry);
// }
//}
//protected virtual void ApplyShardingConcepts(EntityEntry entry)
//{
// switch (entry.State)
// {
// case EntityState.Added:
// case EntityState.Modified:
// case EntityState.Deleted:
// ApplyShardingConceptsForEntity(entry);
// break;
// }
// //throw new ShardingCoreNotSupportedException($"entry.State:[{entry.State}]");
//}
//protected virtual void ApplyShardingConceptsForEntity(EntityEntry entry)
//{
// var genericDbContext = CreateGenericDbContext(entry.Entity);
// var entityState = entry.State;
// entry.State = EntityState.Unchanged;
// genericDbContext.Entry(entry.Entity).State = entityState;
//}
// public override int SaveChanges()
// {
//
// if (IsExecutor)
// return base.SaveChanges();
// return this.SaveChanges(true);
// }
public override int SaveChanges(bool acceptAllChangesOnSuccess)
{
if (IsExecutor)
@ -411,13 +357,6 @@ namespace ShardingCore.Sharding
}
// public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// if (IsExecutor)
// return base.SaveChangesAsync(cancellationToken);
// return this.SaveChangesAsync(true, cancellationToken);
// }
public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
{
if (IsExecutor)
@ -475,36 +414,6 @@ namespace ShardingCore.Sharding
await base.DisposeAsync();
}
}
public Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken())
{
return ShardingDbContextExecutor.RollbackAsync(cancellationToken);
}
public Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
{
return ShardingDbContextExecutor.CommitAsync(cancellationToken);
}
#endif
public void NotifyShardingTransaction()
{
ShardingDbContextExecutor.NotifyShardingTransaction();
}
public void Rollback()
{
ShardingDbContextExecutor.Rollback();
}
public void Commit()
{
ShardingDbContextExecutor.Commit();
}
public IDictionary<string, IDataSourceDbContext> GetCurrentDbContexts()
{
return ShardingDbContextExecutor.GetCurrentDbContexts();
}
}
}

View File

@ -13,7 +13,7 @@ namespace ShardingCore.Sharding.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface ISupportShardingReadWrite
public interface IReadWriteSwitch
{
int ReadWriteSeparationPriority { get; set; }
bool ReadWriteSeparation { get; set; }

View File

@ -11,7 +11,7 @@ namespace ShardingCore.Sharding.Abstractions
* @Date: Saturday, 14 August 2021 21:47:11
* @Email: 326308290@qq.com
*/
public interface IShardingDbContext: IShardingTransaction,ISupportShardingReadWrite,ICurrentDbContextDiscover
public interface IShardingDbContext
{
/// <summary>
/// create DbContext
@ -30,7 +30,7 @@ namespace ShardingCore.Sharding.Abstractions
/// <returns></returns>
DbContext CreateGenericDbContext<T>(T entity) where T : class;
IVirtualDataSource GetVirtualDataSource();
IShardingDbContextExecutor GetShardingExecutor();
}

View File

@ -17,20 +17,12 @@ namespace ShardingCore.Sharding.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingDbContextExecutor : IDisposable
public interface IShardingDbContextExecutor : IShardingTransaction,IReadWriteSwitch,ICurrentDbContextDiscover,IDisposable
#if !NETCOREAPP2_0
, IAsyncDisposable
#endif
{
/// <summary>
/// read write priority
/// </summary>
int ReadWriteSeparationPriority { get; set; }
/// <summary>
/// current db context open? ReadWriteSeparationPriority>Current.ReadWriteSeparationPriority
/// </summary>
bool ReadWriteSeparation { get; set; }
/// <summary>
/// has multi db context
/// </summary>
@ -55,51 +47,10 @@ namespace ShardingCore.Sharding.Abstractions
IVirtualDataSource GetVirtualDataSource();
Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess,
CancellationToken cancellationToken = new CancellationToken());
int SaveChanges(bool acceptAllChangesOnSuccess);
/// <summary>
/// default data source CurrentTransaction change will call this method without default
/// </summary>
void NotifyShardingTransaction();
/// <summary>
/// rollback
/// </summary>
void Rollback();
/// <summary>
/// commit
/// </summary>
void Commit();
IDictionary<string, IDataSourceDbContext> GetCurrentDbContexts();
#if !NETCOREAPP2_0
/// <summary>
/// rollback async
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken());
/// <summary>
/// commit async
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task CommitAsync(CancellationToken cancellationToken = new CancellationToken());
#if !NETCOREAPP3_0&&!NETSTANDARD2_0
// void CreateSavepoint(string name);
// Task CreateSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken());
// void RollbackToSavepoint(string name);
// Task RollbackToSavepointAsync(string name,CancellationToken cancellationToken = default(CancellationToken));
// void ReleaseSavepoint(string name);
// Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default(CancellationToken));
#endif
#endif
}
}

View File

@ -22,12 +22,12 @@ namespace ShardingCore.Sharding.Abstractions
Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken());
Task CommitAsync(CancellationToken cancellationToken = new CancellationToken());
#if !NETCOREAPP3_0 && !NETSTANDARD2_0
// void CreateSavepoint(string name);
// Task CreateSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken());
// void RollbackToSavepoint(string name);
// Task RollbackToSavepointAsync(string name,CancellationToken cancellationToken = default(CancellationToken));
// void ReleaseSavepoint(string name);
// Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default(CancellationToken));
void CreateSavepoint(string name);
Task CreateSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken());
void RollbackToSavepoint(string name);
Task RollbackToSavepointAsync(string name,CancellationToken cancellationToken = default(CancellationToken));
void ReleaseSavepoint(string name);
Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default(CancellationToken));
#endif
#endif
}

View File

@ -22,9 +22,9 @@ namespace ShardingCore.Sharding.MergeContexts
IRewriteResult rewriteResult)
{
var shardingDbContext = mergeQueryCompilerContext.GetShardingDbContext();
var maxParallelExecuteCount =
shardingDbContext.GetVirtualDataSource().ConfigurationParams.MaxQueryConnectionsLimit;
var connectionMode = shardingDbContext.GetVirtualDataSource().ConfigurationParams.ConnectionMode;
var virtualDataSource = shardingDbContext.GetShardingExecutor().GetVirtualDataSource();
var maxParallelExecuteCount =virtualDataSource.ConfigurationParams.MaxQueryConnectionsLimit;
var connectionMode = virtualDataSource.ConfigurationParams.ConnectionMode;
IComparer<string> shardingTailComparer = Comparer<string>.Default;
bool sameWithTailComparer = true;
bool sequenceQuery = false;

View File

@ -60,14 +60,6 @@ namespace ShardingCore.Sharding.MergeContexts
var combineQueryable = mergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable();
if (skip is < 0)
{
throw new ShardingCoreException($"queryable:{mergeQueryCompilerContext.GetQueryCombineResult().GetQueryCompilerContext().GetQueryExpression().ShardingPrint()} skip should >= 0");
}
if (take is < 0)
{
throw new ShardingCoreException($"queryable:{mergeQueryCompilerContext.GetQueryCombineResult().GetQueryCompilerContext().GetQueryExpression().ShardingPrint()} take should >= 0");
}
//去除分页,获取前Take+Skip数量
var reWriteQueryable = combineQueryable;
if (take.HasValue || skip.HasValue)

View File

@ -118,14 +118,12 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
IDbContextCreator dbContextCreator,
ActualConnectionStringManager actualConnectionStringManager)
{
var shardingDbContext = (IShardingDbContext)shardingShellDbContext;
DataSourceName = dataSourceName;
IsDefault = isDefault;
_shardingShellDbContext = shardingShellDbContext;
_shardingRuntimeContext = shardingShellDbContext.GetShardingRuntimeContext();
DbContextType = shardingShellDbContext.GetType();
_virtualDataSource = shardingDbContext
.GetVirtualDataSource();
_virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
_dbContextCreator = dbContextCreator;
_actualConnectionStringManager = actualConnectionStringManager;
}
@ -372,55 +370,55 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
await CurrentDbContextTransaction.CommitAsync(cancellationToken);
}
#if !NETCOREAPP3_0&&!NETSTANDARD2_0
// public void CreateSavepoint(string name)
// {
// if (IsDefault)
// return;
// CurrentDbContextTransaction?.CreateSavepoint(name);
// }
//
// public async Task CreateSavepointAsync(string name,
// CancellationToken cancellationToken = new CancellationToken())
// {
// cancellationToken.ThrowIfCancellationRequested();
// if (IsDefault)
// return;
// if (CurrentDbContextTransaction != null)
// await CurrentDbContextTransaction.CreateSavepointAsync(name, cancellationToken);
// }
//
// public void RollbackToSavepoint(string name)
// {
// if (IsDefault)
// return;
// CurrentDbContextTransaction?.RollbackToSavepoint(name);
// }
//
// public async Task RollbackToSavepointAsync(string name,
// CancellationToken cancellationToken = default(CancellationToken))
// {
// cancellationToken.ThrowIfCancellationRequested();
// if (IsDefault)
// return;
// if (CurrentDbContextTransaction != null)
// await CurrentDbContextTransaction.RollbackToSavepointAsync(name, cancellationToken);
// }
//
// public void ReleaseSavepoint(string name)
// {
// if (IsDefault)
// return;
// CurrentDbContextTransaction?.ReleaseSavepoint(name);
// }
//
// public async Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default(CancellationToken))
// {
// cancellationToken.ThrowIfCancellationRequested();
// if (IsDefault)
// return;
// if (CurrentDbContextTransaction != null)
// await CurrentDbContextTransaction.ReleaseSavepointAsync(name, cancellationToken);
// }
public void CreateSavepoint(string name)
{
if (IsDefault)
return;
CurrentDbContextTransaction?.CreateSavepoint(name);
}
public async Task CreateSavepointAsync(string name,
CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (IsDefault)
return;
if (CurrentDbContextTransaction != null)
await CurrentDbContextTransaction.CreateSavepointAsync(name, cancellationToken);
}
public void RollbackToSavepoint(string name)
{
if (IsDefault)
return;
CurrentDbContextTransaction?.RollbackToSavepoint(name);
}
public async Task RollbackToSavepointAsync(string name,
CancellationToken cancellationToken = default(CancellationToken))
{
cancellationToken.ThrowIfCancellationRequested();
if (IsDefault)
return;
if (CurrentDbContextTransaction != null)
await CurrentDbContextTransaction.RollbackToSavepointAsync(name, cancellationToken);
}
public void ReleaseSavepoint(string name)
{
if (IsDefault)
return;
CurrentDbContextTransaction?.ReleaseSavepoint(name);
}
public async Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default(CancellationToken))
{
cancellationToken.ThrowIfCancellationRequested();
if (IsDefault)
return;
if (CurrentDbContextTransaction != null)
await CurrentDbContextTransaction.ReleaseSavepointAsync(name, cancellationToken);
}
#endif
#endif

View File

@ -50,12 +50,12 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken());
Task CommitAsync(CancellationToken cancellationToken = new CancellationToken());
#if !NETCOREAPP3_0&&!NETSTANDARD2_0
// void CreateSavepoint(string name);
// Task CreateSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken());
// void RollbackToSavepoint(string name);
// Task RollbackToSavepointAsync(string name,CancellationToken cancellationToken = default(CancellationToken));
// void ReleaseSavepoint(string name);
// Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default(CancellationToken));
void CreateSavepoint(string name);
Task CreateSavepointAsync(string name, CancellationToken cancellationToken = new CancellationToken());
void RollbackToSavepoint(string name);
Task RollbackToSavepointAsync(string name,CancellationToken cancellationToken = default(CancellationToken));
void ReleaseSavepoint(string name);
Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default(CancellationToken));
#endif
#endif

View File

@ -36,9 +36,11 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
{
private readonly ILogger<ShardingDbContextExecutor> _logger;
private readonly DbContext _shardingDbContext;
//private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>> _dbContextCaches = new ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>>();
private readonly ConcurrentDictionary<string, IDataSourceDbContext> _dbContextCaches = new ConcurrentDictionary<string, IDataSourceDbContext>();
private readonly ConcurrentDictionary<string, IDataSourceDbContext> _dbContextCaches =
new ConcurrentDictionary<string, IDataSourceDbContext>();
private readonly IShardingRuntimeContext _shardingRuntimeContext;
private readonly ShardingConfigOptions _shardingConfigOptions;
private readonly IVirtualDataSource _virtualDataSource;
@ -62,7 +64,6 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
}
public ShardingDbContextExecutor(DbContext shardingDbContext)
{
_shardingDbContext = shardingDbContext;
@ -78,46 +79,54 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
_routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
var shardingReadWriteManager = _shardingRuntimeContext.GetShardingReadWriteManager();
var shardingProvider = _shardingRuntimeContext.GetShardingProvider();
var loggerFactory=shardingProvider.GetRequiredService<ILoggerFactory>();
_logger=loggerFactory.CreateLogger<ShardingDbContextExecutor>();
_actualConnectionStringManager = new ActualConnectionStringManager(shardingReadWriteManager,_virtualDataSource);
var loggerFactory = shardingProvider.GetRequiredService<ILoggerFactory>();
_logger = loggerFactory.CreateLogger<ShardingDbContextExecutor>();
_actualConnectionStringManager =
new ActualConnectionStringManager(shardingReadWriteManager, _virtualDataSource);
}
#region create db context
private IDataSourceDbContext GetDataSourceDbContext(string dataSourceName)
{
return _dbContextCaches.GetOrAdd(dataSourceName, dsname => new DataSourceDbContext(dsname, _virtualDataSource.IsDefault(dsname), _shardingDbContext, _dbContextCreator, _actualConnectionStringManager));
return _dbContextCaches.GetOrAdd(dataSourceName,
dsname => new DataSourceDbContext(dsname, _virtualDataSource.IsDefault(dsname), _shardingDbContext,
_dbContextCreator, _actualConnectionStringManager));
}
/// <summary>
/// has more db context
/// </summary>
public bool IsMultiDbContext =>
_dbContextCaches.Count > 1 || _dbContextCaches.Sum(o => o.Value.DbContextCount) > 1;
public DbContext CreateDbContext(CreateDbContextStrategyEnum strategy, string dataSourceName, IRouteTail routeTail)
public DbContext CreateDbContext(CreateDbContextStrategyEnum strategy, string dataSourceName,
IRouteTail routeTail)
{
if (CreateDbContextStrategyEnum.ShareConnection==strategy)
if (CreateDbContextStrategyEnum.ShareConnection == strategy)
{
var dataSourceDbContext = GetDataSourceDbContext(dataSourceName);
return dataSourceDbContext.CreateDbContext(routeTail);
}
else
{
var parallelDbContextOptions = CreateParallelDbContextOptions(dataSourceName,strategy);
var dbContext = _dbContextCreator.CreateDbContext(_shardingDbContext, parallelDbContextOptions, routeTail);
var parallelDbContextOptions = CreateParallelDbContextOptions(dataSourceName, strategy);
var dbContext =
_dbContextCreator.CreateDbContext(_shardingDbContext, parallelDbContextOptions, routeTail);
dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
return dbContext;
}
}
private DbContextOptions CreateParallelDbContextOptions(string dataSourceName,CreateDbContextStrategyEnum strategy)
private DbContextOptions CreateParallelDbContextOptions(string dataSourceName,
CreateDbContextStrategyEnum strategy)
{
var dbContextOptionBuilder =_shardingRuntimeContext.GetDbContextOptionBuilderCreator().CreateDbContextOptionBuilder();
var connectionString = _actualConnectionStringManager.GetConnectionString(dataSourceName, CreateDbContextStrategyEnum.IndependentConnectionWrite==strategy);
_virtualDataSource.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder).UseShardingOptions(_shardingRuntimeContext);
var dbContextOptionBuilder = _shardingRuntimeContext.GetDbContextOptionBuilderCreator()
.CreateDbContextOptionBuilder();
var connectionString = _actualConnectionStringManager.GetConnectionString(dataSourceName,
CreateDbContextStrategyEnum.IndependentConnectionWrite == strategy);
_virtualDataSource.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder)
.UseShardingOptions(_shardingRuntimeContext);
return dbContextOptionBuilder.Options;
}
@ -125,9 +134,10 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
public DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class
{
var dataSourceName = GetDataSourceName(entity);
var tail = GetTableTail(dataSourceName,entity);
var tail = GetTableTail(dataSourceName, entity);
return CreateDbContext(CreateDbContextStrategyEnum.ShareConnection, dataSourceName, _routeTailFactory.Create(tail));
return CreateDbContext(CreateDbContextStrategyEnum.ShareConnection, dataSourceName,
_routeTailFactory.Create(tail));
}
public IVirtualDataSource GetVirtualDataSource()
@ -140,18 +150,19 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
return _dataSourceRouteManager.GetDataSourceName(entity);
}
private string GetTableTail<TEntity>(string dataSourceName,TEntity entity) where TEntity : class
private string GetTableTail<TEntity>(string dataSourceName, TEntity entity) where TEntity : class
{
if (!_entityMetadataManager.IsShardingTable(entity.GetType()))
return string.Empty;
return _tableRouteManager.GetTableTail(dataSourceName,entity);
return _tableRouteManager.GetTableTail(dataSourceName, entity);
}
#endregion
#region transaction
public async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
public async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess,
CancellationToken cancellationToken = new CancellationToken())
{
int i = 0;
foreach (var dbContextCache in _dbContextCaches)
@ -204,10 +215,11 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
}
catch (Exception e)
{
_logger.LogError(e, "commit error.");
_logger.LogError(e, $"{nameof(Commit)} error.");
if (i == 0)
throw;
}
i++;
}
@ -252,15 +264,71 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
}
catch (Exception e)
{
_logger.LogError(e, "commit error.");
_logger.LogError(e, $"{nameof(CommitAsync)} error.");
if (i == 0)
throw;
}
i++;
}
AutoUseWriteConnectionString();
}
#if !NETCOREAPP3_0 && !NETSTANDARD2_0
public void CreateSavepoint(string name)
{
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.CreateSavepoint(name);
}
}
public async Task CreateSavepointAsync(string name,
CancellationToken cancellationToken = new CancellationToken())
{
foreach (var dbContextCache in _dbContextCaches)
{
await dbContextCache.Value.CreateSavepointAsync(name, cancellationToken);
}
}
public void RollbackToSavepoint(string name)
{
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.RollbackToSavepoint(name);
}
}
public async Task RollbackToSavepointAsync(string name,
CancellationToken cancellationToken = default(CancellationToken))
{
foreach (var dbContextCache in _dbContextCaches)
{
await dbContextCache.Value.RollbackToSavepointAsync(name, cancellationToken);
}
}
public void ReleaseSavepoint(string name)
{
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.ReleaseSavepoint(name);
}
}
public async Task ReleaseSavepointAsync(string name,
CancellationToken cancellationToken = default(CancellationToken))
{
foreach (var dbContextCache in _dbContextCaches)
{
await dbContextCache.Value.ReleaseSavepointAsync(name, cancellationToken);
}
}
#endif
public async ValueTask DisposeAsync()
{
foreach (var dbContextCache in _dbContextCaches)
@ -284,4 +352,4 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
}
}
}
}
}

View File

@ -169,7 +169,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
hasQueryCompilerExecutor = _queryEntities.Keys.All(o => !_entityMetadataManager.IsSharding(o));
if (hasQueryCompilerExecutor.Value)
{
var virtualDataSource = _shardingDbContext.GetVirtualDataSource();
var virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
var routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
var strategy = !IsParallelQuery()

View File

@ -8,6 +8,7 @@ using ShardingCore.Core.ShardingConfigurations.Abstractions;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.MergeContexts;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
@ -46,6 +47,15 @@ namespace ShardingCore.Sharding
private void CheckMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext,IParseResult parseResult,IRewriteResult rewriteResult,IOptimizeResult optimizeResult)
{
var paginationContext = parseResult.GetPaginationContext();
if (paginationContext.Skip is < 0)
{
throw new ShardingCoreException($"queryable skip should >= 0");
}
if (paginationContext.Take is < 0)
{
throw new ShardingCoreException($"queryable take should >= 0");
}
if (!mergeQueryCompilerContext.IsEnumerableQuery())
{
if ((nameof(Enumerable.Last)==mergeQueryCompilerContext.GetQueryMethodName()||nameof(Enumerable.LastOrDefault)==mergeQueryCompilerContext.GetQueryMethodName())&&parseResult.GetOrderByContext().PropertyOrders.IsEmpty())