完美集成connection mode并且优化connection mode

This commit is contained in:
xuejiaming 2021-12-06 14:19:54 +08:00
parent ec109517cd
commit 9eff3f47d5
29 changed files with 282 additions and 253 deletions

View File

@ -92,7 +92,7 @@ AMD Ryzen 9 3900X, 1 CPU, 24 logical and 12 physical cores
| NoShardingNoIndexToListAsync | 10 | 25,865.136 ms | 115.6391 ms | 102.5111 ms | 25,847.258 ms |
| ShardingNoIndexToListAsync | 10 | 5,502.922 ms | 92.7201 ms | 86.7305 ms | 5,483.847 ms |
具体可以通过first前两次结果来计算得出结论单次查询的的损耗为0.3-0.4毫秒之间,通过数据聚合和数据路由的损耗单次在0.3ms-0.4ms,其中创建dbcontext为0.1毫秒目前没有好的优化方案,0.013毫秒左右是路由表达式解析和编译,复杂表达式可能更加耗时,剩下的0.28毫秒为数据源和表后缀的解析等操作包括实例的反射创建和数据的聚合,
具体可以通过first前两次结果来计算得出结论单次查询的的损耗为0.2-0.3毫秒之间,通过数据聚合和数据路由的损耗单次在0.3ms-0.4ms,其中创建dbcontext为0.1毫秒目前没有好的优化方案,0.013毫秒左右是路由表达式解析和编译,复杂表达式可能更加耗时,剩下的0.2毫秒为数据源和表后缀的解析等操作包括实例的反射创建和数据的聚合,
sqlserver的各项数据在分表和未分表的情况下都几乎差不多可以得出在770w数据集情况下数据库还并未是数据瓶颈的关键但是mysql可以看到在分表和未分表的情况下如果涉及到没有索引的全表扫描那么性能的差距将是分表后的表数目之多测试中为5-6倍也就是分表数目

View File

@ -34,8 +34,6 @@ namespace Sample.SqlServer
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
o.AutoTrackEntity = true;
o.ParallelQueryMaxThreadCount = 100;
o.ParallelQueryTimeOut = TimeSpan.FromSeconds(10);
//if SysTest entity not exists in db and db is exists
//o.AddEntityTryCreateTable<SysTest>(); // or `o.AddEntitiesTryCreateTable(typeof(SysTest));`
})

View File

