优化first代码

This commit is contained in:
xuejiaming 2022-07-26 13:34:52 +08:00
parent 4ac8601583
commit 24a6938090
9 changed files with 118 additions and 86 deletions

View File

@ -18,13 +18,15 @@ namespace ShardingCore.Core.Internal
public int VirtualElementCount => _virtualElementCount;
public int ReallyElementCount => _list.Count;
public void Add(TEntity element)
public bool Add(TEntity element)
{
_virtualElementCount++;
if (VirtualElementCount <= _capacity)
{
_list.Add(element);
return true;
}
return false;
}
public TEntity FirstOrDefault()

View File

@ -26,19 +26,23 @@ namespace ShardingCore.Extensions
{
return source.ToList();
}
public static FixedElementCollection<TEntity> ToFixedElementStreamList<TEntity>(this IEnumerable<TEntity> source,int capacity)
public static FixedElementCollection<TEntity> ToFixedElementStreamList<TEntity>(this IEnumerable<TEntity> source,int capacity,int maxVirtualElementCount)
{
var fixedElementCollection = new FixedElementCollection<TEntity>(capacity);
var enumerator = source.GetEnumerator();
using var enumerator = source.GetEnumerator();
while (enumerator.MoveNext())
{
fixedElementCollection.Add(enumerator.Current);
if (fixedElementCollection.VirtualElementCount >= maxVirtualElementCount)
{
break;
}
}
return fixedElementCollection;
}
public static async Task<FixedElementCollection<TEntity>> ToFixedElementStreamListAsync<TEntity>(this IAsyncEnumerable<TEntity> source,int capacity,CancellationToken cancellationToken=default)
public static async Task<FixedElementCollection<TEntity>> ToFixedElementStreamListAsync<TEntity>(this IAsyncEnumerable<TEntity> source,int capacity,int maxVirtualElementCount,CancellationToken cancellationToken=default)
{
var fixedElementCollection = new FixedElementCollection<TEntity>(capacity);
#if EFCORE2
@ -46,12 +50,20 @@ namespace ShardingCore.Extensions
while (await asyncEnumerator.MoveNext(cancellationToken))
{
fixedElementCollection.Add(asyncEnumerator.Current);
if (fixedElementCollection.VirtualElementCount >= maxVirtualElementCount)
{
break;
}
}
#endif
#if !EFCORE2
await foreach (var element in source.WithCancellation(cancellationToken))
{
fixedElementCollection.Add(element);
if (fixedElementCollection.VirtualElementCount >= maxVirtualElementCount)
{
break;
}
}
#endif
return fixedElementCollection;

View File

@ -8,86 +8,72 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
internal class OneAtMostElementStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private List<T>.Enumerator _enumerator;
private bool skip;
private int _moveIndex = 0;
private T _constantElement;
public OneAtMostElementStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<T> streamMergeAsyncEnumerator)
{
var list = new List<T>();
if (streamMergeAsyncEnumerator.HasElement())
{
list.Add(streamMergeAsyncEnumerator.ReallyCurrent);
}
_enumerator = list.GetEnumerator();
_enumerator.MoveNext();
skip = true;
_constantElement=streamMergeAsyncEnumerator.ReallyCurrent;
}
private bool MoveNext0()
{
if (_moveIndex >= 1)
{
return false;
}
_moveIndex++;
return HasElement();
}
#if !EFCORE2&&!EFCORE3&&!EFCORE5
public ValueTask DisposeAsync()
{
_enumerator.Dispose();
return ValueTask.CompletedTask;
}
public ValueTask<bool> MoveNextAsync()
{
var moveNext = _enumerator.MoveNext();
var moveNext = MoveNext0();
return ValueTask.FromResult<bool>(moveNext);
}
public void Dispose()
{
_enumerator.Dispose();
}
#endif
#if EFCORE3 || EFCORE5
public ValueTask DisposeAsync()
{
_enumerator.Dispose();
return new ValueTask();
}
public ValueTask<bool> MoveNextAsync()
{
var moveNext = _enumerator.MoveNext();
var moveNext = MoveNext0();
return new ValueTask<bool>(moveNext);
}
public void Dispose()
{
_enumerator.Dispose();
}
#endif
public bool MoveNext()
{
if (skip)
{
skip = false;
return null != _enumerator.Current;
}
var moveNext = _enumerator.MoveNext();
var moveNext = MoveNext0();
return moveNext;
}
public bool SkipFirst()
{
if (skip)
{
skip = false;
return true;
}
return false;
}
public bool HasElement()
{
return null != _enumerator.Current;
return null != _constantElement;
}
@ -102,29 +88,23 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
public T GetCurrent()
{
if (skip)
if (_moveIndex==0)
return default;
return _enumerator.Current;
return _constantElement;
}
public T GetReallyCurrent()
{
return _enumerator.Current;
return _constantElement;
}
#if EFCORE2
public void Dispose()
{
_enumerator.Dispose();
}
public Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
{
if (skip)
{
skip = false;
return Task.FromResult(null != _enumerator.Current);
}
var moveNext = _enumerator.MoveNext();
var moveNext = MoveNext0();
return Task.FromResult(moveNext);
}

