优化first or default

This commit is contained in:
xuejiaming 2022-07-04 12:13:32 +08:00
parent 03921e9766
commit ed89215482
17 changed files with 164 additions and 38 deletions

View File

@ -54,11 +54,18 @@ namespace Sample.MySql
{
o.UseShardingQuery((conStr,builder)=>
{
builder.UseMySql(conStr, new MySqlServerVersion(new Version())).UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking).UseLoggerFactory(efLogger);
builder.UseMySql(conStr, new MySqlServerVersion(new Version()))
.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking)
.UseLoggerFactory(efLogger)
.EnableSensitiveDataLogging();
});
o.UseShardingTransaction((connection, builder) =>
{
builder.UseMySql(connection, new MySqlServerVersion(new Version())).UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking).UseLoggerFactory(efLogger);
builder
.UseMySql(connection, new MySqlServerVersion(new Version())).
UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking)
.UseLoggerFactory(efLogger)
.EnableSensitiveDataLogging();
});
o.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=dbdbd0;userid=root;password=root;");
}).ReplaceService<ITableEnsureManager,MySqlTableEnsureManager>(ServiceLifetime.Singleton)

View File

@ -0,0 +1,29 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Extensions
{
public static class StreamMergeEnumerableExtension
{
public static async Task<List<TEntity>> ToStreamListAsync<TEntity>(this IAsyncEnumerable<TEntity> source, int? take=null,CancellationToken cancellationToken=default)
{
#if EFCORE2
var list = await asyncEnumeratorStreamMergeEngine.ToList<TEntity>(cancellationToken);
#endif
#if !EFCORE2
var list = new List<TEntity>(take??4);
await foreach (var element in source.WithCancellation(cancellationToken))
{
list.Add(element);
}
#endif
return list;
}
public static List<TEntity> ToStreamList<TEntity>(this IEnumerable<TEntity> source)
{
return source.ToList();
}
}
}

View File