@ -2,8 +2,11 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using ShardingCore.Bootstrapers;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.QueryRouteManagers;
using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Core.ShardingPage;
@ -18,23 +21,14 @@ using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.DbContexts;
using ShardingCore.DIExtensions;
using ShardingCore.EFCores;
using ShardingCore.EFCores.OptionsExtensions;
using ShardingCore.Helpers;
using ShardingCore.Jobs;
using ShardingCore.Sharding;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingQueryExecutors;
using ShardingCore.TableCreator;
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Bootstrapers;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.EFCores.OptionsExtensions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Jobs;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore
{

View File

@ -36,16 +36,11 @@ namespace ShardingCore.DIExtensions
{
var shardingCoreBeginOptions = new ShardingCoreBeginOptions();
shardingCoreBeginOptionsConfigure?.Invoke(shardingCoreBeginOptions);
if (shardingCoreBeginOptions.ParallelQueryMaxThreadCount <= 0)
if (shardingCoreBeginOptions.MaxQueryConnectionsLimit <= 0)
throw new ArgumentException(
$"{nameof(shardingCoreBeginOptions.ParallelQueryMaxThreadCount)} should greater than zero thread count");
if (shardingCoreBeginOptions.ParallelQueryTimeOut.TotalMilliseconds <= 0)
throw new ArgumentException(
$"{nameof(shardingCoreBeginOptions.ParallelQueryTimeOut)} should greater than zero milliseconds");
$"{nameof(shardingCoreBeginOptions.MaxQueryConnectionsLimit)} should greater than and equal 1");
ShardingConfigOption.EnsureCreatedWithOutShardingTable = shardingCoreBeginOptions.EnsureCreatedWithOutShardingTable;
ShardingConfigOption.AutoTrackEntity = shardingCoreBeginOptions.AutoTrackEntity;
ShardingConfigOption.ParallelQueryMaxThreadCount = shardingCoreBeginOptions.ParallelQueryMaxThreadCount;
ShardingConfigOption.ParallelQueryTimeOut = shardingCoreBeginOptions.ParallelQueryTimeOut;
ShardingConfigOption.CreateShardingTableOnStart = shardingCoreBeginOptions.CreateShardingTableOnStart;
ShardingConfigOption.IgnoreCreateTableError = shardingCoreBeginOptions.IgnoreCreateTableError;
ShardingConfigOption.MaxQueryConnectionsLimit = shardingCoreBeginOptions.MaxQueryConnectionsLimit;
@ -96,15 +91,6 @@ namespace ShardingCore.DIExtensions
/// </summary>
public bool AutoTrackEntity { get; set; }
/// <summary>
/// 单次查询并发线程数目(最小1)默认cpu核心数*2
/// </summary>
public int ParallelQueryMaxThreadCount { get; set; } = Environment.ProcessorCount*2;
/// <summary>
/// 默认30秒超时
/// </summary>
public TimeSpan ParallelQueryTimeOut { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// 忽略建表时的错误
/// </summary>

View File

@ -1,24 +0,0 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.Serialization;
using System.Text;
namespace ShardingCore.Exceptions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/10/3 22:25:59
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
[ExcludeFromCodeCoverage]
public class ShardingCoreParallelQueryTimeOutException:ShardingCoreException
{
public ShardingCoreParallelQueryTimeOutException(string message) : base(message)
{
}
}
}

View File

@ -1,34 +1,34 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
//using System;
//using System.Collections.Generic;
//using System.Linq;
//using System.Text;
//using System.Threading.Tasks;
//using Microsoft.EntityFrameworkCore;
//using ShardingCore.Core;
namespace ShardingCore.Extensions
{
internal static class ConnectionModeExtension
{
public static async Task<T> ReleaseConnectionAsync<T>(this Task<T> executeTask, DbContext dbContext,
ConnectionModeEnum connectionMode)
{
try
{
return await executeTask;
}
finally
{
if (connectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
{
#if !EFCORE2
await dbContext.DisposeAsync();
#endif
#if EFCORE2
dbContext.Dispose();
#endif
}
}
}
}
}
//namespace ShardingCore.Extensions
//{
// internal static class ConnectionModeExtension
// {
// public static async Task<T> ReleaseConnectionAsync<T>(this Task<T> executeTask, DbContext dbContext,
// ConnectionModeEnum connectionMode)
// {
// try
// {
// return await executeTask;
// }
// finally
// {
//// if (connectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
//// {
////#if !EFCORE2
//// await dbContext.DisposeAsync();
////#endif
////#if EFCORE2
//// dbContext.Dispose();
////#endif
//// }
// }
// }
// }
//}

View File

@ -62,15 +62,6 @@ namespace ShardingCore
/// 自动追踪实体
/// </summary>
public bool AutoTrackEntity { get; set; }
/// <summary>
/// 单次查询并发线程数目(1-65536)
/// </summary>
public int ParallelQueryMaxThreadCount { get; set; }
/// <summary>
/// 并发查询超时时间
/// </summary>
public TimeSpan ParallelQueryTimeOut { get; set; }
/// <summary>
/// 默认数据源名称
/// </summary>

View File

@ -11,57 +11,50 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
internal class InMemoryStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly bool _async;
private readonly IEnumerator<T> _inMemoryEnumerator;
private bool skip;
public InMemoryStreamMergeAsyncEnumerator(IAsyncEnumerator<T> asyncSource)
public InMemoryStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<T> asyncSource, bool async)
{
if (_inMemoryEnumerator != null)
throw new ArgumentNullException(nameof(_inMemoryEnumerator));
_async = async;
_inMemoryEnumerator = GetAllRowsAsync(asyncSource).WaitAndUnwrapException();
if (_async)
_inMemoryEnumerator = GetAllRowsAsync(asyncSource).WaitAndUnwrapException();
else
_inMemoryEnumerator = GetAllRows(asyncSource);
_inMemoryEnumerator.MoveNext();
skip = true;
}
private async Task<IEnumerator<T>> GetAllRowsAsync(IAsyncEnumerator<T> asyncSource)
private async Task<IEnumerator<T>> GetAllRowsAsync(IStreamMergeAsyncEnumerator<T> streamMergeAsyncEnumerator)
{
var linkedList = new LinkedList<T>();
if (asyncSource.Current != null)
{
linkedList.AddLast(asyncSource.Current);
#if !EFCORE2
while (await asyncSource.MoveNextAsync())
while (await streamMergeAsyncEnumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await asyncSource.MoveNext())
while (await streamMergeAsyncEnumerator.MoveNext(new CancellationToken()))
#endif
{
linkedList.AddLast(asyncSource.Current);
}
{
linkedList.AddLast(streamMergeAsyncEnumerator.GetCurrent());
}
return linkedList.GetEnumerator();
}
public InMemoryStreamMergeAsyncEnumerator(IEnumerator<T> syncSource)
{
if (_inMemoryEnumerator != null)
throw new ArgumentNullException(nameof(_inMemoryEnumerator));
_inMemoryEnumerator = GetAllRows(syncSource);
_inMemoryEnumerator.MoveNext();
skip = true;
}
private IEnumerator<T> GetAllRows(IEnumerator<T> syncSource)
private IEnumerator<T> GetAllRows(IStreamMergeAsyncEnumerator<T> streamMergeAsyncEnumerator)
{
var linkedList = new LinkedList<T>();
if (syncSource.Current != null)
#if !EFCORE2
while ( streamMergeAsyncEnumerator.MoveNext())
#endif
#if EFCORE2
while (streamMergeAsyncEnumerator.MoveNext())
#endif
{
linkedList.AddLast(syncSource.Current);
while (syncSource.MoveNext())
{
linkedList.AddLast(syncSource.Current);
}
linkedList.AddLast(streamMergeAsyncEnumerator.GetCurrent());
}
return linkedList.GetEnumerator();
@ -77,20 +70,20 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
return false;
}
#if !EFCORE2
public ValueTask DisposeAsync()
public ValueTask DisposeAsync()
{
_inMemoryEnumerator.Dispose();
return new ValueTask();
}
public ValueTask<bool> MoveNextAsync()
public ValueTask<bool> MoveNextAsync()
{
if (skip)
{
skip = false;
return new ValueTask<bool>(null != _inMemoryEnumerator.Current);
}
return new ValueTask<bool>(_inMemoryEnumerator.MoveNext());
return new ValueTask<bool>(_inMemoryEnumerator.MoveNext());
}
public void Dispose()
@ -139,7 +132,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
_inMemoryEnumerator?.Dispose();
}
public Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
public Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (skip)
@ -147,7 +140,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
skip = false;
return Task.FromResult(null != _inMemoryEnumerator.Current);
}
return Task.FromResult(_inMemoryEnumerator.MoveNext());
return Task.FromResult(_inMemoryEnumerator.MoveNext());
}

