优化单元测试和代码

This commit is contained in:
xuejmnet 2021-01-30 21:59:03 +08:00
parent 2cdccdcce7
commit 071f0356d6
19 changed files with 323 additions and 683 deletions

View File

@ -55,7 +55,7 @@ namespace ShardingCore.Core
/// 异步获取列表
/// </summary>
/// <returns></returns>
Task<List<T>> ToListAsync();
Task<List<T>> ToListAsync(int capacity=20);
/// <summary>
@ -111,6 +111,21 @@ namespace ShardingCore.Core
/// </summary>
/// <returns></returns>
Task<float> FloatSumAsync();
/// <summary>
/// 平均数
/// </summary>
/// <returns></returns>
Task<decimal> DecimalAverageAsync();
/// <summary>
/// 平均数
/// </summary>
/// <returns></returns>
Task<double> DoubleAverageAsync();
/// <summary>
/// 平均数
/// </summary>
/// <returns></returns>
Task<float> FloatAverageAsync();
}
}

View File

@ -13,7 +13,7 @@ namespace ShardingCore.Core.Internal.PriorityQueues {
public PriorityQueue()
: this(defaultCapacity) {
}
public PriorityQueue(int initCapacity,bool ascending = false,IComparer<T> comparer=null) {
public PriorityQueue(int initCapacity,bool ascending = true,IComparer<T> comparer=null) {
buffer = new T[initCapacity];
heapLength = 0;
descending = ascending;

View File

@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
@ -16,23 +17,23 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
internal class MultiOrderStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IEnumerable<IStreamMergeAsyncEnumerator<T>> _sources;
private readonly IEnumerable<IStreamMergeAsyncEnumerator<T>> _enumerators;
private readonly PriorityQueue<IOrderStreamMergeAsyncEnumerator<T>> _queue;
private IStreamMergeAsyncEnumerator<T> _currentEnumerator;
private bool skipFirst;
public MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
public MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeAsyncEnumerator<T>> enumerators)
{
_mergeContext = mergeContext;
_sources = sources;
_queue = new PriorityQueue<IOrderStreamMergeAsyncEnumerator<T>>(sources.Count(),true);
_enumerators = enumerators;
_queue = new PriorityQueue<IOrderStreamMergeAsyncEnumerator<T>>(enumerators.Count());
skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
foreach (var source in _sources)
foreach (var source in _enumerators)
{
var orderStreamEnumerator = new OrderStreamMergeAsyncEnumerator<T>(_mergeContext, source);
if (orderStreamEnumerator.HasElement())
@ -42,10 +43,14 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
}
}
_currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek();
_currentEnumerator = _queue.IsEmpty() ? _enumerators.FirstOrDefault() : _queue.Peek();
}
#if EFCORE2
public async Task<bool> MoveNext(CancellationToken cancellationToken)
#endif
#if !EFCORE2
public async ValueTask<bool> MoveNextAsync()
#endif
{
if (_queue.IsEmpty())
return false;
@ -56,7 +61,12 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
}
var first = _queue.Poll();
#if EFCORE2
if (await first.MoveNext(cancellationToken))
#endif
#if !EFCORE2
if (await first.MoveNextAsync())
#endif
{
_queue.Offer(first);
}
@ -88,13 +98,26 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
public T ReallyCurrent => _queue.IsEmpty()?default(T):_queue.Peek().ReallyCurrent;
#if !EFCORE2
public async ValueTask DisposeAsync()
{
foreach (var source in _sources)
foreach (var enumerator in _enumerators)
{
await source.DisposeAsync();
await enumerator.DisposeAsync();
}
}
#endif
#if EFCORE2
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator.Dispose();
}
}
#endif
public T Current => skipFirst ? default : _currentEnumerator.Current;
}

View File

@ -1,72 +0,0 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 29 January 2021 20:55:42
* @Email: 326308290@qq.com
*/
internal class NoPaginationStreamMergeEngine<T>:IStreamMergeAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeAsyncEnumerator<T> _enumerator;
public NoPaginationStreamMergeEngine(StreamMergeContext<T> mergeContext,IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_enumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext,sources);;
}
public async ValueTask<bool> MoveNextAsync()
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
var skip = _mergeContext.Skip;
var take = _mergeContext.Take;
var realSkip = 0;
var realTake = 0;
var has = await _enumerator.MoveNextAsync();
if (has)
{
//获取真实的需要跳过的条数
if (skip.HasValue)
{
if (realSkip < skip)
{
realSkip++;
}
}
if (take.HasValue)
{
realTake++;
if (realTake <= take.Value)
return false;
}
}
return has;
}
public T Current => _enumerator.Current;
public bool SkipFirst()
{
return _enumerator.SkipFirst();
}
public bool HasElement()
{
return _enumerator.HasElement();
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
public async ValueTask DisposeAsync()
{
await _enumerator.DisposeAsync();
}
}
}

View File

