diff --git a/README.md b/README.md index dd2aa5a0..e8039e77 100644 --- a/README.md +++ b/README.md @@ -291,9 +291,44 @@ AbstractSimpleShardingYearKeyLongVirtualTableRoute |按时间戳 |yyyy | `>,>=,< ## 批量操作 -批量操作将对应的dbcontext和数据进行分离由用户自己选择第三方框架比如zzz进行批量操作或者batchextension +批量操作将对应的dbcontext和数据进行分离由用户自己选择第三方框架比如[`Z.EntityFramework.Plus.EFCore`](https://github.com/zzzprojects/EntityFramework-Plus) 进行批量操作或者 [`EFCore.BulkExtensions`](https://github.com/borisdj/EFCore.BulkExtensions) ,支持一切三方批量框架 ```c# -后期支持 +var list = new List(); +///通过集合返回出对应的k-v归集通过事务开启 + var dbContexts = _defaultTableDbContext.BulkShardingEnumerable(list); + + using (var tran = _defaultTableDbContext.Database.BeginTransaction()) + { + dbContexts.ForEach(kv => + { + kv.Key.BulkInsert(kv.Value); + }); + dbContexts.ForEach(kv => + { + kv.Key.BulkDelete(kv.Value); + }); + dbContexts.ForEach(kv => + { + kv.Key.BulkUpdate(kv.Value); + }); + _defaultTableDbContext.SaveChanges(); + tran.Commit(); + } + + + var dbContext2s = _defaultTableDbContext.BulkShardingExpression(o => o.Age > 100); + using (var tran = _defaultTableDbContext.Database.BeginTransaction()) + { + dbContext2s.ForEach(dbContext => + { + dbContext.Set().Where(o => o.Age > 100).Update(o => new SysUserMod() + { + AgeGroup = 1000 + }); + }); + _defaultTableDbContext.SaveChanges(); + tran.Commit(); + } ``` ## 手动路由 ```c# @@ -327,10 +362,20 @@ ctor inject IShardingRouteManager shardingRouteManager [参考](https://github.com/xuejmnet/sharding-core/tree/main/samples/Samples.AutoByDate.SqlServer) ## 事务 -默认savechanges支持事务 +1.默认savechanges支持事务 ```c# + await _defaultShardingDbContext.SaveChangesAsync(); - + +``` +2.手动开启事务 [请参考微软](https://docs.microsoft.com/zh-cn/ef/core/saving/transactions) +```c# + using (var tran = _defaultTableDbContext.Database.BeginTransaction()) + { + ........ + _defaultTableDbContext.SaveChanges(); + tran.Commit(); + } ``` ## 读写分离 该框架目前已经支持单node的读写分离,后续框架将支持多node的读 @@ -398,7 +443,19 @@ var shardingPageResultAsync = await _defaultTableDbContext.Set().Ord # 注意事项 -该库的追踪是基于adonet的MARS(MultipleActiveResultSets=True;)所以基本不支持该特性的无法支持完美追踪 +使用该框架需要注意两点如果你的shardingdbcontext重写了以下服务可能无法使用 如果还想使用需要自己重写扩展[请参考](https://github.com/xuejmnet/sharding-core/blob/main/src/ShardingCore/DIExtension.cs) +1.shardingdbcontext +```c# + return optionsBuilder.ReplaceService() + .ReplaceService() + .ReplaceService(); +``` +2.defaultdbcontext +```c# +return optionsBuilder.ReplaceService() + .ReplaceService>(); + +``` ,目前框架采用AppDomain.CurrentDomain.GetAssemblies(); 可能会导致程序集未被加载所以尽可能在api层加载所需要的dll 使用时需要注意 diff --git a/nuget-publish.bat b/nuget-publish.bat index a66f0065..e7ce0dc0 100644 --- a/nuget-publish.bat +++ b/nuget-publish.bat @@ -1,8 +1,8 @@ :start ::定义版本 -set EFCORE2=2.2.0.17 -set EFCORE3=3.2.0.17 -set EFCORE5=5.2.0.17 +set EFCORE2=2.2.0.19 +set EFCORE3=3.2.0.19 +set EFCORE5=5.2.0.19 ::删除所有bin与obj下的文件 @echo off diff --git a/samples/Sample.SqlServer/Controllers/ValuesController.cs b/samples/Sample.SqlServer/Controllers/ValuesController.cs index 37864e99..e1751e07 100644 --- a/samples/Sample.SqlServer/Controllers/ValuesController.cs +++ b/samples/Sample.SqlServer/Controllers/ValuesController.cs @@ -5,12 +5,14 @@ using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Logging; using Sample.SqlServer.DbContexts; using Sample.SqlServer.Domain.Entities; using ShardingCore.Core.QueryRouteManagers.Abstractions; using ShardingCore.DbContexts.VirtualDbContexts; using ShardingCore.Extensions; +using Z.EntityFramework.Plus; namespace Sample.SqlServer.Controllers { @@ -26,7 +28,7 @@ namespace Sample.SqlServer.Controllers private readonly DefaultShardingDbContext _defaultTableDbContext; private readonly IShardingRouteManager _shardingRouteManager; - public ValuesController(DefaultShardingDbContext defaultTableDbContext,IShardingRouteManager shardingRouteManager) + public ValuesController(DefaultShardingDbContext defaultTableDbContext, IShardingRouteManager shardingRouteManager) { _defaultTableDbContext = defaultTableDbContext; _shardingRouteManager = shardingRouteManager; @@ -36,17 +38,17 @@ namespace Sample.SqlServer.Controllers public async Task Get() { var sql = from ut in _defaultTableDbContext.Set() - join u in _defaultTableDbContext.Set() - on ut.UserId equals u.Id - select new - { - ut.Id, - userId=u.Id - }; - var listAsync =await sql.ToListAsync(); + join u in _defaultTableDbContext.Set() + on ut.UserId equals u.Id + select new + { + ut.Id, + userId = u.Id + }; + var listAsync = await sql.ToListAsync(); var resultx112331tt = await _defaultTableDbContext.Set().CountAsync(); var resultx112331 = await _defaultTableDbContext.Set().CountAsync(); - var resultx11233411 = _defaultTableDbContext.Set().Count(); + var resultx11233411 = _defaultTableDbContext.Set().Count(); var resultx11231 = await _defaultTableDbContext.Set().Where(o => o.Age == 198198).Select(o => o.Id).ContainsAsync("1981"); var resultx1121 = await _defaultTableDbContext.Set().Where(o => o.Id == "198").SumAsync(o => o.Age); var resultx111 = await _defaultTableDbContext.Set().FirstOrDefaultAsync(o => o.Id == "198"); @@ -88,11 +90,11 @@ namespace Sample.SqlServer.Controllers return Ok(); } [HttpGet] - public async Task Get1([FromQuery] int p,[FromQuery]int s) + public async Task Get1([FromQuery] int p, [FromQuery] int s) { Stopwatch sp = new Stopwatch(); sp.Start(); - var shardingPageResultAsync = await _defaultTableDbContext.Set().OrderBy(o=>o.Age).ToShardingPageAsync(p, s); + var shardingPageResultAsync = await _defaultTableDbContext.Set().OrderBy(o => o.Age).ToShardingPageAsync(p, s); sp.Stop(); return Ok(new { @@ -101,11 +103,11 @@ namespace Sample.SqlServer.Controllers }); } [HttpGet] - public IActionResult Get2([FromQuery] int p,[FromQuery]int s) + public IActionResult Get2([FromQuery] int p, [FromQuery] int s) { Stopwatch sp = new Stopwatch(); sp.Start(); - var shardingPageResultAsync = _defaultTableDbContext.Set().OrderBy(o=>o.Age).ToShardingPage(p, s); + var shardingPageResultAsync = _defaultTableDbContext.Set().OrderBy(o => o.Age).ToShardingPage(p, s); sp.Stop(); return Ok(new { @@ -113,5 +115,47 @@ namespace Sample.SqlServer.Controllers shardingPageResultAsync }); } + [HttpGet] + public IActionResult Get3() + { + + var dbContext2s = _defaultTableDbContext.BulkShardingExpression(o => o.Age > 100); + using (var tran = _defaultTableDbContext.Database.BeginTransaction()) + { + dbContext2s.ForEach(dbContext => + { + dbContext.Set().Where(o => o.Age > 100).Update(o => new SysUserMod() + { + AgeGroup = 1000 + }); + }); + _defaultTableDbContext.SaveChanges(); + tran.Commit(); + } + var list = new List(); + var dbContexts = _defaultTableDbContext.BulkShardingEnumerable(list); + + using (var tran = _defaultTableDbContext.Database.BeginTransaction()) + { + dbContexts.ForEach(kv => + { + kv.Key.BulkInsert(kv.Value); + }); + dbContexts.ForEach(kv => + { + kv.Key.BulkDelete(kv.Value); + }); + dbContexts.ForEach(kv => + { + kv.Key.BulkUpdate(kv.Value); + }); + _defaultTableDbContext.SaveChanges(); + tran.Commit(); + } + + + return Ok(); + } + } -} \ No newline at end of file +} diff --git a/samples/Sample.SqlServer/Sample.SqlServer.csproj b/samples/Sample.SqlServer/Sample.SqlServer.csproj index f6791120..4ca20ab2 100644 --- a/samples/Sample.SqlServer/Sample.SqlServer.csproj +++ b/samples/Sample.SqlServer/Sample.SqlServer.csproj @@ -6,8 +6,10 @@ + + diff --git a/samples/Sample.SqlServer/Startup.cs b/samples/Sample.SqlServer/Startup.cs index 33791e3c..4b6102cf 100644 --- a/samples/Sample.SqlServer/Startup.cs +++ b/samples/Sample.SqlServer/Startup.cs @@ -3,12 +3,14 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Query; using Microsoft.EntityFrameworkCore.SqlServer.Query.Internal; +using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Sample.SqlServer.DbContexts; using Sample.SqlServer.Shardings; using ShardingCore; +using ShardingCore.EFCores; namespace Sample.SqlServer { @@ -30,8 +32,8 @@ namespace Sample.SqlServer o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True;") , op => { - op.EnsureCreatedWithOutShardingTable = false; - op.CreateShardingTableOnStart = false; + op.EnsureCreatedWithOutShardingTable = true; + op.CreateShardingTableOnStart = true; op.UseShardingOptionsBuilder( (connection, builder) => builder.UseSqlServer(connection).UseLoggerFactory(efLogger),//使用dbconnection创建dbcontext支持事务 (conStr,builder) => builder.UseSqlServer(conStr).UseLoggerFactory(efLogger) diff --git a/samples/Samples.AbpSharding/AbstractShardingAbpDbContext.cs b/samples/Samples.AbpSharding/AbstractShardingAbpDbContext.cs index e94ba5c9..a1d30b32 100644 --- a/samples/Samples.AbpSharding/AbstractShardingAbpDbContext.cs +++ b/samples/Samples.AbpSharding/AbstractShardingAbpDbContext.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Data.Common; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Abp.EntityFrameworkCore; @@ -21,7 +23,7 @@ using ShardingCore.Sharding.Abstractions; namespace Samples.AbpSharding { - public abstract class AbstractShardingAbpDbContext : AbpDbContext, IShardingTableDbContext where T : AbpDbContext, IShardingTableDbContext + public abstract class AbstractShardingAbpDbContext : AbpDbContext, IShardingDbContext where T : AbpDbContext, IShardingTableDbContext { @@ -127,6 +129,28 @@ namespace Samples.AbpSharding return GetDbContext(true, _routeTailFactory.Create(tail)); } + + public IEnumerable CreateExpressionDbContext(Expression> where) + where TEntity : class + { + if (typeof(TEntity).IsShardingTable()) + { + var physicTable = _virtualTableManager.GetVirtualTable(ShardingDbContextType, typeof(TEntity)).RouteTo(new TableRouteConfig(predicate:where)); + if (physicTable.IsEmpty()) + throw new ShardingCoreException($"{where.ShardingPrint()} cant found any physic table"); + return physicTable.Select(o => GetDbContext(true, _routeTailFactory.Create(o.Tail))); + } + else + { + return new[] {GetDbContext(true, _routeTailFactory.Create(string.Empty))}; + } + } + + public void UseShardingTransaction(DbTransaction transaction) + { + throw new NotImplementedException(); + } + public override EntityEntry Add(object entity) { return CreateGenericDbContext(entity).Add(entity); diff --git a/src/ShardingCore/DIExtension.cs b/src/ShardingCore/DIExtension.cs index 8cdc6865..9553c8f0 100644 --- a/src/ShardingCore/DIExtension.cs +++ b/src/ShardingCore/DIExtension.cs @@ -12,6 +12,7 @@ using ShardingCore.Sharding; using ShardingCore.Sharding.Abstractions; using ShardingCore.TableCreator; using System; +using Microsoft.EntityFrameworkCore.Storage; using ShardingCore.Core.QueryRouteManagers; using ShardingCore.Core.QueryRouteManagers.Abstractions; using ShardingCore.Core.ShardingPage; @@ -38,7 +39,7 @@ namespace ShardingCore ServiceLifetime contextLifetime = ServiceLifetime.Scoped, ServiceLifetime optionsLifetime = ServiceLifetime.Scoped) where TActualDbContext : DbContext, IShardingTableDbContext - where TShardingDbContext : DbContext, IShardingTableDbContext + where TShardingDbContext : DbContext, IShardingDbContext { if (configure == null) throw new ArgumentNullException($"AddShardingDbContext params is null :{nameof(configure)}"); @@ -60,7 +61,13 @@ namespace ShardingCore Action shardingOptionAction = option => { optionsAction?.Invoke(option); +#if !EFCORE2 option.UseSharding(); + +#endif +#if EFCORE2 + option.UseSharding(); +#endif }; services.AddDbContext(shardingOptionAction, contextLifetime, optionsLifetime); services.AddInternalShardingCore(); @@ -75,7 +82,7 @@ namespace ShardingCore ServiceLifetime contextLifetime = ServiceLifetime.Scoped, ServiceLifetime optionsLifetime = ServiceLifetime.Scoped) where TActualDbContext : DbContext, IShardingTableDbContext - where TShardingDbContext : DbContext, IShardingTableDbContext + where TShardingDbContext : DbContext, IShardingDbContext { if (configure == null) throw new ArgumentNullException($"AddShardingDbContext params is null :{nameof(configure)}"); @@ -97,7 +104,13 @@ namespace ShardingCore Action shardingOptionAction = (sp, option) => { optionsAction?.Invoke(sp,option); +#if !EFCORE2 option.UseSharding(); + +#endif +#if EFCORE2 + option.UseSharding(); +#endif }; services.AddDbContext(shardingOptionAction, contextLifetime, optionsLifetime); services.AddInternalShardingCore(); @@ -132,12 +145,25 @@ namespace ShardingCore services.AddSingleton(); return services; } - +#if !EFCORE2 internal static DbContextOptionsBuilder UseSharding(this DbContextOptionsBuilder optionsBuilder) { return optionsBuilder.ReplaceService() - .ReplaceService(); + .ReplaceService() + .ReplaceService(); } + +#endif +#if EFCORE2 + internal static DbContextOptionsBuilder UseSharding(this DbContextOptionsBuilder optionsBuilder) where TShardingDbContext : DbContext, IShardingDbContext + { + return optionsBuilder.ReplaceService() + .ReplaceService() + .ReplaceService>(); + } + +#endif + internal static DbContextOptionsBuilder UseInnerDbContextSharding(this DbContextOptionsBuilder optionsBuilder) where TShardingDbContext:DbContext,IShardingDbContext { return optionsBuilder.ReplaceService() diff --git a/src/ShardingCore/DbContexts/EFCore2DbContextLocation.cs b/src/ShardingCore/DbContexts/EFCore2DbContextLocation.cs new file mode 100644 index 00000000..48a537af --- /dev/null +++ b/src/ShardingCore/DbContexts/EFCore2DbContextLocation.cs @@ -0,0 +1,20 @@ +#if EFCORE2 +using System; +using System.Collections.Generic; +using System.Text; + +namespace ShardingCore.DbContexts +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/5 21:29:47 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + class EFCore2DbContextLocation + { + } +} + +#endif \ No newline at end of file diff --git a/src/ShardingCore/EFCores/ShardingRelationalConnection.cs b/src/ShardingCore/EFCores/ShardingRelationalConnection.cs new file mode 100644 index 00000000..5758a4e1 --- /dev/null +++ b/src/ShardingCore/EFCores/ShardingRelationalConnection.cs @@ -0,0 +1,225 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Data.Common; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Internal; +using Microsoft.EntityFrameworkCore.Query.Internal; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using Microsoft.Extensions.DependencyInjection; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Abstractions; + +namespace ShardingCore.EFCores +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/5 15:41:20 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class ShardingRelationalConnection : IRelationalConnection + { + private readonly IRelationalConnection _relationalConnection; + + +#if !EFCORE2 + public ShardingRelationalConnection(IRelationalConnection _relationalConnection, DbTransaction transaction) + { + this._relationalConnection = _relationalConnection; + ((IShardingTransaction)Context).UseShardingTransaction(transaction); + } + +#endif +#if EFCORE2 + private readonly Type _dbContextType; + public ShardingRelationalConnection(IRelationalConnection _relationalConnection,DbTransaction transaction,Type dbContextType) + { + this._relationalConnection = _relationalConnection; + _dbContextType = dbContextType; + ((IShardingTransaction)Context).UseShardingTransaction(transaction); + } +#endif + + public void ResetState() + { + _relationalConnection.ResetState(); + } + +#if !EFCORE2 + public Task ResetStateAsync(CancellationToken cancellationToken = new CancellationToken()) + { + return _relationalConnection.ResetStateAsync(cancellationToken); + } + +#endif + + public IDbContextTransaction BeginTransaction() + { + return _relationalConnection.BeginTransaction(); + } + + public Task BeginTransactionAsync(CancellationToken cancellationToken = new CancellationToken()) + { + return _relationalConnection.BeginTransactionAsync(cancellationToken); + } + + public void CommitTransaction() + { + _relationalConnection.CommitTransaction(); + } + + + public void RollbackTransaction() + { + _relationalConnection.RollbackTransaction(); + } +#if EFCORE5 + public IDbContextTransaction UseTransaction(DbTransaction transaction, Guid transactionId) + { + var dbContextTransaction = _relationalConnection.UseTransaction(transaction, transactionId); + ((IShardingTransaction)Context).UseShardingTransaction(transaction); + return dbContextTransaction; + } + public async Task UseTransactionAsync(DbTransaction transaction, Guid transactionId, + CancellationToken cancellationToken = new CancellationToken()) + { + var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, transactionId, cancellationToken); + ((IShardingTransaction)Context).UseShardingTransaction(transaction); + return dbContextTransaction; + } + + public Task CommitTransactionAsync(CancellationToken cancellationToken = new CancellationToken()) + { + return _relationalConnection.CommitTransactionAsync(cancellationToken); + } + public Task RollbackTransactionAsync(CancellationToken cancellationToken = new CancellationToken()) + { + return _relationalConnection.RollbackTransactionAsync(cancellationToken); + } +#endif + +#if !EFCORE5 + public bool IsMultipleActiveResultSetsEnabled => _relationalConnection.IsMultipleActiveResultSetsEnabled; + + +# endif + IDbContextTransaction IRelationalConnection.CurrentTransaction => _relationalConnection.CurrentTransaction; + + IDbContextTransaction IDbContextTransactionManager.CurrentTransaction => _relationalConnection.CurrentTransaction; + + + public SemaphoreSlim Semaphore => _relationalConnection.Semaphore; + + public bool Open(bool errorsExpected = false) + { + return _relationalConnection.Open(errorsExpected); + } + + public Task OpenAsync(CancellationToken cancellationToken, bool errorsExpected = false) + { + return _relationalConnection.OpenAsync(cancellationToken, errorsExpected); + } + + public bool Close() + { + return _relationalConnection.Close(); + } + + + + public DbConnection DbConnection => _relationalConnection.DbConnection; + + public DbContext Context => +#if !EFCORE2 + _relationalConnection.Context; +#endif +#if EFCORE2 + GetDbContext(); + + private DbContext GetDbContext() + { + var namedConnectionStringResolver = ((RelationalConnectionDependencies)_relationalConnection.GetPropertyValue("Dependencies")).ConnectionStringResolver; + var serviceProvider = (IServiceProvider)namedConnectionStringResolver.GetPropertyValue("ApplicationServiceProvider"); + var dbContext = (DbContext)serviceProvider.GetService(_dbContextType); + return dbContext; + } + + + public void RegisterBufferable(IBufferable bufferable) + { + _relationalConnection.RegisterBufferable(bufferable); + } + + public Task RegisterBufferableAsync(IBufferable bufferable, CancellationToken cancellationToken) + { + return _relationalConnection.RegisterBufferableAsync(bufferable, cancellationToken); + } +#endif + public Guid ConnectionId => _relationalConnection.ConnectionId; + + public int? CommandTimeout + { + get + { + return _relationalConnection.CommandTimeout; + } + set + { + _relationalConnection.CommandTimeout = value; + } + } + + public IDbContextTransaction BeginTransaction(IsolationLevel isolationLevel) + { + return _relationalConnection.BeginTransaction(isolationLevel); + } + + public Task BeginTransactionAsync(IsolationLevel isolationLevel, + CancellationToken cancellationToken = new CancellationToken()) + { + return _relationalConnection.BeginTransactionAsync(isolationLevel, cancellationToken); + } + + public IDbContextTransaction UseTransaction(DbTransaction transaction) + { + var dbContextTransaction = _relationalConnection.UseTransaction(transaction); + ((IShardingTransaction)Context).UseShardingTransaction(transaction); + return dbContextTransaction; + } + + + public void Dispose() + { + _relationalConnection.Dispose(); + } + + + public string ConnectionString => _relationalConnection.ConnectionString; +#if !EFCORE2 + + public async Task UseTransactionAsync(DbTransaction transaction, CancellationToken cancellationToken = new CancellationToken()) + { + var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, cancellationToken); + ((IShardingTransaction)Context).UseShardingTransaction(transaction); + return dbContextTransaction; + + } + + public Task CloseAsync() + { + return _relationalConnection.CloseAsync(); + } + + public ValueTask DisposeAsync() + { + return _relationalConnection.DisposeAsync(); + } +#endif + } +} diff --git a/src/ShardingCore/EFCores/ShardingRelationalTransaction.cs b/src/ShardingCore/EFCores/ShardingRelationalTransaction.cs new file mode 100644 index 00000000..3bc7118f --- /dev/null +++ b/src/ShardingCore/EFCores/ShardingRelationalTransaction.cs @@ -0,0 +1,44 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Diagnostics; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Storage; +using ShardingCore.Sharding.Abstractions; + +namespace ShardingCore.EFCores +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/5 20:37:36 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + //public class ShardingRelationalTransaction: RelationalTransaction + //{ + // private readonly IShardingDbContext _shardingDbContext; + // public ShardingRelationalTransaction(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger logger, bool transactionOwned) : base(connection, transaction, transactionId, logger, transactionOwned) + // { + // _shardingDbContext = (IShardingDbContext)null; + // _shardingDbContext.UseShardingTransaction(transaction); + // } + + // protected override void ClearTransaction() + // { + // base.ClearTransaction(); + // _shardingDbContext.UseShardingTransaction(null); + // } + + // protected override async Task ClearTransactionAsync(CancellationToken cancellationToken = new CancellationToken()) + // { + // await base.ClearTransactionAsync(cancellationToken); + // _shardingDbContext.UseShardingTransaction(null); + + // } + //} +} diff --git a/src/ShardingCore/EFCores/ShardingRelationalTransactionFactory.cs b/src/ShardingCore/EFCores/ShardingRelationalTransactionFactory.cs new file mode 100644 index 00000000..d70b9c4c --- /dev/null +++ b/src/ShardingCore/EFCores/ShardingRelationalTransactionFactory.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Text; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Diagnostics; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Storage; +using ShardingCore.Sharding.Abstractions; + +namespace ShardingCore.EFCores +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/5 16:03:04 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ +#if !EFCORE2 + public class ShardingRelationalTransactionFactory: RelationalTransactionFactory + { + private readonly RelationalTransactionFactoryDependencies _dependencies; + public ShardingRelationalTransactionFactory(RelationalTransactionFactoryDependencies dependencies) : base(dependencies) + { + _dependencies = dependencies; + } + public override RelationalTransaction Create(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, + IDiagnosticsLogger logger, bool transactionOwned) + { + return new RelationalTransaction(new ShardingRelationalConnection(connection, transaction), transaction, transactionId, logger, transactionOwned); + } + } +#endif +#if EFCORE2 + + public class ShardingRelationalTransactionFactory : RelationalTransactionFactory where TShardingDbContext:DbContext,IShardingDbContext + { + private readonly RelationalTransactionFactoryDependencies _dependencies; + public ShardingRelationalTransactionFactory(RelationalTransactionFactoryDependencies dependencies) : base(dependencies) + { + _dependencies = dependencies; + } + public override RelationalTransaction Create(IRelationalConnection connection, DbTransaction transaction + , IDiagnosticsLogger logger, + bool transactionOwned) + { + return new RelationalTransaction(new ShardingRelationalConnection(connection, transaction,typeof(TShardingDbContext)), transaction, logger, + transactionOwned); + } + } +#endif +} diff --git a/src/ShardingCore/Extensions/CommonExtension.cs b/src/ShardingCore/Extensions/CommonExtension.cs index 7ef2cf30..d4970eeb 100644 --- a/src/ShardingCore/Extensions/CommonExtension.cs +++ b/src/ShardingCore/Extensions/CommonExtension.cs @@ -48,7 +48,7 @@ namespace ShardingCore.Extensions } /// - /// IShardingTableDbContext + /// IShardingDbContext /// /// /// @@ -59,7 +59,7 @@ namespace ShardingCore.Extensions return dbContext is IShardingTableDbContext; } /// - /// IShardingTableDbContext + /// IShardingDbContext /// /// /// diff --git a/src/ShardingCore/Extensions/InternalExtensions/InternalPaginationMetadataExtension.cs b/src/ShardingCore/Extensions/InternalExtensions/InternalPaginationMetadataExtension.cs index ab7a1da5..38932016 100644 --- a/src/ShardingCore/Extensions/InternalExtensions/InternalPaginationMetadataExtension.cs +++ b/src/ShardingCore/Extensions/InternalExtensions/InternalPaginationMetadataExtension.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Text; using ShardingCore.Sharding.PaginationConfigurations; +using ShardingCore.Sharding.StreamMergeEngines; namespace ShardingCore.Extensions.InternalExtensions { @@ -19,7 +21,19 @@ namespace ShardingCore.Extensions.InternalExtensions if (total < paginationMetadata.ReverseTotalGe) return false; - return paginationMetadata.ReverseFactor * total < skip; + return skip> paginationMetadata.ReverseFactor * total; + } + internal static bool IsUseUneven(this PaginationMetadata paginationMetadata,ICollection> routeQueryResults,int skip) + { + if (routeQueryResults.Count <= 1) + return false; + + if (skip < paginationMetadata.UnevenLimit) + return false; + var total = routeQueryResults.Sum(o => o.QueryResult); + if(total* paginationMetadata.UnevenFactorGe < routeQueryResults.First().QueryResult) + return false; + return true; } } } diff --git a/src/ShardingCore/Extensions/ShardingExtension.cs b/src/ShardingCore/Extensions/ShardingExtension.cs index f20dbb43..0ff25a3a 100644 --- a/src/ShardingCore/Extensions/ShardingExtension.cs +++ b/src/ShardingCore/Extensions/ShardingExtension.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Linq.Expressions; using System.Text; +using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; using ShardingCore.Sharding.Abstractions; @@ -26,7 +28,7 @@ namespace ShardingCore.Extensions // /// // /// // /// - // public static string GetShardingTableDbContextTail(this IShardingTableDbContext dbContext) + // public static string GetShardingTableDbContextTail(this IShardingDbContext dbContext) // { // return dbContext.RouteTail?.Replace(ShardingTableDbContextFormat, string.Empty)??string.Empty; // @@ -36,7 +38,7 @@ namespace ShardingCore.Extensions // /// // /// // /// - // public static void SetShardingTableDbContextTail(this IShardingTableDbContext dbContext, string tail) + // public static void SetShardingTableDbContextTail(this IShardingDbContext dbContext, string tail) // { // if (!string.IsNullOrWhiteSpace(dbContext.ModelChangeKey)) // throw new ShardingCoreException($"repeat set ModelChangeKey in {dbContext.GetType().FullName}"); @@ -58,5 +60,37 @@ namespace ShardingCore.Extensions return expression.ToString(); #endif } + + /// + /// 根据对象集合解析 + /// + /// + /// + /// + /// + public static IDictionary> BulkShardingEnumerable(this IShardingDbContext shardingDbContext, + IEnumerable entities) where TEntity : class + { + return entities.Select(o => + { + var dbContext = shardingDbContext.CreateGenericDbContext(o); + return new + { + DbContext = dbContext, + Entity = o + }; + }).GroupBy(g => g.DbContext).ToDictionary(o=>o.Key,o=>o.Select(g=>g.Entity)); + } + /// + /// 根据条件表达式解析 + /// + /// + /// + /// + /// + public static IEnumerable BulkShardingExpression(this IShardingDbContext shardingDbContext, Expression> where) where TEntity : class + { + return shardingDbContext.CreateExpressionDbContext(where); + } } } diff --git a/src/ShardingCore/Sharding/AbstractShardingDbContext.cs b/src/ShardingCore/Sharding/AbstractShardingDbContext.cs index 8950cb05..afe77b71 100644 --- a/src/ShardingCore/Sharding/AbstractShardingDbContext.cs +++ b/src/ShardingCore/Sharding/AbstractShardingDbContext.cs @@ -14,9 +14,13 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Data; +using System.Data.Common; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore.Infrastructure; +using ShardingCore.EFCores; namespace ShardingCore.Sharding { @@ -30,7 +34,7 @@ namespace ShardingCore.Sharding /// 分表分库的dbcontext /// /// - public abstract class AbstractShardingDbContext : DbContext, IShardingTableDbContext where T : DbContext, IShardingTableDbContext + public abstract class AbstractShardingDbContext : DbContext, IShardingDbContext, IShardingTransaction where T : DbContext, IShardingTableDbContext { private readonly ConcurrentDictionary _dbContextCaches = new ConcurrentDictionary(); private readonly IVirtualTableManager _virtualTableManager; @@ -40,6 +44,7 @@ namespace ShardingCore.Sharding private DbContextOptions _dbContextOptions; private readonly object CREATELOCK = new object(); + private Guid idid = Guid.NewGuid(); public AbstractShardingDbContext(DbContextOptions options) : base(options) { @@ -53,13 +58,22 @@ namespace ShardingCore.Sharding public abstract Type ShardingDbContextType { get; } public Type ActualDbContextType => typeof(T); + //private ShardingDatabaseFacade _database; + //public override DatabaseFacade Database + //{ + // get + // { + + // return _database ?? (_database = new ShardingDatabaseFacade(this)); + // } + //} private DbContextOptionsBuilder CreateDbContextOptionBuilder() { Type type = typeof(DbContextOptionsBuilder<>); type = type.MakeGenericType(ActualDbContextType); - return (DbContextOptionsBuilder) Activator.CreateInstance(type); + return (DbContextOptionsBuilder)Activator.CreateInstance(type); } private DbContextOptions CreateShareDbContextOptions() @@ -73,7 +87,7 @@ namespace ShardingCore.Sharding { var dbContextOptionBuilder = CreateDbContextOptionBuilder(); var connectionString = Database.GetDbConnection().ConnectionString; - _shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString,dbContextOptionBuilder); + _shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder); return dbContextOptionBuilder.Options; } @@ -104,7 +118,7 @@ namespace ShardingCore.Sharding { if (routeTail.IsMultiEntityQuery()) throw new ShardingCoreException("multi route not support track"); - if(!(routeTail is ISingleQueryRouteTail singleQueryRouteTail)) + if (!(routeTail is ISingleQueryRouteTail singleQueryRouteTail)) throw new ShardingCoreException("multi route not support track"); var cacheKey = routeTail.GetRouteTailIdenty(); if (!_dbContextCaches.TryGetValue(cacheKey, out var dbContext)) @@ -113,6 +127,8 @@ namespace ShardingCore.Sharding _dbContextCaches.TryAdd(cacheKey, dbContext); } + if (IsBeginTransaction) + dbContext.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction()); return dbContext; } else @@ -134,7 +150,27 @@ namespace ShardingCore.Sharding return GetDbContext(true, _routeTailFactory.Create(tail)); } - + + public IEnumerable CreateExpressionDbContext(Expression> @where) where TEntity : class + { + if (typeof(TEntity).IsShardingTable()) + { + var physicTable = _virtualTableManager.GetVirtualTable(ShardingDbContextType, typeof(TEntity)).RouteTo(new TableRouteConfig(predicate:@where)); + if (physicTable.IsEmpty()) + throw new ShardingCoreException($"{@where.ShardingPrint()} cant found ant physic table"); + return physicTable.Select(o => GetDbContext(true, _routeTailFactory.Create(o.Tail))); + } + else + { + return new[] { GetDbContext(true, _routeTailFactory.Create(string.Empty)) }; + } + } + + public void UseShardingTransaction(DbTransaction transaction) + { + _dbContextCaches.Values.ForEach(o => o.Database.UseTransaction(transaction)); + } + public override EntityEntry Add(object entity) { @@ -399,34 +435,26 @@ namespace ShardingCore.Sharding { var isBeginTransaction = IsBeginTransaction; //如果是内部开的事务就内部自己消化 - if (!isBeginTransaction) - { - Database.BeginTransaction(); - } int i = 0; + if (!isBeginTransaction) + { + using(var tran= Database.BeginTransaction()) + { - try + foreach (var dbContextCache in _dbContextCaches) + { + i += dbContextCache.Value.SaveChanges(); + } + tran.Commit(); + } + } + else { foreach (var dbContextCache in _dbContextCaches) { - dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction()); i += dbContextCache.Value.SaveChanges(); } - - if (!isBeginTransaction) - Database.CurrentTransaction.Commit(); - } - finally - { - if (!isBeginTransaction) - { - Database.CurrentTransaction?.Dispose(); - foreach (var dbContextCache in _dbContextCaches) - { - dbContextCache.Value.Database.UseTransaction(null); - } - } } return i; @@ -435,35 +463,26 @@ namespace ShardingCore.Sharding public override int SaveChanges(bool acceptAllChangesOnSuccess) { var isBeginTransaction = IsBeginTransaction; + int i = 0; //如果是内部开的事务就内部自己消化 if (!isBeginTransaction) { - Database.BeginTransaction(); + using (var tran = Database.BeginTransaction()) + { + + foreach (var dbContextCache in _dbContextCaches) + { + i += dbContextCache.Value.SaveChanges(acceptAllChangesOnSuccess); + } + tran.Commit(); + } } - - int i = 0; - - try + else { foreach (var dbContextCache in _dbContextCaches) { - dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction()); i += dbContextCache.Value.SaveChanges(acceptAllChangesOnSuccess); } - - if (!isBeginTransaction) - Database.CurrentTransaction.Commit(); - } - finally - { - if (!isBeginTransaction) - { - Database.CurrentTransaction?.Dispose(); - foreach (var dbContextCache in _dbContextCaches) - { - dbContextCache.Value.Database.UseTransaction(null); - } - } } return i; @@ -474,39 +493,27 @@ namespace ShardingCore.Sharding public override async Task SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken()) { var isBeginTransaction = IsBeginTransaction; + + int i = 0; //如果是内部开的事务就内部自己消化 if (!isBeginTransaction) { - await Database.BeginTransactionAsync(cancellationToken); + using (var tran = await Database.BeginTransactionAsync(cancellationToken)) + { + + foreach (var dbContextCache in _dbContextCaches) + { + i += await dbContextCache.Value.SaveChangesAsync(cancellationToken); + } + await tran.CommitAsync(); + } } - - int i = 0; - - try + else { foreach (var dbContextCache in _dbContextCaches) { - await dbContextCache.Value.Database.UseTransactionAsync(Database.CurrentTransaction.GetDbTransaction(), cancellationToken: cancellationToken); i += await dbContextCache.Value.SaveChangesAsync(cancellationToken); } - - if (!isBeginTransaction) - await Database.CurrentTransaction.CommitAsync(cancellationToken); - } - finally - { - if (!isBeginTransaction) - { - } - - if (Database.CurrentTransaction != null) - { - await Database.CurrentTransaction.DisposeAsync(); - foreach (var dbContextCache in _dbContextCaches) - { - await dbContextCache.Value.Database.UseTransactionAsync(null, cancellationToken: cancellationToken); - } - } } return i; @@ -517,37 +524,27 @@ namespace ShardingCore.Sharding { var isBeginTransaction = IsBeginTransaction; + int i = 0; //如果是内部开的事务就内部自己消化 if (!isBeginTransaction) { - await Database.BeginTransactionAsync(cancellationToken); - } - int i = 0; - - try - { - - foreach (var dbContextCache in _dbContextCaches) + using (var tran = await Database.BeginTransactionAsync(cancellationToken)) { - dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction()); - i += await dbContextCache.Value.SaveChangesAsync(cancellationToken); - } - if (!isBeginTransaction) - Database.CurrentTransaction.Commit(); - } - finally - { - if (!isBeginTransaction) { } - if (Database.CurrentTransaction != null) - { - Database.CurrentTransaction.Dispose(); foreach (var dbContextCache in _dbContextCaches) { - dbContextCache.Value.Database.UseTransaction(null); + i += await dbContextCache.Value.SaveChangesAsync(cancellationToken); } + tran.Commit(); } - } + else + { + foreach (var dbContextCache in _dbContextCaches) + { + i += await dbContextCache.Value.SaveChangesAsync(cancellationToken); + } + } + return i; } #endif @@ -556,38 +553,31 @@ namespace ShardingCore.Sharding public override async Task SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken()) { var isBeginTransaction = IsBeginTransaction; + + int i = 0; //如果是内部开的事务就内部自己消化 if (!isBeginTransaction) { - await Database.BeginTransactionAsync(cancellationToken); + using (var tran = await Database.BeginTransactionAsync(cancellationToken)) + { + + foreach (var dbContextCache in _dbContextCaches) + { + i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); + } + + await tran.CommitAsync(); + } } - - int i = 0; - - try + else { + foreach (var dbContextCache in _dbContextCaches) { - await dbContextCache.Value.Database.UseTransactionAsync(Database.CurrentTransaction.GetDbTransaction(), cancellationToken: cancellationToken); i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); } - - if (!isBeginTransaction) - await Database.CurrentTransaction.CommitAsync(cancellationToken); } - finally - { - if (!isBeginTransaction) - if (Database.CurrentTransaction != null) - { - await Database.CurrentTransaction.DisposeAsync(); - foreach (var dbContextCache in _dbContextCaches) - { - await dbContextCache.Value.Database.UseTransactionAsync(null, cancellationToken: cancellationToken); - } - } - } return i; } @@ -597,37 +587,27 @@ namespace ShardingCore.Sharding { var isBeginTransaction = IsBeginTransaction; + int i = 0; //如果是内部开的事务就内部自己消化 if (!isBeginTransaction) { - await Database.BeginTransactionAsync(cancellationToken); - } - int i = 0; - - try - { - - foreach (var dbContextCache in _dbContextCaches) + using (var tran = await Database.BeginTransactionAsync(cancellationToken)) { - dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction()); - i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); - } - if (!isBeginTransaction) - Database.CurrentTransaction.Commit(); - } - finally - { - if (!isBeginTransaction) - if (Database.CurrentTransaction != null) - { - Database.CurrentTransaction.Dispose(); - foreach (var dbContextCache in _dbContextCaches) - { - dbContextCache.Value.Database.UseTransaction(null); - } + foreach (var dbContextCache in _dbContextCaches) + { + i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); } + tran.Commit(); + } + } + else + { + foreach (var dbContextCache in _dbContextCaches) + { + i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); + } } return i; } diff --git a/src/ShardingCore/Sharding/Abstractions/IShardingDbContext.cs b/src/ShardingCore/Sharding/Abstractions/IShardingDbContext.cs index 736325ec..072aaf18 100644 --- a/src/ShardingCore/Sharding/Abstractions/IShardingDbContext.cs +++ b/src/ShardingCore/Sharding/Abstractions/IShardingDbContext.cs @@ -1,6 +1,9 @@ using Microsoft.EntityFrameworkCore; using ShardingCore.Core.VirtualRoutes.RouteTails.Abstractions; using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Linq.Expressions; namespace ShardingCore.Sharding.Abstractions { @@ -12,30 +15,44 @@ namespace ShardingCore.Sharding.Abstractions */ public interface IShardingDbContext { + /// + /// 当前sharding的db context type + /// Type ShardingDbContextType { get; } /// - /// ��ʵ��DbContext ���� + /// 真实的db context type /// Type ActualDbContextType { get;} /// /// create DbContext /// - /// true not care dbcontext life, false need call dispose() + /// true not care db context life, false need call dispose() /// /// DbContext GetDbContext(bool track,IRouteTail routeTail); /// - /// ����ʵ�崴��db context + /// 创建通用的db context /// /// /// /// DbContext CreateGenericDbContext(T entity) where T : class; - + /// + /// 根据表达式创建db context + /// + /// + /// + /// + + IEnumerable CreateExpressionDbContext(Expression> where) + where TEntity : class; + + + } - public interface IShardingTableDbContext : IShardingDbContext where T : DbContext, IShardingTableDbContext + public interface IShardingDbContext : IShardingDbContext where T : DbContext, IShardingTableDbContext { } diff --git a/src/ShardingCore/Sharding/Abstractions/IShardingTransaction.cs b/src/ShardingCore/Sharding/Abstractions/IShardingTransaction.cs new file mode 100644 index 00000000..79e49cc1 --- /dev/null +++ b/src/ShardingCore/Sharding/Abstractions/IShardingTransaction.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Text; + +namespace ShardingCore.Sharding.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/6 8:41:50 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public interface IShardingTransaction + { + void UseShardingTransaction(DbTransaction transaction); + } +} diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs index c59ae3f2..94357ad0 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs @@ -20,27 +20,34 @@ namespace ShardingCore.Sharding.PaginationConfigurations public ISet PaginationConfigs = new HashSet(); /// - /// 反向排序因子 + /// 反向排序因子 skip>ReverseFactor * total /// public double ReverseFactor { get; set; } = -1; /// - /// 当条数大于多少条后采用反向排序 + /// 当条数大于ReverseTotalGe条后采用反向排序 /// public long ReverseTotalGe { get; set; } = 10000L; /// - /// 是否已开启反向排序 仅支持单排序 + /// 是否已开启反向排序 skip>ReverseFactor * total 查询条件必须存在 order by /// public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 500; - // /// - // /// 当出现N张表分页需要跳过X条数据,获取Y条数据除了total条数最多的那张表以外的其他表和小于TakeInMemoryMaxRangeSkip那么就启用 - // /// - // public int TakeInMemoryMaxRangeSkip { get; set; } = 1000; - // - // public bool EnableTakeInMemory(int skip) - // { - // return skip > TakeInMemoryMaxRangeSkip && TakeInMemoryMaxRangeSkip > 500; - // } + + /// + /// 极度不规则分布时当分页最大一页书占全部的(UnevenFactorGe*100)%时启用内存排序 + /// + [Obsolete] + public double UnevenFactorGe { get; set; } = -1; + /// + /// 极度不规则分布时除了total最大一张表外的其余表相加不能超过UnevenLimit + /// + [Obsolete] + public int UnevenLimit { get; set; } = 300; + /// + /// 启用不规则分布分页 查询条件必须存在 order by + /// + [Obsolete] + public bool EnableUnevenShardingPage => UnevenFactorGe > 0 && UnevenFactorGe < 1 && UnevenLimit > 0; } } diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs index 39dcb56c..69575bba 100644 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs @@ -90,8 +90,15 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult); if (paginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), total)) { - return new ReverseShardingEnumeratorAsyncStreamMergeEngine( - _streamMergeContext, _streamMergeContext.Orders, total); + return new ReverseShardingEnumeratorAsyncStreamMergeEngine( _streamMergeContext, total); + } + } + + if (paginationMetadata.EnableUnevenShardingPage) + { + if (paginationMetadata.IsUseUneven(_shardingPageManager.Current.RouteQueryResults, _streamMergeContext.Skip.GetValueOrDefault())) + { + } } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs index 9701c828..58227e97 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs @@ -21,12 +21,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. */ public class ReverseShardingEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine { - private readonly IEnumerable _primaryOrders; private readonly long _total; - public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext, IEnumerable primaryOrders, long total) : base(streamMergeContext) + public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext, long total) : base(streamMergeContext) { - _primaryOrders = primaryOrders; _total = total; } @@ -40,7 +38,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. var realSkip = _total- take- skip; var tableResult = StreamMergeContext.RouteResults; StreamMergeContext.ReSetSkip((int)realSkip); - var propertyOrders =_primaryOrders.Select(o=>new PropertyOrder( o.PropertyExpression,!o.IsAsc)).ToArray(); + var propertyOrders = StreamMergeContext.Orders.Select(o=>new PropertyOrder( o.PropertyExpression,!o.IsAsc)).ToArray(); StreamMergeContext.ReSetOrders(propertyOrders); var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+(int)take).OrderWithExpression(propertyOrders); var enumeratorTasks = tableResult.Select(routeResult => diff --git a/src/ShardingCore/ShardingConfigOption.cs b/src/ShardingCore/ShardingConfigOption.cs index f8ae5cdf..ea978e52 100644 --- a/src/ShardingCore/ShardingConfigOption.cs +++ b/src/ShardingCore/ShardingConfigOption.cs @@ -21,7 +21,7 @@ namespace ShardingCore */ public class ShardingConfigOption : IShardingConfigOption where TActualDbContext : DbContext, IShardingTableDbContext - where TShardingDbContext : DbContext, IShardingTableDbContext + where TShardingDbContext : DbContext, IShardingDbContext { private readonly Dictionary _virtualRoutes = new Dictionary(); diff --git a/src/ShardingCore/ShardingCore.csproj b/src/ShardingCore/ShardingCore.csproj index 3525589b..1e153601 100644 --- a/src/ShardingCore/ShardingCore.csproj +++ b/src/ShardingCore/ShardingCore.csproj @@ -14,7 +14,7 @@ - - + + diff --git a/test/ShardingCore.Test50_2x/Startup.cs b/test/ShardingCore.Test50_2x/Startup.cs index d509627a..bbef08fc 100644 --- a/test/ShardingCore.Test50_2x/Startup.cs +++ b/test/ShardingCore.Test50_2x/Startup.cs @@ -3,10 +3,12 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using ShardingCore.EFCores; using ShardingCore.Test50_2x.Domain.Entities; using ShardingCore.Test50_2x.Shardings; @@ -41,8 +43,9 @@ namespace ShardingCore.Test50_2x // ConfigureServices(HostBuilderContext hostBuilderContext, IServiceCollection services) public void ConfigureServices(IServiceCollection services, HostBuilderContext hostBuilderContext) { + services.AddDbContext(); services.AddShardingDbContext(o => o.UseSqlServer(hostBuilderContext.Configuration.GetSection("SqlServer")["ConnectionString"]) - ,op => + , op => { op.EnsureCreatedWithOutShardingTable = true; op.CreateShardingTableOnStart = true;