View File

@ -21,11 +21,14 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
private int realSkip = 0;
private int realTake = 0;
public PaginationStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
public PaginationStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeAsyncEnumerator<T>> sources):this(mergeContext, sources,mergeContext.Skip, mergeContext.Take)
{
}
public PaginationStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeAsyncEnumerator<T>> sources,int? skip,int? take)
{
_mergeContext = mergeContext;
_skip = mergeContext.Skip;
_take = mergeContext.Take;
_skip = skip;
_take = take;
if (_mergeContext.HasGroupQuery())
_enumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, sources);
else

View File

@ -68,19 +68,43 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
//}
public Task<LinkedList<TResult>>[] GetDataSourceGroupAndExecutorGroup<TResult>(IEnumerable<ISqlRouteUnit> sqlRouteUnits,Func<SqlExecutorUnit,Task<TResult>> sqlExecutorUnitExecuteAsync,CancellationToken cancellationToken=new CancellationToken())
public Task<LinkedList<TResult>>[] GetDataSourceGroupAndExecutorGroup<TResult>(bool async,IEnumerable<ISqlRouteUnit> sqlRouteUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
{
var waitTaskQueue = AggregateQueryByDataSourceName(sqlRouteUnits).Select(GetSqlExecutorGroups).Select(executorGroups =>
var waitTaskQueue = AggregateQueryByDataSourceName(sqlRouteUnits)
.Select(GetSqlExecutorGroups)
.Select(dataSourceSqlExecutorUnit =>
{
return Task.Run(async () =>
{
var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
LinkedList<TResult> result = new LinkedList<TResult>();
//同数据库下多组数据间采用串行
foreach (var executorGroup in executorGroups)
{
var routeQueryResults = await ExecuteAsync<TResult>(executorGroup.Groups, sqlExecutorUnitExecuteAsync,cancellationToken);
foreach (var routeQueryResult in routeQueryResults)
//同组采用并行最大化用户配置链接数
var routeQueryResults = await ExecuteAsync<TResult>(executorGroup.Groups, sqlExecutorUnitExecuteAsync, cancellationToken);
//严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
{
result.AddLast(routeQueryResult);
MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
var dbContexts = routeQueryResults.Select(o => o.DbContext);
foreach (var dbContext in dbContexts)
{
#if !EFCORE2
await dbContext.DisposeAsync();
#endif
#if EFCORE2
dbContext.Dispose();
#endif
}
}
else
{
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult.MergeResult);
}
}
}
@ -90,6 +114,14 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
return waitTaskQueue;
}
public virtual void MergeParallelExecuteResult<TResult>(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults, bool async)
{
foreach (var parallelResult in parallelResults)
{
previewResults.AddLast(parallelResult);
}
}
protected virtual IEnumerable<ISqlRouteUnit> GetDefaultSqlRouteUnits()
{
@ -107,7 +139,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
return sqlRouteUnits.GroupBy(o => o.DataSourceName);
}
protected List<SqlExecutorGroup<SqlExecutorUnit>> GetSqlExecutorGroups(IGrouping<string, ISqlRouteUnit> sqlGroups)
protected DataSourceSqlExecutorUnit GetSqlExecutorGroups(IGrouping<string, ISqlRouteUnit> sqlGroups)
{
var streamMergeContext = GetStreamMergeContext();
var maxQueryConnectionsLimit = streamMergeContext.GetMaxQueryConnectionsLimit();
@ -123,33 +155,41 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
var sqlExecutorUnitPartitions = sqlGroups
.Select((o, i) => new { Obj = o, index = i % exceptCount }).GroupBy(o => o.index)
.Select(o => o.Select(g => new SqlExecutorUnit(connectionMode, g.Obj)).ToList()).ToList();
return sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup<SqlExecutorUnit>(o)).ToList();
var sqlExecutorGroups = sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup<SqlExecutorUnit>(connectionMode, o)).ToList();
return new DataSourceSqlExecutorUnit(connectionMode, sqlExecutorGroups);
}
protected async Task<LinkedList<TResult>> ExecuteAsync<TResult>(List<SqlExecutorUnit> sqlExecutorUnits, Func<SqlExecutorUnit, Task<TResult>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
/// <summary>
/// 同库同组下面的并行异步执行,需要归并成一个结果
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="sqlExecutorUnits"></param>
/// <param name="sqlExecutorUnitExecuteAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected async Task<LinkedList<ShardingMergeResult<TResult>>> ExecuteAsync<TResult>(List<SqlExecutorUnit> sqlExecutorUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
{
if (sqlExecutorUnits.Count <= 0)
{
return new LinkedList<TResult>();
return new LinkedList<ShardingMergeResult<TResult>>();
}
else
{
var result=new LinkedList<TResult>();
Task<TResult>[] tasks=null;
var result = new LinkedList<ShardingMergeResult<TResult>>();
Task<ShardingMergeResult<TResult>>[] tasks = null;
if (sqlExecutorUnits.Count > 1)
{
tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit =>
{
return Task.Run(async () =>
{
return await sqlExecutorUnitExecuteAsync(sqlExecutorUnit);
tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit =>
{
return Task.Run(async () =>
{
return await sqlExecutorUnitExecuteAsync(sqlExecutorUnit);
}, cancellationToken);
}).ToArray();
}, cancellationToken);
}).ToArray();
}
else
{
tasks = Array.Empty<Task<TResult>>();
tasks = Array.Empty<Task<ShardingMergeResult<TResult>>>();
}
var firstResult = await sqlExecutorUnitExecuteAsync(sqlExecutorUnits[0]);
result.AddLast(firstResult);

View File

@ -86,7 +86,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
public async Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
{
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup<RouteQueryResult<TResult>>(defaultSqlRouteUnits,
var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup<RouteQueryResult<TResult>>(true,defaultSqlRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = _mergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
@ -96,8 +96,9 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
var (asyncExecuteQueryable, dbContext) =
CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult, connectionMode);
var queryResult = await efQuery(asyncExecuteQueryable).ReleaseConnectionAsync(dbContext, connectionMode);
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
var queryResult = await efQuery(asyncExecuteQueryable);
var routeQueryResult = new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
return new ShardingMergeResult<RouteQueryResult<TResult>>(dbContext, routeQueryResult);
}).ToArray();
return (await Task.WhenAll(waitExecuteQueue)).SelectMany(o => o).ToList();
@ -133,10 +134,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
return _mergeContext;
}
public IQueryable<TEntity> GetQueryable()
{
return _queryable;
}
//public IQueryable<TEntity> GetQueryable()
//{
// return _queryable;
//}
protected MethodCallExpression GetMethodCallExpression()
{

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
internal class ShardingMergeResult<TResult>
{
public DbContext DbContext { get; }
public TResult MergeResult { get; }
public ShardingMergeResult(DbContext dbContext,TResult mergeResult)
{
DbContext = dbContext;
MergeResult = mergeResult;
}
}
}

View File

@ -90,8 +90,53 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
/// 获取路由查询的迭代器
/// </summary>
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public abstract IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken());
/// <summary>
/// 合并流式聚合内存最小化
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="previewResults"></param>
/// <param name="parallelResults"></param>
/// <param name="async"></param>
/// <returns></returns>
/// <exception cref="ShardingCoreInvalidOperationException"></exception>
public override void MergeParallelExecuteResult<TResult>(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults,bool async)
{
if (previewResults.Count > 1)
{
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} {nameof(previewResults)} has more than one element in container");
}
var parallelCount = parallelResults.Count();
if (parallelCount == 0)
return;
//聚合
if (previewResults is LinkedList<IStreamMergeAsyncEnumerator<TEntity>> previewInMemoryStreamEnumeratorResults && parallelResults is IEnumerable<IStreamMergeAsyncEnumerator<TEntity>> parallelStreamEnumeratorResults)
{
var mergeAsyncEnumerators = new LinkedList<IStreamMergeAsyncEnumerator<TEntity>>();
if (previewInMemoryStreamEnumeratorResults.Count == 1)
{
mergeAsyncEnumerators.AddLast(previewInMemoryStreamEnumeratorResults.First());
}
foreach (var parallelStreamEnumeratorResult in parallelStreamEnumeratorResults)
{
mergeAsyncEnumerators.AddLast(parallelStreamEnumeratorResult);
}
var combineStreamMergeAsyncEnumerator = CombineInMemoryStreamMergeAsyncEnumerator(mergeAsyncEnumerators.ToArray());
var inMemoryStreamMergeAsyncEnumerator = new InMemoryStreamMergeAsyncEnumerator<TEntity>(combineStreamMergeAsyncEnumerator, async);
previewInMemoryStreamEnumeratorResults.Clear();
previewInMemoryStreamEnumeratorResults.AddLast(inMemoryStreamMergeAsyncEnumerator);
//合并
return;
}
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} is not {typeof(IStreamMergeAsyncEnumerator<TEntity>)}");
}
/// <summary>
/// 合并成一个迭代器
/// </summary>
@ -99,27 +144,32 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
/// <returns></returns>
public abstract IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators);
public virtual IStreamMergeAsyncEnumerator<TEntity> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
return CombineStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
/// <summary>
/// 开启异步线程获取并发迭代器
/// </summary>
/// <param name="queryable"></param>
/// <param name="async"></param>
/// <param name="connectionMode"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,ConnectionModeEnum connectionMode,
public async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,
CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (async)
{
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
return connectionMode==ConnectionModeEnum.MEMORY_STRICTLY? new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator):new InMemoryStreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = GetEnumerator0(queryable);
return connectionMode == ConnectionModeEnum.MEMORY_STRICTLY ? new StreamMergeAsyncEnumerator<TEntity>(enumerator):new InMemoryStreamMergeAsyncEnumerator<TEntity>(enumerator);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core;
namespace ShardingCore.Sharding.MergeEngines.Common
{
internal class DataSourceSqlExecutorUnit
{
public ConnectionModeEnum ConnectionMode { get; }
public List<SqlExecutorGroup<SqlExecutorUnit>> SqlExecutorGroups { get; }
public DataSourceSqlExecutorUnit(ConnectionModeEnum connectionMode,List<SqlExecutorGroup<SqlExecutorUnit>> sqlExecutorGroups)
{
ConnectionMode = connectionMode;
SqlExecutorGroups = sqlExecutorGroups;
}
}
}