@ -0,0 +1,103 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 29 January 2021 20:55:42
* @Email: 326308290@qq.com
*/
internal class NoPaginationStreamMergeEnumerator<T>:IStreamMergeAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeAsyncEnumerator<T> _enumerator;
private readonly int? _skip;
private readonly int? _take;
private int realSkip=0;
private int realTake = 0;
public NoPaginationStreamMergeEnumerator(StreamMergeContext<T> mergeContext,IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_skip = mergeContext.Skip;
_take = mergeContext.Take;
_enumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext,sources);;
}
#if !EFCORE2
public async ValueTask<bool> MoveNextAsync()
#endif
#if EFCORE2
public async Task<bool> MoveNext(CancellationToken cancellationToken)
#endif
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
while (_skip.GetValueOrDefault() > this.realSkip)
{
#if !EFCORE2
var has = await _enumerator.MoveNextAsync();
#endif
#if EFCORE2
var has = await _enumerator.MoveNext(cancellationToken);
#endif
realSkip++;
if (!has)
return false;
}
#if !EFCORE2
var next = await _enumerator.MoveNextAsync();
#endif
#if EFCORE2
var next = await _enumerator.MoveNext(cancellationToken);
#endif
if (next)
{
if (_take.HasValue)
{
realTake++;
if (realTake >= _take.Value)
return false;
}
}
return next;
}
public T Current => _enumerator.Current;
public bool SkipFirst()
{
return _enumerator.SkipFirst();
}
public bool HasElement()
{
return _enumerator.HasElement();
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
#if !EFCORE2
public async ValueTask DisposeAsync()
{
await _enumerator.DisposeAsync();
}
#endif
#if EFCORE2
public void Dispose()
{
_enumerator.Dispose();
}
#endif
}
}

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Extensions;
@ -20,13 +21,13 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
/// </summary>
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeAsyncEnumerator<T> _source;
private readonly IStreamMergeAsyncEnumerator<T> _enumerator;
private List<IComparable> _orderValues;
public OrderStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IStreamMergeAsyncEnumerator<T> source)
public OrderStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IStreamMergeAsyncEnumerator<T> enumerator)
{
_mergeContext = mergeContext;
_source = source;
_enumerator = enumerator;
SetOrderValues();
}
@ -34,26 +35,40 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
_orderValues = HasElement() ? GetCurrentOrderValues() : new List<IComparable>(0);
}
#if !EFCORE2
public async ValueTask<bool> MoveNextAsync()
{
var has = await _source.MoveNextAsync();
var has = await _enumerator.MoveNextAsync();
SetOrderValues();
return has;
}
#endif
public T Current =>_source.Current;
#if EFCORE2
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
var has = await _enumerator.MoveNext(cancellationToken);
SetOrderValues();
return has;
}
#endif
public T Current =>_enumerator.Current;
public bool SkipFirst()
{
return _source.SkipFirst();
return _enumerator.SkipFirst();
}
public bool HasElement()
{
return _source.HasElement();
return _enumerator.HasElement();
}
public T ReallyCurrent => _source.ReallyCurrent;
public T ReallyCurrent => _enumerator.ReallyCurrent;
private List<IComparable> GetCurrentOrderValues()
{
@ -62,7 +77,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
var list = new List<IComparable>(_mergeContext.Orders.Count());
foreach (var order in _mergeContext.Orders)
{
var value = _source.ReallyCurrent.GetValueByExpression(order.PropertyExpression);
var value = _enumerator.ReallyCurrent.GetValueByExpression(order.PropertyExpression);
if (value is IComparable comparable)
list.Add(comparable);
else
@ -89,10 +104,19 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
return _orderValues ?? new List<IComparable>(0);
}
#if !EFCORE2
public ValueTask DisposeAsync()
public async ValueTask DisposeAsync()
{
return _source.DisposeAsync();
}
await _enumerator.DisposeAsync();
}
#endif
#if EFCORE2
public void Dispose()
{
_enumerator.Dispose();
}
#endif
}
}

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
@ -24,6 +25,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
}
#if !EFCORE2
public async ValueTask<bool> MoveNextAsync()
{
if (skip)
@ -34,6 +36,25 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
return await _source.MoveNextAsync();
}
#endif
#if EFCORE2
public async Task<bool> MoveNext()
{
if (skip)
{
skip = false;
return null!=_source.Current;
}
return await _source.MoveNext();
}
#endif
public Task<bool> MoveNext(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public T Current => skip?default:_source.Current;
public bool SkipFirst()
@ -52,10 +73,19 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
}
public T ReallyCurrent => _source.Current;
#if !EFCORE2
public async ValueTask DisposeAsync()
{
await _source.DisposeAsync();
}
#endif
#if EFCORE2
public void Dispose()
{
_source.Dispose();
}
#endif
}
}

View File

