From 9eff3f47d54b7633c760af29f8959d7714a35b35 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Mon, 6 Dec 2021 14:19:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E7=BE=8E=E9=9B=86=E6=88=90connection?= =?UTF-8?q?=20mode=E5=B9=B6=E4=B8=94=E4=BC=98=E5=8C=96connection=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- samples/Sample.SqlServer/Startup.cs | 2 - src/ShardingCore/DIExtension.cs | 16 ++-- .../DIExtensions/ShardingCoreConfigBuilder.cs | 18 +---- ...ardingCoreParallelQueryTimeOutException.cs | 24 ------ .../Extensions/ConnectionModeExtension.cs | 66 +++++++-------- src/ShardingCore/IShardingConfigOption.cs | 9 --- .../InMemoryStreamMergeAsyncEnumerator.cs | 57 ++++++------- .../PaginationStreamMergeAsyncEnumerator.cs | 9 ++- .../Abstractions/AbstractBaseMergeEngine.cs | 80 ++++++++++++++----- .../AbstractInMemoryAsyncMergeEngine.cs | 15 ++-- .../Abstractions/ShardingMergeResult.cs | 21 +++++ .../AbstractEnumeratorStreamMergeEngine.cs | 58 +++++++++++++- .../Common/DataSourceSqlExecutorUnit.cs | 21 +++++ .../MergeEngines/Common/SqlExecutorGroup.cs | 6 +- ...equenceEnumeratorAsyncStreamMergeEngine.cs | 14 ++-- ...hardingEnumeratorAsyncStreamMergeEngine.cs | 18 ++++- ...hardingEnumeratorAsyncStreamMergeEngine.cs | 14 +++- ...equenceEnumeratorAsyncStreamMergeEngine.cs | 7 +- src/ShardingCore/ShardingConfigOption.cs | 9 --- test/ShardingCore.Test/ShardingTest.cs | 8 -- test/ShardingCore.Test/ShardingTestSync.cs | 8 -- test/ShardingCore.Test/Startup.cs | 4 +- test/ShardingCore.Test2x/ShardingTest.cs | 8 -- test/ShardingCore.Test2x/ShardingTestSync.cs | 9 --- test/ShardingCore.Test3x/ShardingTest.cs | 8 -- test/ShardingCore.Test3x/ShardingTestSync.cs | 8 -- test/ShardingCore.Test5x/ShardingTest.cs | 8 -- test/ShardingCore.Test5x/ShardingTestSync.cs | 8 -- 29 files changed, 282 insertions(+), 253 deletions(-) delete mode 100644 src/ShardingCore/Exceptions/ShardingCoreParallelQueryTimeOutException.cs create mode 100644 src/ShardingCore/Sharding/MergeEngines/Abstractions/ShardingMergeResult.cs create mode 100644 src/ShardingCore/Sharding/MergeEngines/Common/DataSourceSqlExecutorUnit.cs diff --git a/README.md b/README.md index cbf42aa0..2e6e7034 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ AMD Ryzen 9 3900X, 1 CPU, 24 logical and 12 physical cores | NoShardingNoIndexToListAsync | 10 | 25,865.136 ms | 115.6391 ms | 102.5111 ms | 25,847.258 ms | | ShardingNoIndexToListAsync | 10 | 5,502.922 ms | 92.7201 ms | 86.7305 ms | 5,483.847 ms | -具体可以通过first前两次结果来计算得出结论单次查询的的损耗为0.3-0.4毫秒之间,通过数据聚合和数据路由的损耗单次在0.3ms-0.4ms,其中创建dbcontext为0.1毫秒目前没有好的优化方案,0.013毫秒左右是路由表达式解析和编译,复杂表达式可能更加耗时,剩下的0.28毫秒为数据源和表后缀的解析等操作包括实例的反射创建和数据的聚合, +具体可以通过first前两次结果来计算得出结论单次查询的的损耗为0.2-0.3毫秒之间,通过数据聚合和数据路由的损耗单次在0.3ms-0.4ms,其中创建dbcontext为0.1毫秒目前没有好的优化方案,0.013毫秒左右是路由表达式解析和编译,复杂表达式可能更加耗时,剩下的0.2毫秒为数据源和表后缀的解析等操作包括实例的反射创建和数据的聚合, sqlserver的各项数据在分表和未分表的情况下都几乎差不多可以得出在770w数据集情况下数据库还并未是数据瓶颈的关键,但是mysql可以看到在分表和未分表的情况下如果涉及到没有索引的全表扫描那么性能的差距将是分表后的表数目之多,测试中为5-6倍,也就是分表数目 diff --git a/samples/Sample.SqlServer/Startup.cs b/samples/Sample.SqlServer/Startup.cs index 8f61c1de..575ece59 100644 --- a/samples/Sample.SqlServer/Startup.cs +++ b/samples/Sample.SqlServer/Startup.cs @@ -34,8 +34,6 @@ namespace Sample.SqlServer o.CreateShardingTableOnStart = true; o.EnsureCreatedWithOutShardingTable = true; o.AutoTrackEntity = true; - o.ParallelQueryMaxThreadCount = 100; - o.ParallelQueryTimeOut = TimeSpan.FromSeconds(10); //if SysTest entity not exists in db and db is exists //o.AddEntityTryCreateTable(); // or `o.AddEntitiesTryCreateTable(typeof(SysTest));` }) diff --git a/src/ShardingCore/DIExtension.cs b/src/ShardingCore/DIExtension.cs index 4010515f..174b8a23 100644 --- a/src/ShardingCore/DIExtension.cs +++ b/src/ShardingCore/DIExtension.cs @@ -2,8 +2,11 @@ using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Internal; using Microsoft.EntityFrameworkCore.Query.Internal; +using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using ShardingCore.Bootstrapers; +using ShardingCore.Core.EntityMetadatas; using ShardingCore.Core.QueryRouteManagers; using ShardingCore.Core.QueryRouteManagers.Abstractions; using ShardingCore.Core.ShardingPage; @@ -18,23 +21,14 @@ using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; using ShardingCore.DbContexts; using ShardingCore.DIExtensions; using ShardingCore.EFCores; +using ShardingCore.EFCores.OptionsExtensions; using ShardingCore.Helpers; +using ShardingCore.Jobs; using ShardingCore.Sharding; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.ShardingQueryExecutors; using ShardingCore.TableCreator; using System; -using System.Collections.Generic; -using System.Linq; -using Microsoft.EntityFrameworkCore.Storage; -using ShardingCore.Bootstrapers; -using ShardingCore.Core.EntityMetadatas; -using ShardingCore.EFCores.OptionsExtensions; -using ShardingCore.Exceptions; -using ShardingCore.Extensions; -using ShardingCore.Jobs; -using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge; -using ShardingCore.Sharding.StreamMergeEngines; namespace ShardingCore { diff --git a/src/ShardingCore/DIExtensions/ShardingCoreConfigBuilder.cs b/src/ShardingCore/DIExtensions/ShardingCoreConfigBuilder.cs index 145c4636..7de4e9bd 100644 --- a/src/ShardingCore/DIExtensions/ShardingCoreConfigBuilder.cs +++ b/src/ShardingCore/DIExtensions/ShardingCoreConfigBuilder.cs @@ -36,16 +36,11 @@ namespace ShardingCore.DIExtensions { var shardingCoreBeginOptions = new ShardingCoreBeginOptions(); shardingCoreBeginOptionsConfigure?.Invoke(shardingCoreBeginOptions); - if (shardingCoreBeginOptions.ParallelQueryMaxThreadCount <= 0) + if (shardingCoreBeginOptions.MaxQueryConnectionsLimit <= 0) throw new ArgumentException( - $"{nameof(shardingCoreBeginOptions.ParallelQueryMaxThreadCount)} should greater than zero thread count"); - if (shardingCoreBeginOptions.ParallelQueryTimeOut.TotalMilliseconds <= 0) - throw new ArgumentException( - $"{nameof(shardingCoreBeginOptions.ParallelQueryTimeOut)} should greater than zero milliseconds"); + $"{nameof(shardingCoreBeginOptions.MaxQueryConnectionsLimit)} should greater than and equal 1"); ShardingConfigOption.EnsureCreatedWithOutShardingTable = shardingCoreBeginOptions.EnsureCreatedWithOutShardingTable; ShardingConfigOption.AutoTrackEntity = shardingCoreBeginOptions.AutoTrackEntity; - ShardingConfigOption.ParallelQueryMaxThreadCount = shardingCoreBeginOptions.ParallelQueryMaxThreadCount; - ShardingConfigOption.ParallelQueryTimeOut = shardingCoreBeginOptions.ParallelQueryTimeOut; ShardingConfigOption.CreateShardingTableOnStart = shardingCoreBeginOptions.CreateShardingTableOnStart; ShardingConfigOption.IgnoreCreateTableError = shardingCoreBeginOptions.IgnoreCreateTableError; ShardingConfigOption.MaxQueryConnectionsLimit = shardingCoreBeginOptions.MaxQueryConnectionsLimit; @@ -96,15 +91,6 @@ namespace ShardingCore.DIExtensions /// public bool AutoTrackEntity { get; set; } - /// - /// 单次查询并发线程数目(最小1)默认cpu核心数*2 - /// - public int ParallelQueryMaxThreadCount { get; set; } = Environment.ProcessorCount*2; - /// - /// 默认30秒超时 - /// - public TimeSpan ParallelQueryTimeOut { get; set; } = TimeSpan.FromSeconds(30); - /// /// 忽略建表时的错误 /// diff --git a/src/ShardingCore/Exceptions/ShardingCoreParallelQueryTimeOutException.cs b/src/ShardingCore/Exceptions/ShardingCoreParallelQueryTimeOutException.cs deleted file mode 100644 index 6bbbcaaf..00000000 --- a/src/ShardingCore/Exceptions/ShardingCoreParallelQueryTimeOutException.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Runtime.Serialization; -using System.Text; - -namespace ShardingCore.Exceptions -{ - /* - * @Author: xjm - * @Description: - * @Date: 2021/10/3 22:25:59 - * @Ver: 1.0 - * @Email: 326308290@qq.com - */ - [ExcludeFromCodeCoverage] - public class ShardingCoreParallelQueryTimeOutException:ShardingCoreException - { - - public ShardingCoreParallelQueryTimeOutException(string message) : base(message) - { - } - } -} diff --git a/src/ShardingCore/Extensions/ConnectionModeExtension.cs b/src/ShardingCore/Extensions/ConnectionModeExtension.cs index f51a1077..82f032f6 100644 --- a/src/ShardingCore/Extensions/ConnectionModeExtension.cs +++ b/src/ShardingCore/Extensions/ConnectionModeExtension.cs @@ -1,34 +1,34 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Microsoft.EntityFrameworkCore; -using ShardingCore.Core; +//using System; +//using System.Collections.Generic; +//using System.Linq; +//using System.Text; +//using System.Threading.Tasks; +//using Microsoft.EntityFrameworkCore; +//using ShardingCore.Core; -namespace ShardingCore.Extensions -{ - internal static class ConnectionModeExtension - { - public static async Task ReleaseConnectionAsync(this Task executeTask, DbContext dbContext, - ConnectionModeEnum connectionMode) - { - try - { - return await executeTask; - } - finally - { - if (connectionMode == ConnectionModeEnum.CONNECTION_STRICTLY) - { -#if !EFCORE2 - await dbContext.DisposeAsync(); -#endif -#if EFCORE2 - dbContext.Dispose(); -#endif - } - } - } - } -} +//namespace ShardingCore.Extensions +//{ +// internal static class ConnectionModeExtension +// { +// public static async Task ReleaseConnectionAsync(this Task executeTask, DbContext dbContext, +// ConnectionModeEnum connectionMode) +// { +// try +// { +// return await executeTask; +// } +// finally +// { +//// if (connectionMode == ConnectionModeEnum.CONNECTION_STRICTLY) +//// { +////#if !EFCORE2 +//// await dbContext.DisposeAsync(); +////#endif +////#if EFCORE2 +//// dbContext.Dispose(); +////#endif +//// } +// } +// } +// } +//} diff --git a/src/ShardingCore/IShardingConfigOption.cs b/src/ShardingCore/IShardingConfigOption.cs index 390c5b35..2669e553 100644 --- a/src/ShardingCore/IShardingConfigOption.cs +++ b/src/ShardingCore/IShardingConfigOption.cs @@ -62,15 +62,6 @@ namespace ShardingCore /// 自动追踪实体 /// public bool AutoTrackEntity { get; set; } - - /// - /// 单次查询并发线程数目(1-65536) - /// - public int ParallelQueryMaxThreadCount { get; set; } - /// - /// 并发查询超时时间 - /// - public TimeSpan ParallelQueryTimeOut { get; set; } /// /// 默认数据源名称 /// diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryStreamMergeAsyncEnumerator.cs index efecff87..92354f30 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryStreamMergeAsyncEnumerator.cs @@ -11,57 +11,50 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync { internal class InMemoryStreamMergeAsyncEnumerator : IStreamMergeAsyncEnumerator { + private readonly bool _async; private readonly IEnumerator _inMemoryEnumerator; private bool skip; - public InMemoryStreamMergeAsyncEnumerator(IAsyncEnumerator asyncSource) + public InMemoryStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator asyncSource, bool async) { if (_inMemoryEnumerator != null) throw new ArgumentNullException(nameof(_inMemoryEnumerator)); + _async = async; - _inMemoryEnumerator = GetAllRowsAsync(asyncSource).WaitAndUnwrapException(); + if (_async) + _inMemoryEnumerator = GetAllRowsAsync(asyncSource).WaitAndUnwrapException(); + else + _inMemoryEnumerator = GetAllRows(asyncSource); _inMemoryEnumerator.MoveNext(); skip = true; } - private async Task> GetAllRowsAsync(IAsyncEnumerator asyncSource) + private async Task> GetAllRowsAsync(IStreamMergeAsyncEnumerator streamMergeAsyncEnumerator) { var linkedList = new LinkedList(); - if (asyncSource.Current != null) - { - linkedList.AddLast(asyncSource.Current); #if !EFCORE2 - while (await asyncSource.MoveNextAsync()) + while (await streamMergeAsyncEnumerator.MoveNextAsync()) #endif #if EFCORE2 - while (await asyncSource.MoveNext()) + while (await streamMergeAsyncEnumerator.MoveNext(new CancellationToken())) #endif - { - linkedList.AddLast(asyncSource.Current); - } + { + linkedList.AddLast(streamMergeAsyncEnumerator.GetCurrent()); } return linkedList.GetEnumerator(); } - - public InMemoryStreamMergeAsyncEnumerator(IEnumerator syncSource) - { - if (_inMemoryEnumerator != null) - throw new ArgumentNullException(nameof(_inMemoryEnumerator)); - _inMemoryEnumerator = GetAllRows(syncSource); - _inMemoryEnumerator.MoveNext(); - skip = true; - } - private IEnumerator GetAllRows(IEnumerator syncSource) + private IEnumerator GetAllRows(IStreamMergeAsyncEnumerator streamMergeAsyncEnumerator) { var linkedList = new LinkedList(); - if (syncSource.Current != null) +#if !EFCORE2 + while ( streamMergeAsyncEnumerator.MoveNext()) +#endif +#if EFCORE2 + while (streamMergeAsyncEnumerator.MoveNext()) +#endif { - linkedList.AddLast(syncSource.Current); - while (syncSource.MoveNext()) - { - linkedList.AddLast(syncSource.Current); - } + linkedList.AddLast(streamMergeAsyncEnumerator.GetCurrent()); } return linkedList.GetEnumerator(); @@ -77,20 +70,20 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync return false; } #if !EFCORE2 - public ValueTask DisposeAsync() + public ValueTask DisposeAsync() { _inMemoryEnumerator.Dispose(); return new ValueTask(); } - public ValueTask MoveNextAsync() + public ValueTask MoveNextAsync() { if (skip) { skip = false; return new ValueTask(null != _inMemoryEnumerator.Current); } - return new ValueTask(_inMemoryEnumerator.MoveNext()); + return new ValueTask(_inMemoryEnumerator.MoveNext()); } public void Dispose() @@ -139,7 +132,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync _inMemoryEnumerator?.Dispose(); } - public Task MoveNext(CancellationToken cancellationToken = new CancellationToken()) + public Task MoveNext(CancellationToken cancellationToken = new CancellationToken()) { cancellationToken.ThrowIfCancellationRequested(); if (skip) @@ -147,7 +140,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync skip = false; return Task.FromResult(null != _inMemoryEnumerator.Current); } - return Task.FromResult(_inMemoryEnumerator.MoveNext()); + return Task.FromResult(_inMemoryEnumerator.MoveNext()); } diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs index 2f9e54ca..11937f97 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs @@ -21,11 +21,14 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync private int realSkip = 0; private int realTake = 0; - public PaginationStreamMergeAsyncEnumerator(StreamMergeContext mergeContext, IEnumerable> sources) + public PaginationStreamMergeAsyncEnumerator(StreamMergeContext mergeContext, IEnumerable> sources):this(mergeContext, sources,mergeContext.Skip, mergeContext.Take) + { + } + public PaginationStreamMergeAsyncEnumerator(StreamMergeContext mergeContext, IEnumerable> sources,int? skip,int? take) { _mergeContext = mergeContext; - _skip = mergeContext.Skip; - _take = mergeContext.Take; + _skip = skip; + _take = take; if (_mergeContext.HasGroupQuery()) _enumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator(_mergeContext, sources); else diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs index d665160b..4413bc14 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs @@ -68,19 +68,43 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions //} - public Task>[] GetDataSourceGroupAndExecutorGroup(IEnumerable sqlRouteUnits,Func> sqlExecutorUnitExecuteAsync,CancellationToken cancellationToken=new CancellationToken()) + public Task>[] GetDataSourceGroupAndExecutorGroup(bool async,IEnumerable sqlRouteUnits, Func>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken()) { - var waitTaskQueue = AggregateQueryByDataSourceName(sqlRouteUnits).Select(GetSqlExecutorGroups).Select(executorGroups => + var waitTaskQueue = AggregateQueryByDataSourceName(sqlRouteUnits) + .Select(GetSqlExecutorGroups) + .Select(dataSourceSqlExecutorUnit => { return Task.Run(async () => { + var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups; LinkedList result = new LinkedList(); + //同数据库下多组数据间采用串行 foreach (var executorGroup in executorGroups) { - var routeQueryResults = await ExecuteAsync(executorGroup.Groups, sqlExecutorUnitExecuteAsync,cancellationToken); - foreach (var routeQueryResult in routeQueryResults) + //同组采用并行最大化用户配置链接数 + var routeQueryResults = await ExecuteAsync(executorGroup.Groups, sqlExecutorUnitExecuteAsync, cancellationToken); + //严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext + if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY) { - result.AddLast(routeQueryResult); + MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async); + var dbContexts = routeQueryResults.Select(o => o.DbContext); + foreach (var dbContext in dbContexts) + { +#if !EFCORE2 + await dbContext.DisposeAsync(); + +#endif +#if EFCORE2 + dbContext.Dispose(); +#endif + } + } + else + { + foreach (var routeQueryResult in routeQueryResults) + { + result.AddLast(routeQueryResult.MergeResult); + } } } @@ -90,6 +114,14 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions return waitTaskQueue; } + public virtual void MergeParallelExecuteResult(LinkedList previewResults, IEnumerable parallelResults, bool async) + { + foreach (var parallelResult in parallelResults) + { + previewResults.AddLast(parallelResult); + } + } + protected virtual IEnumerable GetDefaultSqlRouteUnits() { @@ -107,7 +139,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions return sqlRouteUnits.GroupBy(o => o.DataSourceName); } - protected List> GetSqlExecutorGroups(IGrouping sqlGroups) + protected DataSourceSqlExecutorUnit GetSqlExecutorGroups(IGrouping sqlGroups) { var streamMergeContext = GetStreamMergeContext(); var maxQueryConnectionsLimit = streamMergeContext.GetMaxQueryConnectionsLimit(); @@ -123,33 +155,41 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions var sqlExecutorUnitPartitions = sqlGroups .Select((o, i) => new { Obj = o, index = i % exceptCount }).GroupBy(o => o.index) .Select(o => o.Select(g => new SqlExecutorUnit(connectionMode, g.Obj)).ToList()).ToList(); - return sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup(o)).ToList(); + var sqlExecutorGroups = sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup(connectionMode, o)).ToList(); + return new DataSourceSqlExecutorUnit(connectionMode, sqlExecutorGroups); } - - protected async Task> ExecuteAsync(List sqlExecutorUnits, Func> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken()) + /// + /// 同库同组下面的并行异步执行,需要归并成一个结果 + /// + /// + /// + /// + /// + /// + protected async Task>> ExecuteAsync(List sqlExecutorUnits, Func>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken()) { if (sqlExecutorUnits.Count <= 0) { - return new LinkedList(); + return new LinkedList>(); } else { - var result=new LinkedList(); - Task[] tasks=null; + var result = new LinkedList>(); + Task>[] tasks = null; if (sqlExecutorUnits.Count > 1) { - tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit => - { - return Task.Run(async () => - { - return await sqlExecutorUnitExecuteAsync(sqlExecutorUnit); + tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit => + { + return Task.Run(async () => + { + return await sqlExecutorUnitExecuteAsync(sqlExecutorUnit); - }, cancellationToken); - }).ToArray(); + }, cancellationToken); + }).ToArray(); } else { - tasks = Array.Empty>(); + tasks = Array.Empty>>(); } var firstResult = await sqlExecutorUnitExecuteAsync(sqlExecutorUnits[0]); result.AddLast(firstResult); diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs index b85f509b..19dc7f4c 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs @@ -86,7 +86,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge public async Task>> ExecuteAsync(Func> efQuery, CancellationToken cancellationToken = new CancellationToken()) { var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); - var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup>(defaultSqlRouteUnits, + var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup>(true,defaultSqlRouteUnits, async sqlExecutorUnit => { var connectionMode = _mergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode); @@ -96,8 +96,9 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge var (asyncExecuteQueryable, dbContext) = CreateAsyncExecuteQueryable(dataSourceName, routeResult, connectionMode); - var queryResult = await efQuery(asyncExecuteQueryable).ReleaseConnectionAsync(dbContext, connectionMode); - return new RouteQueryResult(dataSourceName, routeResult, queryResult); + var queryResult = await efQuery(asyncExecuteQueryable); + var routeQueryResult = new RouteQueryResult(dataSourceName, routeResult, queryResult); + return new ShardingMergeResult>(dbContext, routeQueryResult); }).ToArray(); return (await Task.WhenAll(waitExecuteQueue)).SelectMany(o => o).ToList(); @@ -133,10 +134,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge return _mergeContext; } - public IQueryable GetQueryable() - { - return _queryable; - } + //public IQueryable GetQueryable() + //{ + // return _queryable; + //} protected MethodCallExpression GetMethodCallExpression() { diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/ShardingMergeResult.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/ShardingMergeResult.cs new file mode 100644 index 00000000..2759043d --- /dev/null +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/ShardingMergeResult.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; + +namespace ShardingCore.Sharding.MergeEngines.Abstractions +{ + internal class ShardingMergeResult + { + public DbContext DbContext { get; } + public TResult MergeResult { get; } + + public ShardingMergeResult(DbContext dbContext,TResult mergeResult) + { + DbContext = dbContext; + MergeResult = mergeResult; + } + } +} diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs index 02a96062..daaf6d18 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs @@ -90,8 +90,53 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge /// 获取路由查询的迭代器 /// /// + /// /// public abstract IStreamMergeAsyncEnumerator[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken()); + + /// + /// 合并流式聚合内存最小化 + /// + /// + /// + /// + /// + /// + /// + public override void MergeParallelExecuteResult(LinkedList previewResults, IEnumerable parallelResults,bool async) + { + if (previewResults.Count > 1) + { + throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} {nameof(previewResults)} has more than one element in container"); + } + + var parallelCount = parallelResults.Count(); + if (parallelCount == 0) + return; + //聚合 + if (previewResults is LinkedList> previewInMemoryStreamEnumeratorResults && parallelResults is IEnumerable> parallelStreamEnumeratorResults) + { + var mergeAsyncEnumerators = new LinkedList>(); + if (previewInMemoryStreamEnumeratorResults.Count == 1) + { + mergeAsyncEnumerators.AddLast(previewInMemoryStreamEnumeratorResults.First()); + } + foreach (var parallelStreamEnumeratorResult in parallelStreamEnumeratorResults) + { + mergeAsyncEnumerators.AddLast(parallelStreamEnumeratorResult); + } + + var combineStreamMergeAsyncEnumerator = CombineInMemoryStreamMergeAsyncEnumerator(mergeAsyncEnumerators.ToArray()); + var inMemoryStreamMergeAsyncEnumerator = new InMemoryStreamMergeAsyncEnumerator(combineStreamMergeAsyncEnumerator, async); + previewInMemoryStreamEnumeratorResults.Clear(); + previewInMemoryStreamEnumeratorResults.AddLast(inMemoryStreamMergeAsyncEnumerator); + //合并 + return; + } + + throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} is not {typeof(IStreamMergeAsyncEnumerator)}"); + } + /// /// 合并成一个迭代器 /// @@ -99,27 +144,32 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge /// public abstract IStreamMergeAsyncEnumerator CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators); + public virtual IStreamMergeAsyncEnumerator CombineInMemoryStreamMergeAsyncEnumerator( + IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + return CombineStreamMergeAsyncEnumerator(streamsAsyncEnumerators); + } + /// /// 开启异步线程获取并发迭代器 /// /// /// - /// /// /// - public async Task> AsyncParallelEnumerator(IQueryable queryable, bool async,ConnectionModeEnum connectionMode, + public async Task> AsyncParallelEnumerator(IQueryable queryable, bool async, CancellationToken cancellationToken = new CancellationToken()) { cancellationToken.ThrowIfCancellationRequested(); if (async) { var asyncEnumerator = await GetAsyncEnumerator0(queryable); - return connectionMode==ConnectionModeEnum.MEMORY_STRICTLY? new StreamMergeAsyncEnumerator(asyncEnumerator):new InMemoryStreamMergeAsyncEnumerator(asyncEnumerator); + return new StreamMergeAsyncEnumerator(asyncEnumerator); } else { var enumerator = GetEnumerator0(queryable); - return connectionMode == ConnectionModeEnum.MEMORY_STRICTLY ? new StreamMergeAsyncEnumerator(enumerator):new InMemoryStreamMergeAsyncEnumerator(enumerator); + return new StreamMergeAsyncEnumerator(enumerator); } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Common/DataSourceSqlExecutorUnit.cs b/src/ShardingCore/Sharding/MergeEngines/Common/DataSourceSqlExecutorUnit.cs new file mode 100644 index 00000000..c4ea830f --- /dev/null +++ b/src/ShardingCore/Sharding/MergeEngines/Common/DataSourceSqlExecutorUnit.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ShardingCore.Core; + +namespace ShardingCore.Sharding.MergeEngines.Common +{ + internal class DataSourceSqlExecutorUnit + { + public ConnectionModeEnum ConnectionMode { get; } + public List> SqlExecutorGroups { get; } + + public DataSourceSqlExecutorUnit(ConnectionModeEnum connectionMode,List> sqlExecutorGroups) + { + ConnectionMode = connectionMode; + SqlExecutorGroups = sqlExecutorGroups; + } + } +} diff --git a/src/ShardingCore/Sharding/MergeEngines/Common/SqlExecutorGroup.cs b/src/ShardingCore/Sharding/MergeEngines/Common/SqlExecutorGroup.cs index 67d7e41e..fc8f520b 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Common/SqlExecutorGroup.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Common/SqlExecutorGroup.cs @@ -3,19 +3,23 @@ using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; +using ShardingCore.Core; namespace ShardingCore.Sharding.MergeEngines.Common { internal sealed class SqlExecutorGroup { - public SqlExecutorGroup(List groups) + public SqlExecutorGroup(ConnectionModeEnum connectionMode,List groups) { + ConnectionMode = connectionMode; Groups = groups; } + public ConnectionModeEnum ConnectionMode { get; } /// /// 执行组 /// public List Groups { get; } + } } diff --git a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppendOrderSequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppendOrderSequenceEnumeratorAsyncStreamMergeEngine.cs index d9675905..7b66a106 100644 --- a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppendOrderSequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppendOrderSequenceEnumeratorAsyncStreamMergeEngine.cs @@ -12,6 +12,7 @@ using ShardingCore.Extensions.InternalExtensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.MergeEngines.Abstractions; using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; using ShardingCore.Sharding.MergeEngines.Common; using ShardingCore.Sharding.PaginationConfigurations; @@ -105,21 +106,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. StreamMergeContext.ReSetOrders(reSetOrders); var sqlSequenceRouteUnits = sequenceResults.Select(sequenceResult => new SqlSequenceRouteUnit(sequenceResult)); - var enumeratorTasks = GetDataSourceGroupAndExecutorGroup>(sqlSequenceRouteUnits, + var enumeratorTasks = GetDataSourceGroupAndExecutorGroup>(async,sqlSequenceRouteUnits, async sqlExecutorUnit => { var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode); var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(noPaginationQueryable, ((SqlSequenceRouteUnit)sqlExecutorUnit.RouteUnit).SequenceResult, reSetOrders, connectionMode); - return await AsyncParallelEnumerator(newQueryable, async, connectionMode, cancellationToken) - .ReleaseConnectionAsync(dbContext, connectionMode); + var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken); + return new ShardingMergeResult>(dbContext, + streamMergeAsyncEnumerator); }, cancellationToken); - - var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray(); + + var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o => o).ToArray(); return streamEnumerators; } - private (IQueryable,DbContext) CreateAsyncExecuteQueryable(IQueryable noPaginationQueryable, SequenceResult sequenceResult, IEnumerable reSetOrders,ConnectionModeEnum connectionMode) + private (IQueryable, DbContext) CreateAsyncExecuteQueryable(IQueryable noPaginationQueryable, SequenceResult sequenceResult, IEnumerable reSetOrders, ConnectionModeEnum connectionMode) { var shardingDbContext = StreamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode); var newQueryable = (IQueryable)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(reSetOrders)) diff --git a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs index 54264860..ebc8346e 100644 --- a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs @@ -1,13 +1,16 @@ +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Core; using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.MergeEngines.Abstractions; using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync @@ -30,15 +33,15 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer { cancellationToken.ThrowIfCancellationRequested(); var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); - var enumeratorTasks = GetDataSourceGroupAndExecutorGroup>(defaultSqlRouteUnits, + var enumeratorTasks = GetDataSourceGroupAndExecutorGroup>(async,defaultSqlRouteUnits, async sqlExecutorUnit => { var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode); var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName; var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult; var (newQueryable,dbContext) = CreateAsyncExecuteQueryable(dataSourceName, routeResult, connectionMode); - return await AsyncParallelEnumerator(newQueryable, async,connectionMode, cancellationToken) - .ReleaseConnectionAsync(dbContext, connectionMode); + var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken); + return new ShardingMergeResult>(dbContext, streamMergeAsyncEnumerator); }, cancellationToken); var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray(); @@ -53,6 +56,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer return (newQueryable,shardingDbContext); } + public override IStreamMergeAsyncEnumerator CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { if (StreamMergeContext.IsPaginationQuery()) @@ -61,5 +65,13 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); } + + public override IStreamMergeAsyncEnumerator CombineInMemoryStreamMergeAsyncEnumerator( + IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + if (StreamMergeContext.IsPaginationQuery()) + return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators,0, StreamMergeContext.Skip.GetValueOrDefault() + StreamMergeContext.Take.GetValueOrDefault()); + return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators); + } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs index df0db1ff..34150825 100644 --- a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs @@ -12,6 +12,7 @@ using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.MergeEngines.Abstractions; using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync @@ -47,7 +48,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+(int)take).OrderWithExpression(propertyOrders); var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); - var enumeratorTasks = GetDataSourceGroupAndExecutorGroup>(defaultSqlRouteUnits, + var enumeratorTasks = GetDataSourceGroupAndExecutorGroup>(async,defaultSqlRouteUnits, async sqlExecutorUnit => { var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode); @@ -55,7 +56,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult; var (newQueryable,dbContext) = CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult, connectionMode); - return await AsyncParallelEnumerator(newQueryable, async, connectionMode, cancellationToken).ReleaseConnectionAsync(dbContext,connectionMode); + var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async,cancellationToken); + return new ShardingMergeResult>(dbContext, + streamMergeAsyncEnumerator); }); @@ -85,5 +88,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); } + public override IStreamMergeAsyncEnumerator CombineInMemoryStreamMergeAsyncEnumerator( + IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + if (StreamMergeContext.IsPaginationQuery()) + return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators, 0, StreamMergeContext.Skip.GetValueOrDefault() + StreamMergeContext.Take.GetValueOrDefault()); + return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators); + } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs index d47c5fed..d957a8e1 100644 --- a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs @@ -11,6 +11,7 @@ using ShardingCore.Extensions.InternalExtensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.MergeEngines.Abstractions; using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; using ShardingCore.Sharding.MergeEngines.Common; using ShardingCore.Sharding.PaginationConfigurations; @@ -91,14 +92,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o => o.RouteQueryResult)).Skip(skip).Take(take).ToList(); var sqlSequenceRouteUnits = sequenceResults.Select(sequenceResult => new SqlSequenceRouteUnit(sequenceResult)); - var enumeratorTasks = GetDataSourceGroupAndExecutorGroup>(sqlSequenceRouteUnits, + var enumeratorTasks = GetDataSourceGroupAndExecutorGroup>(async,sqlSequenceRouteUnits, async sqlExecutorUnit => { var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode); var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(noPaginationQueryable, ((SqlSequenceRouteUnit)sqlExecutorUnit.RouteUnit).SequenceResult, connectionMode); - return await AsyncParallelEnumerator(newQueryable, async, connectionMode, cancellationToken) - .ReleaseConnectionAsync(dbContext, connectionMode); + var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken); + return new ShardingMergeResult>(dbContext,streamMergeAsyncEnumerator); }, cancellationToken); var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray(); return streamEnumerators; diff --git a/src/ShardingCore/ShardingConfigOption.cs b/src/ShardingCore/ShardingConfigOption.cs index 4f60b4da..2364dec8 100644 --- a/src/ShardingCore/ShardingConfigOption.cs +++ b/src/ShardingCore/ShardingConfigOption.cs @@ -230,15 +230,6 @@ namespace ShardingCore /// public bool AutoTrackEntity { get; set; } - /// - /// 单次查询并发线程数目(最小1) - /// - public int ParallelQueryMaxThreadCount { get; set; } = Environment.ProcessorCount*2; - /// - /// 默认30秒超时 - /// - public TimeSpan ParallelQueryTimeOut { get; set; }=TimeSpan.FromSeconds(30); - public string DefaultDataSourceName { get; set; } public string DefaultConnectionString { get; set; } public int MaxQueryConnectionsLimit { get; set; } = Environment.ProcessorCount; diff --git a/test/ShardingCore.Test/ShardingTest.cs b/test/ShardingCore.Test/ShardingTest.cs index 163085c1..b1817e49 100644 --- a/test/ShardingCore.Test/ShardingTest.cs +++ b/test/ShardingCore.Test/ShardingTest.cs @@ -173,14 +173,6 @@ namespace ShardingCore.Test Assert.False(isKey3); var isKey4 = userModMetadata.ShardingTableFieldIsKey(); Assert.True(isKey4); - try - { - throw new ShardingCoreParallelQueryTimeOutException("test"); - } - catch (Exception e) - { - Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType()); - } await _virtualDbContext.AddRangeAsync(logDays); var bulkShardingExpression = _virtualDbContext.BulkShardingExpression(o => new[] { "A", "B" }.Contains(o.Area)); diff --git a/test/ShardingCore.Test/ShardingTestSync.cs b/test/ShardingCore.Test/ShardingTestSync.cs index dd5d3028..e1274fdb 100644 --- a/test/ShardingCore.Test/ShardingTestSync.cs +++ b/test/ShardingCore.Test/ShardingTestSync.cs @@ -154,14 +154,6 @@ namespace ShardingCore.Test Assert.False(isKey3); var isKey4 = userModMetadata.ShardingTableFieldIsKey(); Assert.True(isKey4); - try - { - throw new ShardingCoreParallelQueryTimeOutException("test"); - } - catch (Exception e) - { - Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType()); - } _virtualDbContext.AddRange(logDays); var bulkShardingExpression = _virtualDbContext.BulkShardingExpression(o => new[] { "A", "B" }.Contains(o.Area)); diff --git a/test/ShardingCore.Test/Startup.cs b/test/ShardingCore.Test/Startup.cs index 54d0e703..21a6ce54 100644 --- a/test/ShardingCore.Test/Startup.cs +++ b/test/ShardingCore.Test/Startup.cs @@ -38,8 +38,8 @@ namespace ShardingCore.Test .Begin(o => { #if DEBUG - o.CreateShardingTableOnStart = true; - o.EnsureCreatedWithOutShardingTable = true; + //o.CreateShardingTableOnStart = true; + //o.EnsureCreatedWithOutShardingTable = true; #endif o.AutoTrackEntity = true; }) diff --git a/test/ShardingCore.Test2x/ShardingTest.cs b/test/ShardingCore.Test2x/ShardingTest.cs index f2a18ff7..c650f4bc 100644 --- a/test/ShardingCore.Test2x/ShardingTest.cs +++ b/test/ShardingCore.Test2x/ShardingTest.cs @@ -163,14 +163,6 @@ namespace ShardingCore.Test2x Assert.False(isKey3); var isKey4 = userModMetadata.ShardingTableFieldIsKey(); Assert.True(isKey4); - try - { - throw new ShardingCoreParallelQueryTimeOutException("test"); - } - catch (Exception e) - { - Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType()); - } await _virtualDbContext.AddRangeAsync(logDays); var bulkShardingExpression = _virtualDbContext.BulkShardingExpression(o => new[] { "A", "B" }.Contains(o.Area)); diff --git a/test/ShardingCore.Test2x/ShardingTestSync.cs b/test/ShardingCore.Test2x/ShardingTestSync.cs index d90e66cd..c15c5056 100644 --- a/test/ShardingCore.Test2x/ShardingTestSync.cs +++ b/test/ShardingCore.Test2x/ShardingTestSync.cs @@ -153,15 +153,6 @@ namespace ShardingCore.Test2x Assert.False(isKey3); var isKey4 = userModMetadata.ShardingTableFieldIsKey(); Assert.True(isKey4); - try - { - throw new ShardingCoreParallelQueryTimeOutException("test"); - } - catch (Exception e) - { - Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType()); - } - _virtualDbContext.AddRange(logDays); var bulkShardingExpression = _virtualDbContext.BulkShardingExpression(o => new[] { "A", "B" }.Contains(o.Area)); Assert.Equal(2, bulkShardingExpression.Count); diff --git a/test/ShardingCore.Test3x/ShardingTest.cs b/test/ShardingCore.Test3x/ShardingTest.cs index 1fe1e424..76a8867d 100644 --- a/test/ShardingCore.Test3x/ShardingTest.cs +++ b/test/ShardingCore.Test3x/ShardingTest.cs @@ -163,14 +163,6 @@ namespace ShardingCore.Test3x Assert.False(isKey3); var isKey4 = userModMetadata.ShardingTableFieldIsKey(); Assert.True(isKey4); - try - { - throw new ShardingCoreParallelQueryTimeOutException("test"); - } - catch (Exception e) - { - Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType()); - } await _virtualDbContext.AddRangeAsync(logDays); var bulkShardingExpression = _virtualDbContext.BulkShardingExpression(o => new[] { "A", "B" }.Contains(o.Area)); diff --git a/test/ShardingCore.Test3x/ShardingTestSync.cs b/test/ShardingCore.Test3x/ShardingTestSync.cs index 455d147b..7aadc985 100644 --- a/test/ShardingCore.Test3x/ShardingTestSync.cs +++ b/test/ShardingCore.Test3x/ShardingTestSync.cs @@ -154,14 +154,6 @@ namespace ShardingCore.Test3x Assert.False(isKey3); var isKey4 = userModMetadata.ShardingTableFieldIsKey(); Assert.True(isKey4); - try - { - throw new ShardingCoreParallelQueryTimeOutException("test"); - } - catch (Exception e) - { - Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType()); - } _virtualDbContext.AddRange(logDays); var bulkShardingExpression = _virtualDbContext.BulkShardingExpression(o => new[] { "A", "B" }.Contains(o.Area)); diff --git a/test/ShardingCore.Test5x/ShardingTest.cs b/test/ShardingCore.Test5x/ShardingTest.cs index 60a01b46..d725dcdd 100644 --- a/test/ShardingCore.Test5x/ShardingTest.cs +++ b/test/ShardingCore.Test5x/ShardingTest.cs @@ -163,14 +163,6 @@ namespace ShardingCore.Test5x Assert.False(isKey3); var isKey4 = userModMetadata.ShardingTableFieldIsKey(); Assert.True(isKey4); - try - { - throw new ShardingCoreParallelQueryTimeOutException("test"); - } - catch (Exception e) - { - Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType()); - } await _virtualDbContext.AddRangeAsync(logDays); var bulkShardingExpression = _virtualDbContext.BulkShardingExpression(o => new[] { "A", "B" }.Contains(o.Area)); diff --git a/test/ShardingCore.Test5x/ShardingTestSync.cs b/test/ShardingCore.Test5x/ShardingTestSync.cs index 522108c7..0f78ef1c 100644 --- a/test/ShardingCore.Test5x/ShardingTestSync.cs +++ b/test/ShardingCore.Test5x/ShardingTestSync.cs @@ -154,14 +154,6 @@ namespace ShardingCore.Test5x Assert.False(isKey3); var isKey4 = userModMetadata.ShardingTableFieldIsKey(); Assert.True(isKey4); - try - { - throw new ShardingCoreParallelQueryTimeOutException("test"); - } - catch (Exception e) - { - Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType()); - } _virtualDbContext.AddRange(logDays); var bulkShardingExpression = _virtualDbContext.BulkShardingExpression(o => new[] { "A", "B" }.Contains(o.Area));