View File

@ -3,19 +3,23 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core;
namespace ShardingCore.Sharding.MergeEngines.Common
{
internal sealed class SqlExecutorGroup<T>
{
public SqlExecutorGroup(List<T> groups)
public SqlExecutorGroup(ConnectionModeEnum connectionMode,List<T> groups)
{
ConnectionMode = connectionMode;
Groups = groups;
}
public ConnectionModeEnum ConnectionMode { get; }
/// <summary>
/// 执行组
/// </summary>
public List<T> Groups { get; }
}
}

View File

@ -12,6 +12,7 @@ using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.PaginationConfigurations;
@ -105,21 +106,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
StreamMergeContext.ReSetOrders(reSetOrders);
var sqlSequenceRouteUnits = sequenceResults.Select(sequenceResult => new SqlSequenceRouteUnit(sequenceResult));
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(sqlSequenceRouteUnits,
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,sqlSequenceRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(noPaginationQueryable,
((SqlSequenceRouteUnit)sqlExecutorUnit.RouteUnit).SequenceResult, reSetOrders, connectionMode);
return await AsyncParallelEnumerator(newQueryable, async, connectionMode, cancellationToken)
.ReleaseConnectionAsync(dbContext, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext,
streamMergeAsyncEnumerator);
}, cancellationToken);
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o => o).ToArray();
return streamEnumerators;
}
private (IQueryable<TEntity>,DbContext) CreateAsyncExecuteQueryable(IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult, IEnumerable<PropertyOrder> reSetOrders,ConnectionModeEnum connectionMode)
private (IQueryable<TEntity>, DbContext) CreateAsyncExecuteQueryable(IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult, IEnumerable<PropertyOrder> reSetOrders, ConnectionModeEnum connectionMode)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(reSetOrders))