@ -4,7 +4,6 @@ using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
@ -16,14 +15,18 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
* @Date: Thursday, 28 January 2021 17:04:29
* @Email: 326308290@qq.com
*/
internal class GenericMergeEngine<T>
internal class GenericInMemoryMergeEngine<T>
{
private readonly StreamMergeContext<T> _mergeContext;
public GenericMergeEngine(StreamMergeContext<T> mergeContext)
private GenericInMemoryMergeEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
}
public static GenericInMemoryMergeEngine<T> Create<T>(StreamMergeContext<T> mergeContext)
{
return new GenericInMemoryMergeEngine<T>(mergeContext);
}
private async Task<TResult> EFCoreExecute<TResult>(IQueryable<T> newQueryable,RouteResult routeResult,Func<IQueryable, Task<TResult>> efQuery)
{
@ -33,7 +36,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
return await efQuery(newQueryable);
}
}
public async Task<List<TResult>> Execute<TResult>(Func<IQueryable, Task<TResult>> efQuery)
public async Task<List<TResult>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery)
{
//从各个分表获取数据
List<DbContext> parallelDbContexts = new List<DbContext>(_mergeContext.RouteResults.Count());

View File

@ -6,10 +6,13 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Core.Internal.StreamMerge.Enumerators;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
/*
@ -33,6 +36,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
return new GenericStreamMergeEngine<T>(mergeContext);
}
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable, RouteResult routeResult)
{
using (var scope = _mergeContext.CreateScope())
@ -50,7 +54,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
}
}
public async Task<IEnumerable<IStreamMergeAsyncEnumerator<T>>> GetStreamEnumerator()
public async Task<IStreamMergeAsyncEnumerator<T>> GetStreamEnumerator()
{
var enumeratorTasks = _mergeContext.RouteResults.Select(routeResult =>
{
@ -64,7 +68,10 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
return new StreamMergeAsyncEnumerator<T>(asyncEnumerator);
});
}).ToArray();
return (await Task.WhenAll(enumeratorTasks)).ToList();
var streamEnumerators = await Task.WhenAll(enumeratorTasks);
if (_mergeContext.Skip.HasValue || _mergeContext.Take.HasValue)
return new NoPaginationStreamMergeEnumerator<T>(_mergeContext,streamEnumerators );
return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
}

View File

@ -31,13 +31,13 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
public async Task<List<T>> ToListAsync(int capacity=20)
{
var enumerator = new NoPaginationStreamMergeEngine<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
var enumerator = await _streamMergeEngine.GetStreamEnumerator();
var list = new List<T>(capacity);
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
while (await enumerator.MoveNext())
#endif
{
list.Add(enumerator.Current);
@ -45,60 +45,6 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
return list;
}
public async Task<T> FirstOrDefaultAsync()
{
var enumerator = new NoPaginationStreamMergeEngine<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
return enumerator.Current;
}
return default;
}
public async Task<bool> AnyAsync()
{
var enumerator = (IStreamMergeAsyncEnumerator<bool>)new NoPaginationStreamMergeEngine<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
if (!enumerator.Current)
return false;
}
return true;
}
public async Task<int> CountAsync()
{
var enumerator = (IStreamMergeAsyncEnumerator<int>)new NoPaginationStreamMergeEngine<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
var result = 0;
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
result += enumerator.Current;
}
return result;
}
public void Dispose()
{
_streamMergeEngine.Dispose();

View File

@ -1,69 +0,0 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Enumerators;
namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 29 January 2021 17:55:29
* @Email: 326308290@qq.com
*/
internal class ListStreamMergeProxyEngine<T>:IDisposable
{
private readonly StreamMergeContext<T> _mergeContext;
private IStreamMergeEngine<T> _streamMergeEngine;
private const int defaultCapacity = 0x10;//默认容量为16
public ListStreamMergeProxyEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
_streamMergeEngine = GenericStreamMergeEngine<T>.Create(mergeContext);
}
public async Task<List<T>> ToListAsync()
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
var skip = _mergeContext.Skip;
var take = _mergeContext.Take;
var list = new List<T>(skip.GetValueOrDefault() + take ?? defaultCapacity);
var realSkip = 0;
var realTake = 0;
var enumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
//获取真实的需要跳过的条数
if (skip.HasValue)
{
if (realSkip < skip)
{
realSkip++;
continue;
}
}
list.Add(enumerator.Current);
if (take.HasValue)
{
realTake++;
if(realTake<=take.Value)
break;
}
}
return list;
}
public void Dispose()
{
_streamMergeEngine.Dispose();
}
}
}

View File

@ -13,6 +13,6 @@ namespace ShardingCore.Core.Internal.StreamMerge
*/
internal interface IStreamMergeEngine<T>:IDisposable
{
Task<IEnumerable<IStreamMergeAsyncEnumerator<T>>> GetStreamEnumerator();
Task<IStreamMergeAsyncEnumerator<T>> GetStreamEnumerator();
}
}

View File