View File

@ -90,7 +90,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Abstractions
if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
{
MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
var dbContexts = routeQueryResults.Select(o => o.DbContext);
var dbContexts = routeQueryResults.Where(o=>o!=null).Select(o => o.DbContext).ToArray();
foreach (var dbContext in dbContexts)
{
await streamMergeContext.DbContextDisposeAsync(dbContext);

View File

@ -30,50 +30,59 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
/// Email: 326308290@qq.com
internal abstract class AbstractEnumeratorExecutor<TResult> : AbstractExecutor<IStreamMergeAsyncEnumerator<TResult>>
{
protected AbstractEnumeratorExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
protected abstract IStreamMergeCombine GetStreamMergeCombine();
public override ICircuitBreaker CreateCircuitBreaker()
{
return new EnumeratorCircuitBreaker(GetStreamMergeContext());
}
protected override void MergeParallelExecuteResult(LinkedList<IStreamMergeAsyncEnumerator<TResult>> previewResults, IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelResults, bool async)
protected override void MergeParallelExecuteResult(
LinkedList<IStreamMergeAsyncEnumerator<TResult>> previewResults,
IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelResults, bool async)
{
var previewResultsCount = previewResults.Count;
if (previewResultsCount > 1)
{
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} {nameof(previewResults)} has more than one element in container");
throw new ShardingCoreInvalidOperationException(
$"{typeof(TResult)} {nameof(previewResults)} has more than one element in container");
}
var parallelCount = parallelResults.Count();
if (parallelCount == 0)
return;
//聚合
if (previewResults is LinkedList<IStreamMergeAsyncEnumerator<TResult>> previewInMemoryStreamEnumeratorResults && parallelResults is IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelStreamEnumeratorResults)
if (previewResults is LinkedList<IStreamMergeAsyncEnumerator<TResult>>
previewInMemoryStreamEnumeratorResults &&
parallelResults is IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelStreamEnumeratorResults)
{
var mergeAsyncEnumerators = new LinkedList<IStreamMergeAsyncEnumerator<TResult>>();
if (previewResultsCount == 1)
{
mergeAsyncEnumerators.AddLast(previewInMemoryStreamEnumeratorResults.First());
}
foreach (var parallelStreamEnumeratorResult in parallelStreamEnumeratorResults)
{
mergeAsyncEnumerators.AddLast(parallelStreamEnumeratorResult);
}
var combineStreamMergeAsyncEnumerator = CombineInMemoryStreamMergeAsyncEnumerator(mergeAsyncEnumerators.ToArray());
var inMemoryStreamMergeAsyncEnumerator = new InMemoryStreamMergeAsyncEnumerator<TResult>(combineStreamMergeAsyncEnumerator, async);
var combineStreamMergeAsyncEnumerator =
CombineInMemoryStreamMergeAsyncEnumerator(mergeAsyncEnumerators.ToArray());
var inMemoryStreamMergeAsyncEnumerator =
new InMemoryStreamMergeAsyncEnumerator<TResult>(combineStreamMergeAsyncEnumerator, async);
previewInMemoryStreamEnumeratorResults.Clear();
previewInMemoryStreamEnumeratorResults.AddLast(inMemoryStreamMergeAsyncEnumerator);
//合并
return;
}
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} is not {typeof(IStreamMergeAsyncEnumerator<TResult>)}");
throw new ShardingCoreInvalidOperationException(
$"{typeof(TResult)} is not {typeof(IStreamMergeAsyncEnumerator<TResult>)}");
}
/// <summary>
@ -81,9 +90,11 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
/// </summary>
/// <param name="streamsAsyncEnumerators"></param>
/// <returns></returns>
public virtual IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
public virtual IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return GetStreamMergeCombine().StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
return GetStreamMergeCombine()
.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
public virtual IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
@ -99,7 +110,8 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<IStreamMergeAsyncEnumerator<TResult>> AsyncParallelEnumerator(IQueryable<TResult> queryable, bool async,
public async Task<IStreamMergeAsyncEnumerator<TResult>> AsyncParallelEnumerator(IQueryable<TResult> queryable,
bool async,
CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
@ -114,6 +126,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
return new StreamMergeAsyncEnumerator<TResult>(enumerator);
}
}
/// <summary>
/// 获取异步迭代器
/// </summary>
@ -127,11 +140,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
return enumator;
#endif
#if EFCORE2
var enumator = new EFCore2TryCurrentAsyncEnumerator<TResult>(newQueryable.AsAsyncEnumerable().GetEnumerator());
var enumator =
new EFCore2TryCurrentAsyncEnumerator<TResult>(newQueryable.AsAsyncEnumerable().GetEnumerator());
await enumator.MoveNext();
return enumator;
#endif
}
/// <summary>
/// 获取同步迭代器
/// </summary>
@ -144,7 +159,8 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
return enumator;
}
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync(
SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var shardingMergeResult = await ExecuteUnitAsync0(sqlExecutorUnit, cancellationToken);
var dbContext = shardingMergeResult.DbContext;
@ -153,31 +169,32 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
//first last 等操作没有skip就可以回收如果没有元素就可以回收
//single如果没有元素就可以回收
//enumerable如果没有元素就可以回收
if (sqlExecutorUnit.ConnectionMode != ConnectionModeEnum.CONNECTION_STRICTLY)
var streamMergeContext = GetStreamMergeContext();
if (DisposeInExecuteUnit(streamMergeContext, sqlExecutorUnit.ConnectionMode, streamMergeAsyncEnumerator))
{
var streamMergeContext = GetStreamMergeContext();
if (DisposeInExecuteUnit(streamMergeContext,streamMergeAsyncEnumerator))
{
var disConnectionStreamMergeAsyncEnumerator = new OneAtMostElementStreamMergeAsyncEnumerator<TResult>(streamMergeAsyncEnumerator);
await streamMergeContext.DbContextDisposeAsync(dbContext);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>(null,
disConnectionStreamMergeAsyncEnumerator);
}
var disConnectionStreamMergeAsyncEnumerator =
new OneAtMostElementStreamMergeAsyncEnumerator<TResult>(streamMergeAsyncEnumerator);
await streamMergeContext.DbContextDisposeAsync(dbContext);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>(null,
disConnectionStreamMergeAsyncEnumerator);
}
return shardingMergeResult;
}
/// <summary>
/// 是否需要在执行单元中直接回收掉链接有助于提高吞吐量
/// </summary>
/// <param name="streamMergeContext"></param>
/// <param name="connectionMode"></param>
/// <param name="streamMergeAsyncEnumerator"></param>
/// <returns></returns>
private bool DisposeInExecuteUnit(StreamMergeContext streamMergeContext,IStreamMergeAsyncEnumerator<TResult> streamMergeAsyncEnumerator)
private bool DisposeInExecuteUnit(StreamMergeContext streamMergeContext, ConnectionModeEnum connectionMode,
IStreamMergeAsyncEnumerator<TResult> streamMergeAsyncEnumerator)
{
var queryMethodName = streamMergeContext.MergeQueryCompilerContext.GetQueryMethodName();
var hasElement = streamMergeAsyncEnumerator.HasElement();
var notConnectionStrictly = connectionMode != ConnectionModeEnum.CONNECTION_STRICTLY;
switch (queryMethodName)
{
case nameof(Queryable.First):
@ -186,19 +203,32 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
case nameof(Queryable.LastOrDefault):
{
var skip = streamMergeContext.GetSkip();
return skip is null or < 0||!hasElement;
}
return (skip is null or < 0 || !hasElement) && notConnectionStrictly;
}
case nameof(Queryable.Single):
case nameof(Queryable.SingleOrDefault):
case QueryCompilerContext.ENUMERABLE:
{
return !hasElement;
return !hasElement && notConnectionStrictly;
}
// case nameof(Queryable.Count):
// case nameof(Queryable.LongCount):
// case nameof(Queryable.Any):
// case nameof(Queryable.All):
// case nameof(Queryable.Sum):
// case nameof(Queryable.Max):
// case nameof(Queryable.Min):
// case nameof(Queryable.Average):
// case nameof(Queryable.Contains):
// {
// return true;
// }
}
return false;
}
protected abstract Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync0(
SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken());
}
}
}