View File

@ -1,13 +1,16 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
@ -30,15 +33,15 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
{
cancellationToken.ThrowIfCancellationRequested();
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(defaultSqlRouteUnits,
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,defaultSqlRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (newQueryable,dbContext) = CreateAsyncExecuteQueryable(dataSourceName, routeResult, connectionMode);
return await AsyncParallelEnumerator(newQueryable, async,connectionMode, cancellationToken)
.ReleaseConnectionAsync(dbContext, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext, streamMergeAsyncEnumerator);
}, cancellationToken);
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
@ -53,6 +56,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
return (newQueryable,shardingDbContext);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery())
@ -61,5 +65,13 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators,0, StreamMergeContext.Skip.GetValueOrDefault() + StreamMergeContext.Take.GetValueOrDefault());
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -12,6 +12,7 @@ using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
@ -47,7 +48,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+(int)take).OrderWithExpression(propertyOrders);
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(defaultSqlRouteUnits,
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,defaultSqlRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
@ -55,7 +56,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (newQueryable,dbContext) =
CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult, connectionMode);
return await AsyncParallelEnumerator(newQueryable, async, connectionMode, cancellationToken).ReleaseConnectionAsync(dbContext,connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async,cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext,
streamMergeAsyncEnumerator);
});
@ -85,5 +88,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators, 0, StreamMergeContext.Skip.GetValueOrDefault() + StreamMergeContext.Take.GetValueOrDefault());
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -11,6 +11,7 @@ using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.PaginationConfigurations;
@ -91,14 +92,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o => o.RouteQueryResult)).Skip(skip).Take(take).ToList();
var sqlSequenceRouteUnits = sequenceResults.Select(sequenceResult => new SqlSequenceRouteUnit(sequenceResult));
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(sqlSequenceRouteUnits,
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,sqlSequenceRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(noPaginationQueryable,
((SqlSequenceRouteUnit)sqlExecutorUnit.RouteUnit).SequenceResult, connectionMode);
return await AsyncParallelEnumerator(newQueryable, async, connectionMode, cancellationToken)
.ReleaseConnectionAsync(dbContext, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext,streamMergeAsyncEnumerator);
}, cancellationToken);
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
return streamEnumerators;