@ -1,148 +0,0 @@
// using System.Collections.Generic;
// using System.Linq;
// using System.Threading;
// using System.Threading.Tasks;
// using ShardingCore.Core.Internal.PriorityQueues;
//
// namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
// {
// /*
// * @Author: xjm
// * @Description:
// * @Date: Monday, 25 January 2021 08:06:12
// * @Email: 326308290@qq.com
// */
// #if !EFCORE2
// internal class OrderAsyncEnumerator<T> : IAsyncEnumerator<T>
// {
// private readonly StreamMergeContext<T> _mergeContext;
// private readonly List<IAsyncEnumerator<T>> _sources;
// private readonly PriorityQueue<OrderMergeItem<T>> _queue;
// private bool skipFirst;
// private IAsyncEnumerator<T> _currentEnumerator;
//
// public OrderAsyncEnumerator(StreamMergeContext<T> mergeContext,List<IAsyncEnumerator<T>> sources)
// {
// _mergeContext = mergeContext;
// _sources = sources;
// _queue = new PriorityQueue<OrderMergeItem<T>>(sources.Count);
// skipFirst = true;
// SetOrderEnumerator();
// }
//
// private void SetOrderEnumerator()
// {
// foreach (var source in _sources)
// {
// var orderMergeItem = new OrderMergeItem<T>(_mergeContext, source);
// if (null!=orderMergeItem.GetCurrentEnumerator().Current)
// _queue.Offer(orderMergeItem);
// }
// _currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek().GetCurrentEnumerator();
// }
//
// public async ValueTask<bool> MoveNextAsync()
// {
// if (_queue.IsEmpty())
// return false;
// if (skipFirst)
// {
// skipFirst = false;
// return true;
// }
//
// var first = _queue.Poll();
// if (await first.MoveNextAsync())
// {
// _queue.Offer(first);
// }
//
// if (_queue.IsEmpty())
// {
// return false;
// }
//
// _currentEnumerator = _queue.Peek().GetCurrentEnumerator();
// return true;
// }
//
// public async ValueTask DisposeAsync()
// {
// foreach (var source in _sources)
// {
// await source.DisposeAsync();
// }
// }
//
// public T Current => _currentEnumerator.Current;
// }
// #endif
// #if EFCORE2
//
// internal class OrderAsyncEnumerator<T> : IAsyncEnumerator<T>
// {
// private readonly StreamMergeContext<T> _mergeContext;
// private readonly List<IAsyncEnumerator<T>> _sources;
// private readonly PriorityQueue<OrderMergeItem<T>> _queue;
// private bool skipFirst;
// private IAsyncEnumerator<T> _currentEnumerator;
//
// public OrderAsyncEnumerator(StreamMergeContext<T> mergeContext, List<IAsyncEnumerator<T>> sources)
// {
// _mergeContext = mergeContext;
// _sources = sources;
// _queue = new PriorityQueue<OrderMergeItem<T>>(sources.Count);
// skipFirst = true;
// SetOrderEnumerator();
// }
//
// private void SetOrderEnumerator()
// {
// foreach (var source in _sources)
// {
// var orderMergeItem = new OrderMergeItem<T>(_mergeContext, source);
// if (null!=orderMergeItem.GetCurrentEnumerator().Current)
// _queue.Offer(orderMergeItem);
// }
//
// _currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek().GetCurrentEnumerator();
// }
//
// public async Task<bool> MoveNext(CancellationToken cancellationToken)
// {
// if (_queue.IsEmpty())
// return false;
// if (skipFirst)
// {
// skipFirst = false;
// return true;
// }
//
// var first = _queue.Poll();
// if (await first.MoveNextAsync())
// {
// _queue.Offer(first);
// }
//
// if (_queue.IsEmpty())
// {
// return false;
// }
//
// _currentEnumerator = _queue.Peek().GetCurrentEnumerator();
// return true;
// }
//
// public void Dispose()
// {
// foreach (var source in _sources)
// {
// source.Dispose();
// }
// }
//
//
// public T Current => _currentEnumerator.Current;
// }
// #endif
// }

View File

