修复last的bug

This commit is contained in:
xuejiaming 2022-07-05 14:09:17 +08:00
parent b613181c37
commit 02d15b6599
3 changed files with 53 additions and 5 deletions

View File

@ -20,7 +20,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
protected override IExecutor<IStreamMergeAsyncEnumerator<TEntity>> CreateExecutor0(bool async)
{
return new DefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), async);
return new LastOrDefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), async);
}
}
}

View File

@ -80,11 +80,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
return new SingleOrDefaultStreamEnumerable<TEntity>(_streamMergeContext);
case nameof(Enumerable.Last):
case nameof(Enumerable.LastOrDefault):
{
_streamMergeContext.ReSetSkip(0);
_streamMergeContext.ReverseOrder();
return new LastOrDefaultStreamEnumerable<TEntity>(_streamMergeContext);
}
}
//未开启系统分表或者本次查询涉及多张分表

View File

@ -0,0 +1,52 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators
{
internal class LastOrDefaultEnumeratorExecutor<TResult> : AbstractEnumeratorExecutor<TResult>
{
private readonly IStreamMergeCombine _streamMergeCombine;
private readonly bool _async;
public LastOrDefaultEnumeratorExecutor(StreamMergeContext streamMergeContext, IStreamMergeCombine streamMergeCombine, bool async) : base(streamMergeContext)
{
_streamMergeCombine = streamMergeCombine;
_async = async;
}
protected override IStreamMergeCombine GetStreamMergeCombine()
{
return _streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var streamMergeContext = GetStreamMergeContext();
streamMergeContext.ReverseOrder();
streamMergeContext.ReSetSkip(0);
var connectionMode = streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var shardingDbContext = streamMergeContext.CreateDbContext(sqlExecutorUnit.RouteUnit, connectionMode);
var newQueryable = (IQueryable<TResult>)streamMergeContext.GetReWriteQueryable().RemoveAnyOrderBy().OrderWithExpression(streamMergeContext.Orders)
.ReplaceDbContextQueryable(shardingDbContext);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>(shardingDbContext, streamMergeAsyncEnumerator);
}
}
}