View File

@ -230,15 +230,6 @@ namespace ShardingCore
/// </summary>
public bool AutoTrackEntity { get; set; }
/// <summary>
/// 单次查询并发线程数目(最小1)
/// </summary>
public int ParallelQueryMaxThreadCount { get; set; } = Environment.ProcessorCount*2;
/// <summary>
/// 默认30秒超时
/// </summary>
public TimeSpan ParallelQueryTimeOut { get; set; }=TimeSpan.FromSeconds(30);
public string DefaultDataSourceName { get; set; }
public string DefaultConnectionString { get; set; }
public int MaxQueryConnectionsLimit { get; set; } = Environment.ProcessorCount;

View File

@ -173,14 +173,6 @@ namespace ShardingCore.Test
Assert.False(isKey3);
var isKey4 = userModMetadata.ShardingTableFieldIsKey();
Assert.True(isKey4);
try
{
throw new ShardingCoreParallelQueryTimeOutException("test");
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType());
}
await _virtualDbContext.AddRangeAsync(logDays);
var bulkShardingExpression = _virtualDbContext.BulkShardingExpression<ShardingDefaultDbContext, Order>(o => new[] { "A", "B" }.Contains(o.Area));

View File

@ -154,14 +154,6 @@ namespace ShardingCore.Test
Assert.False(isKey3);
var isKey4 = userModMetadata.ShardingTableFieldIsKey();
Assert.True(isKey4);
try
{
throw new ShardingCoreParallelQueryTimeOutException("test");
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType());
}
_virtualDbContext.AddRange(logDays);
var bulkShardingExpression = _virtualDbContext.BulkShardingExpression<ShardingDefaultDbContext, Order>(o => new[] { "A", "B" }.Contains(o.Area));

