完成skip+last的蜕变

This commit is contained in:
xuejiaming 2022-07-05 14:49:18 +08:00
parent 02d15b6599
commit eb77446d52
8 changed files with 55 additions and 14 deletions

View File

@ -20,6 +20,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
protected override IExecutor<IStreamMergeAsyncEnumerator<TEntity>> CreateExecutor0(bool async)
{
GetStreamMergeContext().ReSetTake(1);
return new DefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), async);
}
}

View File

@ -1,3 +1,5 @@
using System.Linq;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
@ -20,7 +22,15 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
protected override IExecutor<IStreamMergeAsyncEnumerator<TEntity>> CreateExecutor0(bool async)
{
return new LastOrDefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), async);
var streamMergeContext = GetStreamMergeContext();
var skip = streamMergeContext.Skip;
streamMergeContext.ReverseOrder();
streamMergeContext.ReSetSkip(0);
var reTake = skip.GetValueOrDefault() + 1;
streamMergeContext.ReSetTake(reTake);
var newQueryable = (IQueryable<TEntity>)streamMergeContext.GetReWriteQueryable().RemoveSkip().RemoveTake().RemoveAnyOrderBy().OrderWithExpression(streamMergeContext.Orders).ReTake(reTake);
return new LastOrDefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), newQueryable, async);
}
}
}

View File

@ -20,6 +20,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
protected override IExecutor<IStreamMergeAsyncEnumerator<TEntity>> CreateExecutor0(bool async)
{
GetStreamMergeContext().ReSetTake(1);
return new DefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), async);
}
}

View File

@ -13,11 +13,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators
internal class LastOrDefaultEnumeratorExecutor<TResult> : AbstractEnumeratorExecutor<TResult>
{
private readonly IStreamMergeCombine _streamMergeCombine;
private readonly IQueryable<TResult> _queryable;
private readonly bool _async;
public LastOrDefaultEnumeratorExecutor(StreamMergeContext streamMergeContext, IStreamMergeCombine streamMergeCombine, bool async) : base(streamMergeContext)
public LastOrDefaultEnumeratorExecutor(StreamMergeContext streamMergeContext, IStreamMergeCombine streamMergeCombine,IQueryable<TResult> queryable, bool async) : base(streamMergeContext)
{
_streamMergeCombine = streamMergeCombine;
_queryable = queryable;
_async = async;
}
@ -37,13 +39,10 @@ internal class LastOrDefaultEnumeratorExecutor<TResult> : AbstractEnumeratorExec
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var streamMergeContext = GetStreamMergeContext();
streamMergeContext.ReverseOrder();
streamMergeContext.ReSetSkip(0);
var connectionMode = streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var shardingDbContext = streamMergeContext.CreateDbContext(sqlExecutorUnit.RouteUnit, connectionMode);
var newQueryable = (IQueryable<TResult>)streamMergeContext.GetReWriteQueryable().RemoveAnyOrderBy().OrderWithExpression(streamMergeContext.Orders)
.ReplaceDbContextQueryable(shardingDbContext);
var newQueryable = (IQueryable<TResult>)_queryable.ReplaceDbContextQueryable(shardingDbContext);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>(shardingDbContext, streamMergeAsyncEnumerator);

View File

@ -44,23 +44,29 @@ namespace ShardingCore.Sharding.MergeEngines
// }
public TEntity MergeResult()
{
var skip = _streamMergeContext.Skip;
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToFixedElementStreamList(1);
return list.FirstOrDefault();
var list = asyncEnumeratorStreamMergeEngine.ToFixedElementStreamList(1);
if (list.VirtualElementCount >= (skip.GetValueOrDefault() + 1))
return list.FirstOrDefault();
return default;
}
public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var skip = _streamMergeContext.Skip;
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1, cancellationToken);
return list.FirstOrDefault();
if (list.VirtualElementCount >= (skip.GetValueOrDefault() + 1))
return list.FirstOrDefault();
return default;
}
// if (notNullResult.IsEmpty())
// throw new InvalidOperationException("Sequence contains no elements.");
// var streamMergeContext = GetStreamMergeContext();

View File

@ -44,20 +44,28 @@ namespace ShardingCore.Sharding.MergeEngines
// }
public TEntity MergeResult()
{
var skip = _streamMergeContext.Skip;
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToFixedElementStreamList(1);
return list.First();
if (list.VirtualElementCount >= (skip.GetValueOrDefault() + 1))
return list.First();
throw new InvalidOperationException("Sequence contains no elements.");
}
public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var skip = _streamMergeContext.Skip;
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1, cancellationToken);
return list.First();
if (list.VirtualElementCount >= (skip.GetValueOrDefault() + 1))
return list.First();
throw new InvalidOperationException("Sequence contains no elements.");
}

View File

@ -77,7 +77,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
return 2;
case nameof(Enumerable.Last):
case nameof(Enumerable.LastOrDefault):
return 2;
return 1;
}
return null;

View File

@ -1127,6 +1127,22 @@ namespace ShardingCore.Test
var x = await _virtualDbContext.Set<Order>().OrderBy(o => o.Money).LastOrDefaultAsync();
Assert.NotNull(x);
Assert.Equal(319, x.Money);
var x2 = await _virtualDbContext.Set<Order>().Skip(10).OrderBy(o => o.Money).LastOrDefaultAsync();
Assert.NotNull(x2);
Assert.Equal(319, x2.Money);
var x3 = await _virtualDbContext.Set<Order>().Skip(9999999).OrderBy(o => o.Money).LastOrDefaultAsync();
Assert.Null(x3);
try
{
var x4 = await _virtualDbContext.Set<Order>().Skip(9999999).OrderBy(o => o.Money).LastAsync();
}
catch (Exception e)
{
Assert.True($"{e}".Contains("Sequence contains no elements."));
}
var x1 = await _virtualDbContext.Set<Order>().OrderBy(o => o.Money).LastAsync();
Assert.Equal(x, x1);
var y = await _virtualDbContext.Set<Order>().OrderBy(o => o.Money).FirstOrDefaultAsync();