@ -1,158 +0,0 @@
// using System;
// using System.Collections.Generic;
// using System.Linq;
// using System.Threading.Tasks;
// using ShardingCore.Extensions;
//
// namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
// {
// /*
// * @Author: xjm
// * @Description:
// * @Date: Monday, 25 January 2021 11:33:57
// * @Email: 326308290@qq.com
// */
// #if !EFCORE2
//
// internal class OrderMergeItem<T> : IComparable<OrderMergeItem<T>>, IAsyncDisposable
// {
// /// <summary>
// /// 合并数据上下文
// /// </summary>
// private readonly StreamMergeContext<T> _mergeContext;
//
// private readonly IAsyncEnumerator<T> _source;
// private List<IComparable> _orderValues;
//
// public OrderMergeItem(StreamMergeContext<T> mergeContext, IAsyncEnumerator<T> source)
// {
// _mergeContext = mergeContext;
// _source = source;
// SetOrderValues(null!=GetCurrentEnumerator().Current);
// }
//
// public IAsyncEnumerator<T> GetCurrentEnumerator() => _source;
//
// private void SetOrderValues(bool hasElement)
// {
// _orderValues = hasElement ? GetCurrentOrderValues() : new List<IComparable>(0);
// }
// public async Task<bool> MoveNextAsync()
// {
// var has = await _source.MoveNextAsync();
// SetOrderValues(has);
// return has;
// }
//
// public bool HasElement()
// {
//
// }
//
// private List<IComparable> GetCurrentOrderValues()
// {
// if (!_mergeContext.Orders.Any())
// return new List<IComparable>(0);
// var list = new List<IComparable>(_mergeContext.Orders.Count());
// foreach (var order in _mergeContext.Orders)
// {
// var value = GetCurrentEnumerator().Current.GetValueByExpression(order.PropertyExpression);
// if (value is IComparable comparable)
// list.Add(comparable);
// else
// throw new NotSupportedException($"order by value [{order}] must implements IComparable");
// }
//
// return list;
// }
//
// public int CompareTo(OrderMergeItem<T> other)
// {
// int i = 0;
// foreach (var order in _mergeContext.Orders) {
// int result = CompareHelper.CompareToWith(_orderValues[i], other._orderValues[i], order.IsAsc);
// if (0 != result) {
// return result;
// }
// i++;
// }
// return 0;
// }
//
// public ValueTask DisposeAsync()
// {
// return _source.DisposeAsync();
// }
// }
// #endif
// #if EFCORE2
//
// internal class OrderMergeItem<T> : IComparable<OrderMergeItem<T>>, IDisposable
// {
// /// <summary>
// /// 合并数据上下文
// /// </summary>
// private readonly StreamMergeContext<T> _mergeContext;
//
// private readonly IAsyncEnumerator<T> _source;
// private List<IComparable> _orderValues;
//
// public OrderMergeItem(StreamMergeContext<T> mergeContext, IAsyncEnumerator<T> source)
// {
// _mergeContext = mergeContext;
// _source = source;
// SetOrderValues(null!=GetCurrentEnumerator().Current);
// }
//
// public IAsyncEnumerator<T> GetCurrentEnumerator() => _source;
//
// private void SetOrderValues(bool hasElement)
// {
// _orderValues = hasElement ? GetCurrentOrderValues() : new List<IComparable>(0);
// }
//
// public async Task<bool> MoveNextAsync()
// {
// var has = await _source.MoveNext();
// SetOrderValues(has);
// return has;
// }
//
// private List<IComparable> GetCurrentOrderValues()
// {
// if (!_mergeContext.Orders.Any())
// return new List<IComparable>(0);
// var list = new List<IComparable>(_mergeContext.Orders.Count());
// foreach (var order in _mergeContext.Orders)
// {
// var value = GetCurrentEnumerator().Current.GetValueByExpression(order.PropertyExpression);
// if (value is IComparable comparable)
// list.Add(comparable);
// else
// throw new NotSupportedException($"order by value [{order}] must implements IComparable");
// }
//
// return list;
// }
//
// public int CompareTo(OrderMergeItem<T> other)
// {
// int i = 0;
// foreach (var order in _mergeContext.Orders) {
// int result = CompareHelper.CompareToWith(_orderValues[i], other._orderValues[i], order.IsAsc);
// if (0 != result) {
// return result;
// }
// i++;
// }
// return 0;
// }
//
//
// public void Dispose()
// {
// _source.Dispose();
// }
// }
// #endif
// }

View File

@ -1,66 +0,0 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Core.Internal.StreamMerge.Enumerators;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
{
/*
* @Author: xjm
* @Description:
* @Date: Monday, 25 January 2021 07:57:59
* @Email: 326308290@qq.com
*/
internal class StreamMergeListEngine<T>
{
private const int defaultCapacity = 0x10;//默认容量为16
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeAsyncEnumerator<T> _streamMergeAsyncEnumerator;
public StreamMergeListEngine(StreamMergeContext<T> mergeContext,IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_streamMergeAsyncEnumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext,sources);
}
public async Task<List<T>> Execute()
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
var skip = _mergeContext.Skip;
var take = _mergeContext.Take;
var list = new List<T>(skip.GetValueOrDefault() + take ?? defaultCapacity);
var realSkip = 0;
var realTake = 0;
#if !EFCORE2
while (await _streamMergeAsyncEnumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
//获取真实的需要跳过的条数
if (skip.HasValue)
{
if (realSkip < skip)
{
realSkip++;
continue;
}
}
list.Add(_streamMergeAsyncEnumerator.Current);
if (take.HasValue)
{
realTake++;
if(realTake<=take.Value)
break;
}
}
return list;
}
}
}

View File