View File

@ -38,8 +38,8 @@ namespace ShardingCore.Test
.Begin(o =>
{
#if DEBUG
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
//o.CreateShardingTableOnStart = true;
//o.EnsureCreatedWithOutShardingTable = true;
#endif
o.AutoTrackEntity = true;
})

View File

@ -163,14 +163,6 @@ namespace ShardingCore.Test2x
Assert.False(isKey3);
var isKey4 = userModMetadata.ShardingTableFieldIsKey();
Assert.True(isKey4);
try
{
throw new ShardingCoreParallelQueryTimeOutException("test");
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType());
}
await _virtualDbContext.AddRangeAsync(logDays);
var bulkShardingExpression = _virtualDbContext.BulkShardingExpression<ShardingDefaultDbContext, Order>(o => new[] { "A", "B" }.Contains(o.Area));

View File

@ -153,15 +153,6 @@ namespace ShardingCore.Test2x
Assert.False(isKey3);
var isKey4 = userModMetadata.ShardingTableFieldIsKey();
Assert.True(isKey4);
try
{
throw new ShardingCoreParallelQueryTimeOutException("test");
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType());
}
_virtualDbContext.AddRange(logDays);
var bulkShardingExpression = _virtualDbContext.BulkShardingExpression<ShardingDefaultDbContext, Order>(o => new[] { "A", "B" }.Contains(o.Area));
Assert.Equal(2, bulkShardingExpression.Count);