View File

@ -26,16 +26,18 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions
protected override async Task<ShardingMergeResult<RouteQueryResult<TResult>>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var streamMergeContext = GetStreamMergeContext();
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var shardingDbContext = GetStreamMergeContext().CreateDbContext(sqlExecutorUnit.RouteUnit);
var shardingDbContext = streamMergeContext.CreateDbContext(sqlExecutorUnit.RouteUnit);
var newQueryable = GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var queryResult = await EFCoreQueryAsync(newQueryable, cancellationToken);
var routeQueryResult = new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
return new ShardingMergeResult<RouteQueryResult<TResult>>(shardingDbContext, routeQueryResult);
await streamMergeContext.DbContextDisposeAsync(shardingDbContext);
return new ShardingMergeResult<RouteQueryResult<TResult>>(null, routeQueryResult);
}
protected abstract Task<TResult> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken());

View File

@ -48,8 +48,9 @@ namespace ShardingCore.Sharding.MergeEngines
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToFixedElementStreamList(1);
if (list.VirtualElementCount >= (skip.GetValueOrDefault() + 1))
var maxVirtualElementCount = skip.GetValueOrDefault() + 1;
var list = asyncEnumeratorStreamMergeEngine.ToFixedElementStreamList(1,maxVirtualElementCount);
if (list.VirtualElementCount >= maxVirtualElementCount)
return list.FirstOrDefault();
return default;
}
@ -60,8 +61,9 @@ namespace ShardingCore.Sharding.MergeEngines
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1, cancellationToken);
if (list.VirtualElementCount >= (skip.GetValueOrDefault() + 1))
var maxVirtualElementCount = skip.GetValueOrDefault() + 1;
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken);
if (list.VirtualElementCount >= maxVirtualElementCount)
return list.FirstOrDefault();
return default;
}

