添加single skip[#168] [#166]

This commit is contained in:
xuejiaming 2022-07-05 13:05:06 +08:00
parent deffbd5f77
commit 050fc05e37
8 changed files with 164 additions and 27 deletions

View File

@ -27,10 +27,10 @@ namespace ShardingCore.Sharding.MergeContexts
{
supportSingleEntityMethodNames.Add(nameof(Enumerable.First));
supportSingleEntityMethodNames.Add(nameof(Enumerable.FirstOrDefault));
supportSingleEntityMethodNames.Add(nameof(Enumerable.Single));
supportSingleEntityMethodNames.Add(nameof(Enumerable.SingleOrDefault));
singleEntityMethodNames.Add(nameof(Enumerable.Last));
singleEntityMethodNames.Add(nameof(Enumerable.LastOrDefault));
singleEntityMethodNames.Add(nameof(Enumerable.Single));
singleEntityMethodNames.Add(nameof(Enumerable.SingleOrDefault));
}
public IRewriteResult GetRewriteQueryable(IMergeQueryCompilerContext mergeQueryCompilerContext, IParseResult parseResult)

View File

@ -0,0 +1,26 @@
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
internal class SingleOrDefaultStreamEnumerable<TEntity> : AbstractStreamEnumerable<TEntity>
{
public SingleOrDefaultStreamEnumerable(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
protected override IStreamMergeCombine GetStreamMergeCombine()
{
return DefaultStreamMergeCombine.Instance;
}
protected override IExecutor<IStreamMergeAsyncEnumerator<TEntity>> CreateExecutor0(bool async)
{
return new DefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), async);
}
}
}

View File

@ -47,7 +47,7 @@ namespace ShardingCore.Sharding.MergeEngines
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToStreamList();
return GetFirstOrDefault(list);
return list.FirstOrDefault();
}
public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -57,18 +57,8 @@ namespace ShardingCore.Sharding.MergeEngines
var take = _streamMergeContext.GetTake();
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken);
return GetFirstOrDefault(list);
}
private TEntity GetFirstOrDefault(List<TEntity> list)
{
if (list.IsEmpty())
{
return default;
}
return list.FirstOrDefault();
}
}
}

View File

@ -42,7 +42,7 @@ namespace ShardingCore.Sharding.MergeEngines
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToStreamList();
return GetFirst(list);
return list.First();
}
public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -52,14 +52,6 @@ namespace ShardingCore.Sharding.MergeEngines
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var take = _streamMergeContext.GetTake();
var list =await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take,cancellationToken);
return GetFirst(list);
}
private TEntity GetFirst(List<TEntity> list)
{
if (list.IsEmpty())
throw new InvalidOperationException("Sequence contains no elements.");
return list.First();
}

View File

@ -0,0 +1,59 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines
{
internal class SingleOrDefaultSkipAsyncInMemoryMergeEngine<TEntity> : IEnsureMergeResult<TEntity>
{
private readonly StreamMergeContext _streamMergeContext;
public SingleOrDefaultSkipAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext)
{
_streamMergeContext = streamMergeContext;
}
// protected override IExecutor<RouteQueryResult<TEntity>> CreateExecutor0(bool async)
// {
// return new FirstOrDefaultMethodExecutor<TEntity>(GetStreamMergeContext());
// }
//
// protected override TEntity DoMergeResult0(List<RouteQueryResult<TEntity>> resultList)
// {
// var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList();
//
// if (notNullResult.IsEmpty())
// return default;
//
// var streamMergeContext = GetStreamMergeContext();
// if (streamMergeContext.Orders.Any())
// return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).FirstOrDefault();
//
// return notNullResult.FirstOrDefault();
// }
public TEntity MergeResult()
{
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToStreamList();
return list.SingleOrDefault();
}
public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var take = _streamMergeContext.GetTake();
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken);
return list.SingleOrDefault();
}
}
}

View File

@ -0,0 +1,58 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines
{
internal class SingleSkipAsyncInMemoryMergeEngine<TEntity> : IEnsureMergeResult<TEntity>
{
private readonly StreamMergeContext _streamMergeContext;
public SingleSkipAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext)
{
_streamMergeContext = streamMergeContext;
}
// protected override IExecutor<RouteQueryResult<TEntity>> CreateExecutor0(bool async)
// {
// return new FirstOrDefaultMethodExecutor<TEntity>(GetStreamMergeContext());
// }
//
// protected override TEntity DoMergeResult0(List<RouteQueryResult<TEntity>> resultList)
// {
// var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList();
//
// if (notNullResult.IsEmpty())
// return default;
//
// var streamMergeContext = GetStreamMergeContext();
// if (streamMergeContext.Orders.Any())
// return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).FirstOrDefault();
//
// return notNullResult.FirstOrDefault();
// }
public TEntity MergeResult()
{
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToStreamList();
return list.Single();
}
public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var take = _streamMergeContext.GetTake();
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken);
return list.Single();
}
}
}

View File

@ -83,9 +83,9 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
case nameof(Enumerable.LastOrDefault):
return EnsureResultTypeMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Single):
return EnsureResultTypeMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
return EnsureResultTypeMergeExecute<TResult>(typeof(SingleSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.SingleOrDefault):
return EnsureResultTypeMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
return EnsureResultTypeMergeExecute<TResult>(typeof(SingleOrDefaultSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Count):
return EnsureResultTypeMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.LongCount):

View File

@ -62,10 +62,22 @@ namespace ShardingCore.Sharding.ShardingExecutors
_isCrossTable = shardingRouteResult.IsCrossTable;
_existCrossTableTails = shardingRouteResult.ExistCrossTableTails;
var queryMethodName = queryCompilerContext.GetQueryMethodName();
if (nameof(Enumerable.First) == queryMethodName || nameof(Enumerable.FirstOrDefault) == queryMethodName)
_fixedTake = GetMethodNameFixedTake(queryMethodName);
}
private int? GetMethodNameFixedTake(string queryMethodName)
{
switch (queryMethodName)
{
_fixedTake = 1;
case nameof(Enumerable.First):
case nameof(Enumerable.FirstOrDefault):
return 1;
case nameof(Enumerable.Single):
case nameof(Enumerable.SingleOrDefault):
return 2;
}
return null;
}
//
// private IEnumerable<TableRouteResult> GetTableRouteResults(IEnumerable<TableRouteResult> tableRouteResults)