@ -1,42 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.GenericMerges;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Core.Internal.StreamMerge.ListSourceMerges
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 14:45:19
* @Email: 326308290@qq.com
*/
internal class StreamMergeListSourceEngine<T>
{
private readonly StreamMergeContext<T> _mergeContext;
public StreamMergeListSourceEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
}
public async Task<List<T>> Execute()
{
using (var engine =new GenericStreamMergeEngine<T>(_mergeContext))
{
var enumerators = await engine.GetStreamEnumerator();
return await new StreamMergeListEngine<T>(_mergeContext, enumerators).Execute();
}
}
}
}

View File

@ -8,12 +8,7 @@ using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge;
using ShardingCore.Core.Internal.StreamMerge.GenericMerges;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.Internal.StreamMerge.ListSourceMerges;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualTables;
using ShardingCore.DbContexts;
using ShardingCore.Extensions;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
@ -31,6 +26,7 @@ namespace ShardingCore.Core
/// 分表查询构造器
/// </summary>
/// <typeparam name="T"></typeparam>
public class ShardingQueryable<T> : IShardingQueryable<T>
{
private IQueryable<T> _source;
@ -41,13 +37,18 @@ namespace ShardingCore.Core
private readonly IRoutingRuleEngineFactory _routingRuleEngineFactory;
public ShardingQueryable(IQueryable<T> source)
private ShardingQueryable(IQueryable<T> source)
{
_source = source;
_streamMergeContextFactory = ShardingContainer.Services.GetService<IStreamMergeContextFactory>();
_routingRuleEngineFactory=ShardingContainer.Services.GetService<IRoutingRuleEngineFactory>();
}
public static ShardingQueryable<TSource> Create<TSource>(IQueryable<TSource> source)
{
return new ShardingQueryable<TSource>(source);
}
public IShardingQueryable<T> EnableAutoRouteParse()
{
@ -104,7 +105,7 @@ namespace ShardingCore.Core
}
private async Task<List<TResult>> GetGenericMergeEngine<TResult>(Func<IQueryable, Task<TResult>> efQuery)
{
return await new GenericMergeEngine<T>(GetContext()).Execute(efQuery);
return await GenericInMemoryMergeEngine<T>.Create(GetContext()).ExecuteAsync(efQuery);
}
public async Task<int> CountAsync()
@ -120,12 +121,12 @@ namespace ShardingCore.Core
}
public async Task<List<T>> ToListAsync()
public async Task<List<T>> ToListAsync(int capacity=20)
{
var context = GetContext();
using (var engine = GenericStreamMergeProxyEngine<T>.Create(context))
{
return await engine.ToListAsync();
return await engine.ToListAsync(capacity);
}
}
@ -133,10 +134,13 @@ namespace ShardingCore.Core
public async Task<T> FirstOrDefaultAsync()
{
var context = GetContext();
using (var engine = GenericStreamMergeProxyEngine<T>.Create(context))
{
return await engine.FirstOrDefaultAsync();
}
var result= await GenericInMemoryMergeEngine<T>.Create(context).ExecuteAsync(async queryable => await EntityFrameworkQueryableExtensions.FirstOrDefaultAsync((IQueryable<T>) queryable));
var q = result.Where(o => o != null).AsQueryable();
if (context.Orders.Any())
return q.OrderWithExpression(context.Orders).FirstOrDefault();
return q.FirstOrDefault();
}
public async Task<bool> AnyAsync()
@ -145,6 +149,7 @@ namespace ShardingCore.Core
.Any(o => o);
}
public async Task<T> MaxAsync()
{
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.MaxAsync((IQueryable<T>) queryable));
@ -197,6 +202,28 @@ namespace ShardingCore.Core
return results.Sum();
}
public async Task<decimal> DecimalAverageAsync()
{
if (typeof(T) != typeof(decimal))
throw new InvalidOperationException($"{typeof(T)} cast to decimal failed");
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.AverageAsync((IQueryable<decimal>) queryable));
return results.Sum()/results.Count();
}
public async Task<double> DoubleAverageAsync()
{
if (typeof(T) != typeof(double))
throw new InvalidOperationException($"{typeof(T)} cast to double failed");
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.AverageAsync((IQueryable<double>) queryable));
return results.Sum()/results.Count();
}
public async Task<float> FloatAverageAsync()
{
if (typeof(T) != typeof(float))
throw new InvalidOperationException($"{typeof(T)} cast to float failed");
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.AverageAsync((IQueryable<float>) queryable));
return results.Sum()/results.Count();
}
}
}

View File