View File

@ -48,9 +48,10 @@ namespace ShardingCore.Sharding.MergeEngines
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var list = asyncEnumeratorStreamMergeEngine.ToFixedElementStreamList(1);
var maxVirtualElementCount = skip.GetValueOrDefault() + 1;
var list = asyncEnumeratorStreamMergeEngine.ToFixedElementStreamList(1,maxVirtualElementCount);
if (list.VirtualElementCount >= (skip.GetValueOrDefault() + 1))
if (list.VirtualElementCount >= maxVirtualElementCount)
return list.First();
throw new InvalidOperationException("Sequence contains no elements.");
}
@ -60,10 +61,10 @@ namespace ShardingCore.Sharding.MergeEngines
var skip = _streamMergeContext.Skip;
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var maxVirtualElementCount = skip.GetValueOrDefault() + 1;
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken);
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1, cancellationToken);
if (list.VirtualElementCount >= (skip.GetValueOrDefault() + 1))
if (list.VirtualElementCount >= maxVirtualElementCount)
return list.First();
throw new InvalidOperationException("Sequence contains no elements.");
}

View File

@ -27,6 +27,9 @@
<Compile Include="..\..\src\ShardingCore\**\*.cs" />
<Compile Remove="..\..\src\ShardingCore\obj\**" />
<Compile Remove="..\..\src\ShardingCore\bin\**" />
<Compile Update="..\..\src\ShardingCore\Sharding\Enumerators\StreamMergeAsync\ConstantElementStreamMergeAsyncEnumerator.cs">
<Link>Sharding\Enumerators\StreamMergeAsync\ConstantElementStreamMergeAsyncEnumerator.cs</Link>
</Compile>
</ItemGroup>