View File

@ -163,14 +163,6 @@ namespace ShardingCore.Test3x
Assert.False(isKey3);
var isKey4 = userModMetadata.ShardingTableFieldIsKey();
Assert.True(isKey4);
try
{
throw new ShardingCoreParallelQueryTimeOutException("test");
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType());
}
await _virtualDbContext.AddRangeAsync(logDays);
var bulkShardingExpression = _virtualDbContext.BulkShardingExpression<ShardingDefaultDbContext, Order>(o => new[] { "A", "B" }.Contains(o.Area));

View File

@ -154,14 +154,6 @@ namespace ShardingCore.Test3x
Assert.False(isKey3);
var isKey4 = userModMetadata.ShardingTableFieldIsKey();
Assert.True(isKey4);
try
{
throw new ShardingCoreParallelQueryTimeOutException("test");
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType());
}
_virtualDbContext.AddRange(logDays);
var bulkShardingExpression = _virtualDbContext.BulkShardingExpression<ShardingDefaultDbContext, Order>(o => new[] { "A", "B" }.Contains(o.Area));

View File

@ -163,14 +163,6 @@ namespace ShardingCore.Test5x
Assert.False(isKey3);
var isKey4 = userModMetadata.ShardingTableFieldIsKey();
Assert.True(isKey4);
try
{
throw new ShardingCoreParallelQueryTimeOutException("test");
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType());
}
await _virtualDbContext.AddRangeAsync(logDays);
var bulkShardingExpression = _virtualDbContext.BulkShardingExpression<ShardingDefaultDbContext, Order>(o => new[] { "A", "B" }.Contains(o.Area));

View File

@ -154,14 +154,6 @@ namespace ShardingCore.Test5x
Assert.False(isKey3);
var isKey4 = userModMetadata.ShardingTableFieldIsKey();
Assert.True(isKey4);
try
{
throw new ShardingCoreParallelQueryTimeOutException("test");
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreParallelQueryTimeOutException), e.GetType());
}
_virtualDbContext.AddRange(logDays);
var bulkShardingExpression = _virtualDbContext.BulkShardingExpression<ShardingDefaultDbContext, Order>(o => new[] { "A", "B" }.Contains(o.Area));