@ -23,11 +23,11 @@ namespace ShardingCore.Extensions
/// <returns></returns>
public static IShardingQueryable<T> AsSharding<T>(this IQueryable<T> source)
{
return new ShardingQueryable<T>(source);
return ShardingQueryable<T>.Create(source);
}
public static async Task<bool> ShardingAnyAsync<T>(this IQueryable<T> source, Expression<Func<T, bool>> predicate)
{
return await new ShardingQueryable<T>(source.Where(predicate)).AnyAsync();
return await ShardingQueryable<T>.Create(source.Where(predicate)).AnyAsync();
}
/// <summary>
/// 分页
@ -37,7 +37,7 @@ namespace ShardingCore.Extensions
/// <param name="pageSize"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async Task<ShardingPagedResult<T>> ToShardingPageResultAsync<T>(this IQueryable<T> source, int pageIndex, int pageSize)
public static async Task<ShardingPagedResult<T>> ShardingPageResultAsync<T>(this IQueryable<T> source, int pageIndex, int pageSize)
{
//设置每次获取多少页
var take = pageSize <= 0 ? 1 : pageSize;
@ -46,7 +46,7 @@ namespace ShardingCore.Extensions
//需要跳过多少页
var skip = (index - 1) * take;
//获取每次总记录数
var count = await new ShardingQueryable<T>(source).CountAsync();
var count = await ShardingQueryable<T>.Create(source).CountAsync();
//当数据库数量小于要跳过的条数就说明没数据直接返回不在查询list
if (count <= skip)
@ -55,7 +55,7 @@ namespace ShardingCore.Extensions
int remainingCount = count - skip;
//当剩余条数小于take数就取remainingCount
var realTake = remainingCount < take ? remainingCount : take;
var data = await new ShardingQueryable<T>(source.Skip(skip).Take(realTake)).ToListAsync();
var data = await ShardingQueryable<T>.Create(source.Skip(skip).Take(realTake)).ToListAsync(realTake);
return new ShardingPagedResult<T>(data, count);
}
/// <summary>
@ -66,7 +66,7 @@ namespace ShardingCore.Extensions
/// <returns></returns>
public static List<T> ToShardingList<T>(this IQueryable<T> source)
{
return new ShardingQueryable<T>(source).ToList();
return ShardingQueryable<T>.Create(source).ToList();
}
/// <summary>
/// 分页
@ -76,7 +76,7 @@ namespace ShardingCore.Extensions
/// <returns></returns>
public static async Task<List<T>> ToShardingListAsync<T>(this IQueryable<T> source)
{
return await new ShardingQueryable<T>(source).ToListAsync();
return await ShardingQueryable<T>.Create(source).ToListAsync();
}
/// <summary>
/// 第一条
@ -86,7 +86,7 @@ namespace ShardingCore.Extensions
/// <returns></returns>
public static T ShardingFirstOrDefault<T>(this IQueryable<T> source)
{
return new ShardingQueryable<T>(source).FirstOrDefault();
return ShardingQueryable<T>.Create(source).FirstOrDefault();
}
/// <summary>
/// 第一条
@ -96,7 +96,7 @@ namespace ShardingCore.Extensions
/// <returns></returns>
public static async Task<T> ShardingFirstOrDefaultAsync<T>(this IQueryable<T> source)
{
return await new ShardingQueryable<T>(source).FirstOrDefaultAsync();
return await ShardingQueryable<T>.Create(source).FirstOrDefaultAsync();
}
/// <summary>
/// 最大
@ -106,7 +106,7 @@ namespace ShardingCore.Extensions
/// <returns></returns>
public static T ShardingMax<T>(this IQueryable<T> source)
{
return new ShardingQueryable<T>(source).Max();
return ShardingQueryable<T>.Create(source).Max();
}
/// <summary>
/// 最大
@ -116,7 +116,7 @@ namespace ShardingCore.Extensions
/// <returns></returns>
public static async Task<T> ShardingMaxAsync<T>(this IQueryable<T> source)
{
return await new ShardingQueryable<T>(source).MaxAsync();
return await ShardingQueryable<T>.Create(source).MaxAsync();
}
/// <summary>
/// 最大
@ -144,11 +144,11 @@ namespace ShardingCore.Extensions
}
public static T ShardingMin<T>(this IQueryable<T> source)
{
return new ShardingQueryable<T>(source).Min();
return ShardingQueryable<T>.Create(source).Min();
}
public static async Task<T> ShardingMinAsync<T>(this IQueryable<T> source)
{
return await new ShardingQueryable<T>(source).MinAsync();
return await ShardingQueryable<T>.Create(source).MinAsync();
}
/// <summary>
/// 最小
@ -176,27 +176,27 @@ namespace ShardingCore.Extensions
}
public static int ShardingCount<T>(this IQueryable<T> source)
{
return new ShardingQueryable<T>(source).Count();
return ShardingQueryable<T>.Create(source).Count();
}
public static async Task<int> ShardingCountAsync<T>(this IQueryable<T> source)
{
return await new ShardingQueryable<T>(source).CountAsync();
return await ShardingQueryable<T>.Create(source).CountAsync();
}
public static long ShardingLongCount<T>(this IQueryable<T> source)
{
return new ShardingQueryable<T>(source).LongCount();
return ShardingQueryable<T>.Create(source).LongCount();
}
public static async Task<long> ShardingLongCountAsync<T>(this IQueryable<T> source)
{
return await new ShardingQueryable<T>(source).LongCountAsync();
return await ShardingQueryable<T>.Create(source).LongCountAsync();
}
public static int ShardingSum(this IQueryable<int> source)
{
return new ShardingQueryable<int>(source).Sum();
return ShardingQueryable<int>.Create(source).Sum();
}
public static async Task<int> ShardingSumAsync(this IQueryable<int> source)
{
return await new ShardingQueryable<int>(source).SumAsync();
return await ShardingQueryable<int>.Create(source).SumAsync();
}
public static int ShardingSum<T>(this IQueryable<T> source,Expression<Func<T,int>> keySelector)
{
@ -208,11 +208,11 @@ namespace ShardingCore.Extensions
}
public static long ShardingSum(this IQueryable<long> source)
{
return new ShardingQueryable<long>(source).LongSum();
return ShardingQueryable<long>.Create(source).LongSum();
}
public static async Task<long> ShardingSumAsync(this IQueryable<long> source)
{
return await new ShardingQueryable<long>(source).LongSumAsync();
return await ShardingQueryable<long>.Create(source).LongSumAsync();
}
public static long ShardingSum<T>(this IQueryable<T> source,Expression<Func<T,long>> keySelector)
{
@ -224,11 +224,11 @@ namespace ShardingCore.Extensions
}
public static double ShardingSum(this IQueryable<double> source)
{
return new ShardingQueryable<double>(source).DoubleSum();
return ShardingQueryable<double>.Create(source).DoubleSum();
}
public static async Task<double> ShardingSumAsync(this IQueryable<double> source)
{
return await new ShardingQueryable<double>(source).DoubleSumAsync();
return await ShardingQueryable<double>.Create(source).DoubleSumAsync();
}
public static double ShardingSum<T>(this IQueryable<T> source,Expression<Func<T,double>> keySelector)
{
@ -240,11 +240,11 @@ namespace ShardingCore.Extensions
}
public static decimal ShardingSum(this IQueryable<decimal> source)
{
return new ShardingQueryable<decimal>(source).DecimalSum();
return ShardingQueryable<decimal>.Create(source).DecimalSum();
}
public static async Task<decimal> ShardingSumAsync(this IQueryable<decimal> source)
{
return await new ShardingQueryable<decimal>(source).DecimalSumAsync();
return await ShardingQueryable<decimal>.Create(source).DecimalSumAsync();
}
public static decimal ShardingSum<T>(this IQueryable<T> source,Expression<Func<T,decimal>> keySelector)
{
@ -256,11 +256,11 @@ namespace ShardingCore.Extensions
}
public static float ShardingSum(this IQueryable<float> source)
{
return new ShardingQueryable<float>(source).FloatSum();
return ShardingQueryable<float>.Create(source).FloatSum();
}
public static async Task<float> ShardingSumAsync(this IQueryable<float> source)
{
return await new ShardingQueryable<float>(source).FloatSumAsync();
return await ShardingQueryable<float>.Create(source).FloatSumAsync();
}
public static float ShardingSum<T>(this IQueryable<T> source,Expression<Func<T,float>> keySelector)
{

View File

@ -63,6 +63,8 @@ namespace ShardingCore.Test50.MySql
Assert.Contains(sysUserMods, o =>o.Id==id);
Assert.Contains(sysUserRanges, o =>o.Id==id);
}
Assert.DoesNotContain(sysUserMods,o=>o.Age>4);
Assert.DoesNotContain(sysUserRanges,o=>o.Age>4);
}
[Fact]
public async Task ToList_Id_Eq_Test()
@ -85,6 +87,21 @@ namespace ShardingCore.Test50.MySql
Assert.DoesNotContain(ranges,o=>o.Id=="3");
}
[Fact]
public async Task ToList_Id_Not_Eq_Skip_Test()
{
var mods=await _virtualDbContext.Set<SysUserMod>().Where(o=>o.Id!="3").OrderBy(o=>o.Age).Skip(2).ToShardingListAsync();
Assert.Equal(97,mods.Count);
Assert.DoesNotContain(mods,o=>o.Id=="3");
Assert.Equal(4,mods[0].Age);
Assert.Equal(5,mods[1].Age);
var modsDesc=await _virtualDbContext.Set<SysUserMod>().Where(o=>o.Id!="3").OrderByDescending(o=>o.Age).Skip(13).ToShardingListAsync();
Assert.Equal(86,modsDesc.Count);
Assert.DoesNotContain(mods,o=>o.Id=="3");
Assert.Equal(87,modsDesc[0].Age);
Assert.Equal(86,modsDesc[1].Age);
}
[Fact]
public async Task ToList_Name_Eq_Test()
{
var mods=await _virtualDbContext.Set<SysUserMod>().Where(o=>o.Name=="name_3").ToShardingListAsync();