@ -23,7 +23,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractBaseMergeEngine<TEntity>
internal abstract class AbstractBaseMergeEngine
{
protected abstract StreamMergeContext GetStreamMergeContext();
protected bool UseUnionAllMerge()

View File

@ -17,7 +17,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult>
internal abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine, IEnsureMergeResult<TResult>
{
protected AbstractEnsureMethodCallInMemoryAsyncMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{

View File

@ -17,7 +17,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractEnsureMethodCallInMemoryAverageAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult>
internal abstract class AbstractEnsureMethodCallInMemoryAverageAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine, IEnsureMergeResult<TResult>
{
protected AbstractEnsureMethodCallInMemoryAverageAsyncMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{

View File

@ -16,7 +16,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractInMemoryAsyncMergeEngine<TEntity> : AbstractBaseMergeEngine<TEntity>, IInMemoryAsyncMergeEngine
internal abstract class AbstractInMemoryAsyncMergeEngine : AbstractBaseMergeEngine, IInMemoryAsyncMergeEngine
{
private readonly StreamMergeContext _mergeContext;
@ -27,8 +27,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
public async Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var routeQueryResults = _mergeContext.PreperExecute(() => new List<RouteQueryResult<TResult>>(0));
if (routeQueryResults != null)
if (!_mergeContext.TryPrepareExecuteContinueQuery(() => new List<RouteQueryResult<TResult>>(0),out var routeQueryResults))
return routeQueryResults;
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup<RouteQueryResult<TResult>>(true, defaultSqlRouteUnits, cancellationToken).ToArray();

View File

@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractStreamEnumerable<TEntity> : AbstractBaseMergeEngine<TEntity>, IStreamEnumerable<TEntity>
internal abstract class AbstractStreamEnumerable<TEntity> : AbstractBaseMergeEngine, IStreamEnumerable<TEntity>
{
private readonly StreamMergeContext _streamMergeContext;

View File

@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> :AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult>
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> :AbstractInMemoryAsyncMergeEngine, IEnsureMergeResult<TResult>
{
public MaxAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{

View File

@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult>
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine, IEnsureMergeResult<TResult>
{
public MinAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{

View File

@ -29,9 +29,11 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var emptyQueryEnumerator = _mergeContext.PreperExecute(() => new EmptyQueryEnumerator<T>());
if (emptyQueryEnumerator != null)
if (!_mergeContext.TryPrepareExecuteContinueQuery(() => new EmptyQueryEnumerator<T>(),
out var emptyQueryEnumerator))
{
return emptyQueryEnumerator;
}
var asyncEnumerator = EnumeratorStreamMergeEngineFactory<T>.Create(_mergeContext).GetStreamEnumerable()
.GetAsyncEnumerator(cancellationToken);
@ -47,8 +49,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
#if EFCORE2
IAsyncEnumerator<T> IAsyncEnumerable<T>.GetEnumerator()
{
var emptyQueryEnumerator = _mergeContext.PreperExecute(() => new EmptyQueryEnumerator<T>());
if (emptyQueryEnumerator != null)
if (!_mergeContext.TryPrepareExecuteContinueQuery(() => new EmptyQueryEnumerator<T>(),out var emptyQueryEnumerator))
return emptyQueryEnumerator;
var asyncEnumerator = ((IAsyncEnumerable<T>)EnumeratorStreamMergeEngineFactory<T>.Create(_mergeContext).GetStreamEnumerable())
.GetEnumerator();
@ -63,8 +64,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
public IEnumerator<T> GetEnumerator()
{
var emptyQueryEnumerator = _mergeContext.PreperExecute(() => new EmptyQueryEnumerator<T>());
if (emptyQueryEnumerator != null)
if (!_mergeContext.TryPrepareExecuteContinueQuery(() => new EmptyQueryEnumerator<T>(),out var emptyQueryEnumerator))
return emptyQueryEnumerator;
var enumerator = ((IEnumerable<T>)EnumeratorStreamMergeEngineFactory<T>.Create(_mergeContext).GetStreamEnumerable())
.GetEnumerator();

View File

@ -0,0 +1,26 @@
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
internal class FirstOrDefaultStreamMergeEnumerable<TEntity>:AbstractStreamEnumerable<TEntity>
{
public FirstOrDefaultStreamMergeEnumerable(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
protected override IStreamMergeCombine GetStreamMergeCombine()
{
return DefaultStreamMergeCombine.Instance;
}
protected override IExecutor<IStreamMergeAsyncEnumerator<TEntity>> CreateExecutor0(bool async)
{
return new DefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), async);
}
}
}

View File

@ -69,6 +69,14 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
return new DefaultShardingStreamEnumerable<TEntity>(_streamMergeContext);
}
var queryMethodName = _streamMergeContext.MergeQueryCompilerContext.GetQueryMethodName();
switch (queryMethodName)
{
case nameof(Enumerable.First):
case nameof(Enumerable.FirstOrDefault):
return new FirstOrDefaultStreamMergeEnumerable<TEntity>(_streamMergeContext);
}
//未开启系统分表或者本次查询涉及多张分表
if (_streamMergeContext.IsPaginationQuery() && _streamMergeContext.IsSingleShardingEntityQuery() && _shardingPageManager.Current != null)
{

View File

@ -70,10 +70,11 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Abstractions
var circuitBreaker = CreateCircuitBreaker();
var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
LinkedList<TResult> result = new LinkedList<TResult>();
var executorGroupsCount = executorGroups.Count;
//同数据库下多组数据间采用串行
foreach (var executorGroup in executorGroups)
{
executorGroupsCount--;
//同组采用并行最大化用户配置链接数
var routeQueryResults = await GroupExecuteAsync(executorGroup.Groups, cancellationToken);
//严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
@ -99,9 +100,14 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Abstractions
result.AddLast(routeQueryResult.MergeResult);
}
}
if (IsCancelled()|| circuitBreaker.Terminated(result))
break;
//是否存在下次轮询如果是的那么就需要判断
var hasNextLoop = executorGroupsCount > 0;
if (hasNextLoop)
{
if (IsCancelled()|| circuitBreaker.Terminated(result))
break;
}
}
return result;

View File

@ -44,7 +44,10 @@ namespace ShardingCore.Sharding.MergeEngines
// }
public TEntity MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException(false);
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToList();
return GetFirstOrDefault(list);
}
public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -52,18 +55,14 @@ namespace ShardingCore.Sharding.MergeEngines
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
#if EFCORE2
var list = await asyncEnumeratorStreamMergeEngine.ToList<TEntity>(cancellationToken);
#endif
#if !EFCORE2
var take = _streamMergeContext.GetTake();
var list = new List<TEntity>(take??31);
await foreach (var element in asyncEnumeratorStreamMergeEngine.WithCancellation(cancellationToken))
{
list.Add(element);
}
#endif
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken);
return GetFirstOrDefault(list);
}
private TEntity GetFirstOrDefault(List<TEntity> list)
{
if (list.IsEmpty())
{
return default;

View File

@ -38,7 +38,24 @@ namespace ShardingCore.Sharding.MergeEngines
// }
public TEntity MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException(false);
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
#if EFCORE2
var list = asyncEnumeratorStreamMergeEngine.ToList();
#endif
#if !EFCORE2
var take = _streamMergeContext.GetTake();
var list = new List<TEntity>(take??4);
foreach (var element in asyncEnumeratorStreamMergeEngine)
{
list.Add(element);
}
#endif
if (list.IsEmpty())
throw new InvalidOperationException("Sequence contains no elements.");
return list.First();
}
public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -52,7 +69,7 @@ namespace ShardingCore.Sharding.MergeEngines
#endif
#if !EFCORE2
var take = _streamMergeContext.GetTake();
var list = new List<TEntity>(take??31);
var list = new List<TEntity>(take??4);
await foreach (var element in asyncEnumeratorStreamMergeEngine.WithCancellation(cancellationToken))
{
list.Add(element);

View File

@ -180,9 +180,13 @@ namespace ShardingCore.Sharding
// return Skip.HasValue || Take.HasValue;
//}
/// <summary>
/// 任意skip或者take大于0那么就说明是分页的查询
/// </summary>
/// <returns></returns>
public bool IsPaginationQuery()
{
return Skip.GetValueOrDefault() > 0 || Take.GetValueOrDefault() > 0;
return Skip is > 0 || Take is > 0;
}
@ -275,8 +279,22 @@ namespace ShardingCore.Sharding
return GetShardingDbContext().GetShardingRuntimeContext().GetRequiredService<IShardingComparer>();
}
public TResult PreperExecute<TResult>(Func<TResult> emptyFunc)
/// <summary>
/// 如果返回false那么就说明不需要继续查询了
/// 返回true表示需要继续查询
/// </summary>
/// <param name="emptyFunc"></param>
/// <param name="r"></param>
/// <typeparam name="TResult"></typeparam>
/// <returns></returns>
/// <exception cref="ShardingCoreQueryRouteNotMatchException"></exception>
public bool TryPrepareExecuteContinueQuery<TResult>(Func<TResult> emptyFunc,out TResult r)
{
if (TakeZeroNoQueryExecute())
{
r = emptyFunc();
return false;
}
if (IsRouteNotMatch())
{
@ -286,11 +304,13 @@ namespace ShardingCore.Sharding
}
else
{
return emptyFunc();
r = emptyFunc();
return false;
}
}
return default;
r = default;
return true;
}
/// <summary>
/// 无路由匹配
@ -301,6 +321,15 @@ namespace ShardingCore.Sharding
return ShardingRouteResult.IsEmpty;
}
/// <summary>
/// take有值并且是0的情况下那么就说明不需要获取
/// </summary>
/// <returns></returns>
public bool TakeZeroNoQueryExecute()
{
return Take is 0;
}
private bool ThrowIfQueryRouteNotMatch()
{
return _shardingRouteConfigOptions.ThrowIfQueryRouteNotMatch;

View File

@ -31,6 +31,12 @@
<Compile Update="..\..\src\ShardingCore\Sharding\ShardingExecutors\Abstractions\ICompileParameter.cs">
<Link>Sharding\ShardingExecutors\Abstractions\ICompileParameter.cs</Link>
</Compile>
<Compile Update="..\..\src\ShardingCore\Sharding\MergeEngines\EnumeratorStreamMergeEngines\Abstractions\AbstractStreamMergeEnumerable.cs">
<Link>Sharding\MergeEngines\EnumeratorStreamMergeEngines\Abstractions\AbstractStreamMergeEnumerable.cs</Link>
</Compile>
<Compile Update="..\..\src\ShardingCore\Sharding\MergeEngines\EnumeratorStreamMergeEngines\Enumerables\FirstOrDefaultStreamMergeEnumerable.cs">
<Link>Sharding\MergeEngines\EnumeratorStreamMergeEngines\Enumerables\FirstOrDefaultStreamMergeEnumerable.cs</Link>
</Compile>
</ItemGroup>