diff --git a/src/ShardingCore/Core/IShardingQueryable.cs b/src/ShardingCore/Core/IShardingQueryable.cs
index 83df1a2e..649e6800 100644
--- a/src/ShardingCore/Core/IShardingQueryable.cs
+++ b/src/ShardingCore/Core/IShardingQueryable.cs
@@ -55,7 +55,7 @@ namespace ShardingCore.Core
/// 异步获取列表
///
///
- Task> ToListAsync();
+ Task> ToListAsync(int capacity=20);
///
@@ -111,6 +111,21 @@ namespace ShardingCore.Core
///
///
Task FloatSumAsync();
+ ///
+ /// 平均数
+ ///
+ ///
+ Task DecimalAverageAsync();
+ ///
+ /// 平均数
+ ///
+ ///
+ Task DoubleAverageAsync();
+ ///
+ /// 平均数
+ ///
+ ///
+ Task FloatAverageAsync();
}
}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/PriorityQueues/PriorityQueue.cs b/src/ShardingCore/Core/Internal/PriorityQueues/PriorityQueue.cs
index 36908f81..88590611 100644
--- a/src/ShardingCore/Core/Internal/PriorityQueues/PriorityQueue.cs
+++ b/src/ShardingCore/Core/Internal/PriorityQueues/PriorityQueue.cs
@@ -13,7 +13,7 @@ namespace ShardingCore.Core.Internal.PriorityQueues {
public PriorityQueue()
: this(defaultCapacity) {
}
- public PriorityQueue(int initCapacity,bool ascending = false,IComparer comparer=null) {
+ public PriorityQueue(int initCapacity,bool ascending = true,IComparer comparer=null) {
buffer = new T[initCapacity];
heapLength = 0;
descending = ascending;
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/MultiOrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/MultiOrderStreamMergeAsyncEnumerator.cs
index 99a4ab6c..06e47ac8 100644
--- a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/MultiOrderStreamMergeAsyncEnumerator.cs
+++ b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/MultiOrderStreamMergeAsyncEnumerator.cs
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
@@ -16,23 +17,23 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
internal class MultiOrderStreamMergeAsyncEnumerator : IStreamMergeAsyncEnumerator
{
private readonly StreamMergeContext _mergeContext;
- private readonly IEnumerable> _sources;
+ private readonly IEnumerable> _enumerators;
private readonly PriorityQueue> _queue;
private IStreamMergeAsyncEnumerator _currentEnumerator;
private bool skipFirst;
- public MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext mergeContext, IEnumerable> sources)
+ public MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext mergeContext, IEnumerable> enumerators)
{
_mergeContext = mergeContext;
- _sources = sources;
- _queue = new PriorityQueue>(sources.Count(),true);
+ _enumerators = enumerators;
+ _queue = new PriorityQueue>(enumerators.Count());
skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
- foreach (var source in _sources)
+ foreach (var source in _enumerators)
{
var orderStreamEnumerator = new OrderStreamMergeAsyncEnumerator(_mergeContext, source);
if (orderStreamEnumerator.HasElement())
@@ -42,10 +43,14 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
}
}
- _currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek();
+ _currentEnumerator = _queue.IsEmpty() ? _enumerators.FirstOrDefault() : _queue.Peek();
}
-
+#if EFCORE2
+ public async Task MoveNext(CancellationToken cancellationToken)
+#endif
+#if !EFCORE2
public async ValueTask MoveNextAsync()
+#endif
{
if (_queue.IsEmpty())
return false;
@@ -56,7 +61,12 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
}
var first = _queue.Poll();
+#if EFCORE2
+ if (await first.MoveNext(cancellationToken))
+#endif
+#if !EFCORE2
if (await first.MoveNextAsync())
+#endif
{
_queue.Offer(first);
}
@@ -88,13 +98,26 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
public T ReallyCurrent => _queue.IsEmpty()?default(T):_queue.Peek().ReallyCurrent;
+#if !EFCORE2
+
public async ValueTask DisposeAsync()
{
- foreach (var source in _sources)
+ foreach (var enumerator in _enumerators)
{
- await source.DisposeAsync();
+ await enumerator.DisposeAsync();
}
}
+#endif
+
+#if EFCORE2
+ public void Dispose()
+ {
+ foreach (var enumerator in _enumerators)
+ {
+ enumerator.Dispose();
+ }
+ }
+#endif
public T Current => skipFirst ? default : _currentEnumerator.Current;
}
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/NoPaginationStreamMergeEngine.cs b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/NoPaginationStreamMergeEngine.cs
deleted file mode 100644
index ef4bcc30..00000000
--- a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/NoPaginationStreamMergeEngine.cs
+++ /dev/null
@@ -1,72 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using ShardingCore.Core.Internal.StreamMerge.Abstractions;
-
-namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
-{
-/*
-* @Author: xjm
-* @Description:
-* @Date: Friday, 29 January 2021 20:55:42
-* @Email: 326308290@qq.com
-*/
- internal class NoPaginationStreamMergeEngine:IStreamMergeAsyncEnumerator
- {
- private readonly StreamMergeContext _mergeContext;
- private readonly IStreamMergeAsyncEnumerator _enumerator;
-
- public NoPaginationStreamMergeEngine(StreamMergeContext mergeContext,IEnumerable> sources)
- {
- _mergeContext = mergeContext;
- _enumerator = new MultiOrderStreamMergeAsyncEnumerator(_mergeContext,sources);;
- }
-
- public async ValueTask MoveNextAsync()
- {
- //如果合并数据的时候不需要跳过也没有take多少那么就是直接next
- var skip = _mergeContext.Skip;
- var take = _mergeContext.Take;
- var realSkip = 0;
- var realTake = 0;
- var has = await _enumerator.MoveNextAsync();
- if (has)
- {
- //获取真实的需要跳过的条数
- if (skip.HasValue)
- {
- if (realSkip < skip)
- {
- realSkip++;
- }
- }
- if (take.HasValue)
- {
- realTake++;
- if (realTake <= take.Value)
- return false;
- }
- }
-
- return has;
- }
-
- public T Current => _enumerator.Current;
- public bool SkipFirst()
- {
- return _enumerator.SkipFirst();
- }
-
- public bool HasElement()
- {
- return _enumerator.HasElement();
- }
-
- public T ReallyCurrent => _enumerator.ReallyCurrent;
-
- public async ValueTask DisposeAsync()
- {
- await _enumerator.DisposeAsync();
- }
- }
-}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/NoPaginationStreamMergeEnumerator.cs b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/NoPaginationStreamMergeEnumerator.cs
new file mode 100644
index 00000000..9616618f
--- /dev/null
+++ b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/NoPaginationStreamMergeEnumerator.cs
@@ -0,0 +1,103 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using ShardingCore.Core.Internal.StreamMerge.Abstractions;
+
+namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
+{
+/*
+* @Author: xjm
+* @Description:
+* @Date: Friday, 29 January 2021 20:55:42
+* @Email: 326308290@qq.com
+*/
+ internal class NoPaginationStreamMergeEnumerator:IStreamMergeAsyncEnumerator
+ {
+ private readonly StreamMergeContext _mergeContext;
+ private readonly IStreamMergeAsyncEnumerator _enumerator;
+ private readonly int? _skip;
+ private readonly int? _take;
+ private int realSkip=0;
+ private int realTake = 0;
+
+ public NoPaginationStreamMergeEnumerator(StreamMergeContext mergeContext,IEnumerable> sources)
+ {
+ _mergeContext = mergeContext;
+ _skip = mergeContext.Skip;
+ _take = mergeContext.Take;
+ _enumerator = new MultiOrderStreamMergeAsyncEnumerator(_mergeContext,sources);;
+ }
+#if !EFCORE2
+
+ public async ValueTask MoveNextAsync()
+#endif
+#if EFCORE2
+
+ public async Task MoveNext(CancellationToken cancellationToken)
+#endif
+ {
+ //如果合并数据的时候不需要跳过也没有take多少那么就是直接next
+ while (_skip.GetValueOrDefault() > this.realSkip)
+ {
+#if !EFCORE2
+ var has = await _enumerator.MoveNextAsync();
+
+#endif
+#if EFCORE2
+ var has = await _enumerator.MoveNext(cancellationToken);
+#endif
+ realSkip++;
+ if (!has)
+ return false;
+ }
+
+#if !EFCORE2
+ var next = await _enumerator.MoveNextAsync();
+
+#endif
+#if EFCORE2
+ var next = await _enumerator.MoveNext(cancellationToken);
+#endif
+ if (next)
+ {
+ if (_take.HasValue)
+ {
+ realTake++;
+ if (realTake >= _take.Value)
+ return false;
+ }
+ }
+
+ return next;
+ }
+
+ public T Current => _enumerator.Current;
+ public bool SkipFirst()
+ {
+ return _enumerator.SkipFirst();
+ }
+
+ public bool HasElement()
+ {
+ return _enumerator.HasElement();
+ }
+
+ public T ReallyCurrent => _enumerator.ReallyCurrent;
+
+#if !EFCORE2
+
+ public async ValueTask DisposeAsync()
+ {
+ await _enumerator.DisposeAsync();
+ }
+#endif
+
+#if EFCORE2
+ public void Dispose()
+ {
+ _enumerator.Dispose();
+ }
+#endif
+ }
+}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/OrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/OrderStreamMergeAsyncEnumerator.cs
index 6a474627..7b9a8b21 100644
--- a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/OrderStreamMergeAsyncEnumerator.cs
+++ b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/OrderStreamMergeAsyncEnumerator.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Extensions;
@@ -20,13 +21,13 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
///
private readonly StreamMergeContext _mergeContext;
- private readonly IStreamMergeAsyncEnumerator _source;
+ private readonly IStreamMergeAsyncEnumerator _enumerator;
private List _orderValues;
- public OrderStreamMergeAsyncEnumerator(StreamMergeContext mergeContext, IStreamMergeAsyncEnumerator source)
+ public OrderStreamMergeAsyncEnumerator(StreamMergeContext mergeContext, IStreamMergeAsyncEnumerator enumerator)
{
_mergeContext = mergeContext;
- _source = source;
+ _enumerator = enumerator;
SetOrderValues();
}
@@ -34,26 +35,40 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
_orderValues = HasElement() ? GetCurrentOrderValues() : new List(0);
}
+
+#if !EFCORE2
+
public async ValueTask MoveNextAsync()
{
- var has = await _source.MoveNextAsync();
+ var has = await _enumerator.MoveNextAsync();
SetOrderValues();
return has;
}
+#endif
+
+#if EFCORE2
- public T Current =>_source.Current;
+ public async Task MoveNext(CancellationToken cancellationToken)
+ {
+ var has = await _enumerator.MoveNext(cancellationToken);
+ SetOrderValues();
+ return has;
+ }
+#endif
+
+ public T Current =>_enumerator.Current;
public bool SkipFirst()
{
- return _source.SkipFirst();
+ return _enumerator.SkipFirst();
}
public bool HasElement()
{
- return _source.HasElement();
+ return _enumerator.HasElement();
}
- public T ReallyCurrent => _source.ReallyCurrent;
+ public T ReallyCurrent => _enumerator.ReallyCurrent;
private List GetCurrentOrderValues()
{
@@ -62,7 +77,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
var list = new List(_mergeContext.Orders.Count());
foreach (var order in _mergeContext.Orders)
{
- var value = _source.ReallyCurrent.GetValueByExpression(order.PropertyExpression);
+ var value = _enumerator.ReallyCurrent.GetValueByExpression(order.PropertyExpression);
if (value is IComparable comparable)
list.Add(comparable);
else
@@ -89,10 +104,19 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
return _orderValues ?? new List(0);
}
+#if !EFCORE2
- public ValueTask DisposeAsync()
+ public async ValueTask DisposeAsync()
{
- return _source.DisposeAsync();
+ await _enumerator.DisposeAsync();
}
+#endif
+
+#if EFCORE2
+ public void Dispose()
+ {
+ _enumerator.Dispose();
+ }
+#endif
}
}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/StreamMergeAsyncEnumerator.cs b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/StreamMergeAsyncEnumerator.cs
index f926ae13..ea9faa98 100644
--- a/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/StreamMergeAsyncEnumerator.cs
+++ b/src/ShardingCore/Core/Internal/StreamMerge/Enumerators/StreamMergeAsyncEnumerator.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
@@ -24,6 +25,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
}
+#if !EFCORE2
public async ValueTask MoveNextAsync()
{
if (skip)
@@ -34,6 +36,25 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
return await _source.MoveNextAsync();
}
+#endif
+#if EFCORE2
+ public async Task MoveNext()
+ {
+ if (skip)
+ {
+ skip = false;
+ return null!=_source.Current;
+ }
+ return await _source.MoveNext();
+ }
+
+#endif
+
+ public Task MoveNext(CancellationToken cancellationToken)
+ {
+ throw new NotImplementedException();
+ }
+
public T Current => skip?default:_source.Current;
public bool SkipFirst()
@@ -52,10 +73,19 @@ namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
}
public T ReallyCurrent => _source.Current;
+#if !EFCORE2
public async ValueTask DisposeAsync()
{
await _source.DisposeAsync();
}
+#endif
+
+#if EFCORE2
+ public void Dispose()
+ {
+ _source.Dispose();
+ }
+#endif
}
}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericMergeEngine.cs b/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericInMemoryMergeEngine.cs
similarity index 82%
rename from src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericMergeEngine.cs
rename to src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericInMemoryMergeEngine.cs
index 89e1d6b9..01be8e40 100644
--- a/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericMergeEngine.cs
+++ b/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericInMemoryMergeEngine.cs
@@ -4,7 +4,6 @@ using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
-using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
@@ -16,14 +15,18 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
* @Date: Thursday, 28 January 2021 17:04:29
* @Email: 326308290@qq.com
*/
- internal class GenericMergeEngine
+ internal class GenericInMemoryMergeEngine
{
private readonly StreamMergeContext _mergeContext;
- public GenericMergeEngine(StreamMergeContext mergeContext)
+ private GenericInMemoryMergeEngine(StreamMergeContext mergeContext)
{
_mergeContext = mergeContext;
}
+ public static GenericInMemoryMergeEngine Create(StreamMergeContext mergeContext)
+ {
+ return new GenericInMemoryMergeEngine(mergeContext);
+ }
private async Task EFCoreExecute(IQueryable newQueryable,RouteResult routeResult,Func> efQuery)
{
@@ -33,7 +36,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
return await efQuery(newQueryable);
}
}
- public async Task> Execute(Func> efQuery)
+ public async Task> ExecuteAsync(Func> efQuery)
{
//从各个分表获取数据
List parallelDbContexts = new List(_mergeContext.RouteResults.Count());
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericStreamMergeEngine.cs b/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericStreamMergeEngine.cs
index 49f51b78..86044726 100644
--- a/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericStreamMergeEngine.cs
+++ b/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericStreamMergeEngine.cs
@@ -6,10 +6,13 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Core.Internal.StreamMerge.Enumerators;
-using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
+#if EFCORE2
+using Microsoft.EntityFrameworkCore.Extensions.Internal;
+#endif
+
namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
/*
@@ -18,7 +21,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
* @Date: Friday, 29 January 2021 15:40:15
* @Email: 326308290@qq.com
*/
- internal class GenericStreamMergeEngine :IStreamMergeEngine
+ internal class GenericStreamMergeEngine : IStreamMergeEngine
{
private readonly StreamMergeContext _mergeContext;
private readonly List _parallelDbContexts;
@@ -33,6 +36,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
return new GenericStreamMergeEngine(mergeContext);
}
+
private async Task> GetAsyncEnumerator(IQueryable newQueryable, RouteResult routeResult)
{
using (var scope = _mergeContext.CreateScope())
@@ -50,7 +54,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
}
}
- public async Task>> GetStreamEnumerator()
+ public async Task> GetStreamEnumerator()
{
var enumeratorTasks = _mergeContext.RouteResults.Select(routeResult =>
{
@@ -61,10 +65,13 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
var newQueryable = (IQueryable) _mergeContext.GetReWriteQueryable().ReplaceDbContextQueryable(shardingDbContext);
var asyncEnumerator = await GetAsyncEnumerator(newQueryable, routeResult);
- return new StreamMergeAsyncEnumerator( asyncEnumerator);
+ return new StreamMergeAsyncEnumerator(asyncEnumerator);
});
}).ToArray();
- return (await Task.WhenAll(enumeratorTasks)).ToList();
+ var streamEnumerators = await Task.WhenAll(enumeratorTasks);
+ if (_mergeContext.Skip.HasValue || _mergeContext.Take.HasValue)
+ return new NoPaginationStreamMergeEnumerator(_mergeContext,streamEnumerators );
+ return new MultiOrderStreamMergeAsyncEnumerator(_mergeContext, streamEnumerators);
}
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericStreamMergeProxyEngine.cs b/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericStreamMergeProxyEngine.cs
index a519e3bf..4a744484 100644
--- a/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericStreamMergeProxyEngine.cs
+++ b/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/GenericStreamMergeProxyEngine.cs
@@ -31,13 +31,13 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
public async Task> ToListAsync(int capacity=20)
{
- var enumerator = new NoPaginationStreamMergeEngine(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
+ var enumerator = await _streamMergeEngine.GetStreamEnumerator();
var list = new List(capacity);
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
- while (await enumerator.MoveNextAsync())
+ while (await enumerator.MoveNext())
#endif
{
list.Add(enumerator.Current);
@@ -45,60 +45,6 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
return list;
}
-
- public async Task FirstOrDefaultAsync()
- {
- var enumerator = new NoPaginationStreamMergeEngine(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
-
-#if !EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
-#if EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
- {
- return enumerator.Current;
- }
-
- return default;
- }
-
- public async Task AnyAsync()
- {
- var enumerator = (IStreamMergeAsyncEnumerator)new NoPaginationStreamMergeEngine(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
-
-#if !EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
-#if EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
- {
- if (!enumerator.Current)
- return false;
- }
-
- return true;
- }
-
-
- public async Task CountAsync()
- {
- var enumerator = (IStreamMergeAsyncEnumerator)new NoPaginationStreamMergeEngine(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
- var result = 0;
-#if !EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
-#if EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
- {
- result += enumerator.Current;
- }
-
- return result;
- }
-
public void Dispose()
{
_streamMergeEngine.Dispose();
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/ListStreamMergeProxyEngine.cs b/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/ListStreamMergeProxyEngine.cs
deleted file mode 100644
index 869ac58c..00000000
--- a/src/ShardingCore/Core/Internal/StreamMerge/GenericMerges/ListStreamMergeProxyEngine.cs
+++ /dev/null
@@ -1,69 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using ShardingCore.Core.Internal.StreamMerge.Enumerators;
-
-namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
-{
-/*
-* @Author: xjm
-* @Description:
-* @Date: Friday, 29 January 2021 17:55:29
-* @Email: 326308290@qq.com
-*/
- internal class ListStreamMergeProxyEngine:IDisposable
- {
- private readonly StreamMergeContext _mergeContext;
- private IStreamMergeEngine _streamMergeEngine;
- private const int defaultCapacity = 0x10;//默认容量为16
-
-
- public ListStreamMergeProxyEngine(StreamMergeContext mergeContext)
- {
- _mergeContext = mergeContext;
- _streamMergeEngine = GenericStreamMergeEngine.Create(mergeContext);
- }
-
- public async Task> ToListAsync()
- {
- //如果合并数据的时候不需要跳过也没有take多少那么就是直接next
- var skip = _mergeContext.Skip;
- var take = _mergeContext.Take;
- var list = new List(skip.GetValueOrDefault() + take ?? defaultCapacity);
- var realSkip = 0;
- var realTake = 0;
- var enumerator = new MultiOrderStreamMergeAsyncEnumerator(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
-#if !EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
-#if EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
- {
- //获取真实的需要跳过的条数
- if (skip.HasValue)
- {
- if (realSkip < skip)
- {
- realSkip++;
- continue;
- }
- }
- list.Add(enumerator.Current);
- if (take.HasValue)
- {
- realTake++;
- if(realTake<=take.Value)
- break;
- }
- }
-
- return list;
-
- }
- public void Dispose()
- {
- _streamMergeEngine.Dispose();
- }
- }
-}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/IStreamMergeEngine.cs b/src/ShardingCore/Core/Internal/StreamMerge/IStreamMergeEngine.cs
index 46cfec9a..ac42bdce 100644
--- a/src/ShardingCore/Core/Internal/StreamMerge/IStreamMergeEngine.cs
+++ b/src/ShardingCore/Core/Internal/StreamMerge/IStreamMergeEngine.cs
@@ -13,6 +13,6 @@ namespace ShardingCore.Core.Internal.StreamMerge
*/
internal interface IStreamMergeEngine:IDisposable
{
- Task>> GetStreamEnumerator();
+ Task> GetStreamEnumerator();
}
}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/OrderAsyncEnumerator.cs b/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/OrderAsyncEnumerator.cs
deleted file mode 100644
index 33445a26..00000000
--- a/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/OrderAsyncEnumerator.cs
+++ /dev/null
@@ -1,148 +0,0 @@
-// using System.Collections.Generic;
-// using System.Linq;
-// using System.Threading;
-// using System.Threading.Tasks;
-// using ShardingCore.Core.Internal.PriorityQueues;
-//
-// namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
-// {
-// /*
-// * @Author: xjm
-// * @Description:
-// * @Date: Monday, 25 January 2021 08:06:12
-// * @Email: 326308290@qq.com
-// */
-// #if !EFCORE2
-// internal class OrderAsyncEnumerator : IAsyncEnumerator
-// {
-// private readonly StreamMergeContext _mergeContext;
-// private readonly List> _sources;
-// private readonly PriorityQueue> _queue;
-// private bool skipFirst;
-// private IAsyncEnumerator _currentEnumerator;
-//
-// public OrderAsyncEnumerator(StreamMergeContext mergeContext,List> sources)
-// {
-// _mergeContext = mergeContext;
-// _sources = sources;
-// _queue = new PriorityQueue>(sources.Count);
-// skipFirst = true;
-// SetOrderEnumerator();
-// }
-//
-// private void SetOrderEnumerator()
-// {
-// foreach (var source in _sources)
-// {
-// var orderMergeItem = new OrderMergeItem(_mergeContext, source);
-// if (null!=orderMergeItem.GetCurrentEnumerator().Current)
-// _queue.Offer(orderMergeItem);
-// }
-// _currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek().GetCurrentEnumerator();
-// }
-//
-// public async ValueTask MoveNextAsync()
-// {
-// if (_queue.IsEmpty())
-// return false;
-// if (skipFirst)
-// {
-// skipFirst = false;
-// return true;
-// }
-//
-// var first = _queue.Poll();
-// if (await first.MoveNextAsync())
-// {
-// _queue.Offer(first);
-// }
-//
-// if (_queue.IsEmpty())
-// {
-// return false;
-// }
-//
-// _currentEnumerator = _queue.Peek().GetCurrentEnumerator();
-// return true;
-// }
-//
-// public async ValueTask DisposeAsync()
-// {
-// foreach (var source in _sources)
-// {
-// await source.DisposeAsync();
-// }
-// }
-//
-// public T Current => _currentEnumerator.Current;
-// }
-// #endif
-// #if EFCORE2
-//
-// internal class OrderAsyncEnumerator : IAsyncEnumerator
-// {
-// private readonly StreamMergeContext _mergeContext;
-// private readonly List> _sources;
-// private readonly PriorityQueue> _queue;
-// private bool skipFirst;
-// private IAsyncEnumerator _currentEnumerator;
-//
-// public OrderAsyncEnumerator(StreamMergeContext mergeContext, List> sources)
-// {
-// _mergeContext = mergeContext;
-// _sources = sources;
-// _queue = new PriorityQueue>(sources.Count);
-// skipFirst = true;
-// SetOrderEnumerator();
-// }
-//
-// private void SetOrderEnumerator()
-// {
-// foreach (var source in _sources)
-// {
-// var orderMergeItem = new OrderMergeItem(_mergeContext, source);
-// if (null!=orderMergeItem.GetCurrentEnumerator().Current)
-// _queue.Offer(orderMergeItem);
-// }
-//
-// _currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek().GetCurrentEnumerator();
-// }
-//
-// public async Task MoveNext(CancellationToken cancellationToken)
-// {
-// if (_queue.IsEmpty())
-// return false;
-// if (skipFirst)
-// {
-// skipFirst = false;
-// return true;
-// }
-//
-// var first = _queue.Poll();
-// if (await first.MoveNextAsync())
-// {
-// _queue.Offer(first);
-// }
-//
-// if (_queue.IsEmpty())
-// {
-// return false;
-// }
-//
-// _currentEnumerator = _queue.Peek().GetCurrentEnumerator();
-// return true;
-// }
-//
-// public void Dispose()
-// {
-// foreach (var source in _sources)
-// {
-// source.Dispose();
-// }
-// }
-//
-//
-// public T Current => _currentEnumerator.Current;
-// }
-// #endif
-// }
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/OrderMergeItem.cs b/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/OrderMergeItem.cs
deleted file mode 100644
index 5ee0dca1..00000000
--- a/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/OrderMergeItem.cs
+++ /dev/null
@@ -1,158 +0,0 @@
-// using System;
-// using System.Collections.Generic;
-// using System.Linq;
-// using System.Threading.Tasks;
-// using ShardingCore.Extensions;
-//
-// namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
-// {
-// /*
-// * @Author: xjm
-// * @Description:
-// * @Date: Monday, 25 January 2021 11:33:57
-// * @Email: 326308290@qq.com
-// */
-// #if !EFCORE2
-//
-// internal class OrderMergeItem : IComparable>, IAsyncDisposable
-// {
-// ///
-// /// 合并数据上下文
-// ///
-// private readonly StreamMergeContext _mergeContext;
-//
-// private readonly IAsyncEnumerator _source;
-// private List _orderValues;
-//
-// public OrderMergeItem(StreamMergeContext mergeContext, IAsyncEnumerator source)
-// {
-// _mergeContext = mergeContext;
-// _source = source;
-// SetOrderValues(null!=GetCurrentEnumerator().Current);
-// }
-//
-// public IAsyncEnumerator GetCurrentEnumerator() => _source;
-//
-// private void SetOrderValues(bool hasElement)
-// {
-// _orderValues = hasElement ? GetCurrentOrderValues() : new List(0);
-// }
-// public async Task MoveNextAsync()
-// {
-// var has = await _source.MoveNextAsync();
-// SetOrderValues(has);
-// return has;
-// }
-//
-// public bool HasElement()
-// {
-//
-// }
-//
-// private List GetCurrentOrderValues()
-// {
-// if (!_mergeContext.Orders.Any())
-// return new List(0);
-// var list = new List(_mergeContext.Orders.Count());
-// foreach (var order in _mergeContext.Orders)
-// {
-// var value = GetCurrentEnumerator().Current.GetValueByExpression(order.PropertyExpression);
-// if (value is IComparable comparable)
-// list.Add(comparable);
-// else
-// throw new NotSupportedException($"order by value [{order}] must implements IComparable");
-// }
-//
-// return list;
-// }
-//
-// public int CompareTo(OrderMergeItem other)
-// {
-// int i = 0;
-// foreach (var order in _mergeContext.Orders) {
-// int result = CompareHelper.CompareToWith(_orderValues[i], other._orderValues[i], order.IsAsc);
-// if (0 != result) {
-// return result;
-// }
-// i++;
-// }
-// return 0;
-// }
-//
-// public ValueTask DisposeAsync()
-// {
-// return _source.DisposeAsync();
-// }
-// }
-// #endif
-// #if EFCORE2
-//
-// internal class OrderMergeItem : IComparable>, IDisposable
-// {
-// ///
-// /// 合并数据上下文
-// ///
-// private readonly StreamMergeContext _mergeContext;
-//
-// private readonly IAsyncEnumerator _source;
-// private List _orderValues;
-//
-// public OrderMergeItem(StreamMergeContext mergeContext, IAsyncEnumerator source)
-// {
-// _mergeContext = mergeContext;
-// _source = source;
-// SetOrderValues(null!=GetCurrentEnumerator().Current);
-// }
-//
-// public IAsyncEnumerator GetCurrentEnumerator() => _source;
-//
-// private void SetOrderValues(bool hasElement)
-// {
-// _orderValues = hasElement ? GetCurrentOrderValues() : new List(0);
-// }
-//
-// public async Task MoveNextAsync()
-// {
-// var has = await _source.MoveNext();
-// SetOrderValues(has);
-// return has;
-// }
-//
-// private List GetCurrentOrderValues()
-// {
-// if (!_mergeContext.Orders.Any())
-// return new List(0);
-// var list = new List(_mergeContext.Orders.Count());
-// foreach (var order in _mergeContext.Orders)
-// {
-// var value = GetCurrentEnumerator().Current.GetValueByExpression(order.PropertyExpression);
-// if (value is IComparable comparable)
-// list.Add(comparable);
-// else
-// throw new NotSupportedException($"order by value [{order}] must implements IComparable");
-// }
-//
-// return list;
-// }
-//
-// public int CompareTo(OrderMergeItem other)
-// {
-// int i = 0;
-// foreach (var order in _mergeContext.Orders) {
-// int result = CompareHelper.CompareToWith(_orderValues[i], other._orderValues[i], order.IsAsc);
-// if (0 != result) {
-// return result;
-// }
-// i++;
-// }
-// return 0;
-// }
-//
-//
-// public void Dispose()
-// {
-// _source.Dispose();
-// }
-// }
-// #endif
-// }
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/StreamMergeListEngine.cs b/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/StreamMergeListEngine.cs
deleted file mode 100644
index eff8d877..00000000
--- a/src/ShardingCore/Core/Internal/StreamMerge/ListMerge/StreamMergeListEngine.cs
+++ /dev/null
@@ -1,66 +0,0 @@
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-using Microsoft.EntityFrameworkCore;
-using ShardingCore.Core.Internal.StreamMerge.Abstractions;
-using ShardingCore.Core.Internal.StreamMerge.Enumerators;
-using ShardingCore.Extensions;
-
-namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
-{
-/*
-* @Author: xjm
-* @Description:
-* @Date: Monday, 25 January 2021 07:57:59
-* @Email: 326308290@qq.com
-*/
- internal class StreamMergeListEngine
- {
- private const int defaultCapacity = 0x10;//默认容量为16
- private readonly StreamMergeContext _mergeContext;
- private readonly IStreamMergeAsyncEnumerator _streamMergeAsyncEnumerator;
-
- public StreamMergeListEngine(StreamMergeContext mergeContext,IEnumerable> sources)
- {
- _mergeContext = mergeContext;
- _streamMergeAsyncEnumerator = new MultiOrderStreamMergeAsyncEnumerator(_mergeContext,sources);
- }
-
- public async Task> Execute()
- {
- //如果合并数据的时候不需要跳过也没有take多少那么就是直接next
- var skip = _mergeContext.Skip;
- var take = _mergeContext.Take;
- var list = new List(skip.GetValueOrDefault() + take ?? defaultCapacity);
- var realSkip = 0;
- var realTake = 0;
-#if !EFCORE2
- while (await _streamMergeAsyncEnumerator.MoveNextAsync())
-#endif
-#if EFCORE2
- while (await enumerator.MoveNextAsync())
-#endif
- {
- //获取真实的需要跳过的条数
- if (skip.HasValue)
- {
- if (realSkip < skip)
- {
- realSkip++;
- continue;
- }
- }
- list.Add(_streamMergeAsyncEnumerator.Current);
- if (take.HasValue)
- {
- realTake++;
- if(realTake<=take.Value)
- break;
- }
- }
-
- return list;
- }
-
- }
-}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/Internal/StreamMerge/ListSourceMerges/StreamMergeListSourceEngine.cs b/src/ShardingCore/Core/Internal/StreamMerge/ListSourceMerges/StreamMergeListSourceEngine.cs
deleted file mode 100644
index 566e8457..00000000
--- a/src/ShardingCore/Core/Internal/StreamMerge/ListSourceMerges/StreamMergeListSourceEngine.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-using Microsoft.EntityFrameworkCore;
-using ShardingCore.Core.Internal.RoutingRuleEngines;
-using ShardingCore.Core.Internal.StreamMerge.GenericMerges;
-using ShardingCore.Core.Internal.StreamMerge.ListMerge;
-using ShardingCore.Core.ShardingAccessors;
-using ShardingCore.Extensions;
-#if EFCORE2
-using Microsoft.EntityFrameworkCore.Extensions.Internal;
-#endif
-
-namespace ShardingCore.Core.Internal.StreamMerge.ListSourceMerges
-{
-/*
-* @Author: xjm
-* @Description:
-* @Date: Thursday, 28 January 2021 14:45:19
-* @Email: 326308290@qq.com
-*/
- internal class StreamMergeListSourceEngine
- {
- private readonly StreamMergeContext _mergeContext;
-
- public StreamMergeListSourceEngine(StreamMergeContext mergeContext)
- {
- _mergeContext = mergeContext;
- }
-
- public async Task> Execute()
- {
- using (var engine =new GenericStreamMergeEngine(_mergeContext))
- {
- var enumerators = await engine.GetStreamEnumerator();
-
- return await new StreamMergeListEngine(_mergeContext, enumerators).Execute();
- }
- }
- }
-}
\ No newline at end of file
diff --git a/src/ShardingCore/Core/ShardingQueryable.cs b/src/ShardingCore/Core/ShardingQueryable.cs
index 0f6c96e4..6d6d6097 100644
--- a/src/ShardingCore/Core/ShardingQueryable.cs
+++ b/src/ShardingCore/Core/ShardingQueryable.cs
@@ -8,12 +8,7 @@ using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge;
using ShardingCore.Core.Internal.StreamMerge.GenericMerges;
-using ShardingCore.Core.Internal.StreamMerge.ListMerge;
-using ShardingCore.Core.Internal.StreamMerge.ListSourceMerges;
-using ShardingCore.Core.ShardingAccessors;
-using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualTables;
-using ShardingCore.DbContexts;
using ShardingCore.Extensions;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
@@ -31,6 +26,7 @@ namespace ShardingCore.Core
/// 分表查询构造器
///
///
+
public class ShardingQueryable : IShardingQueryable
{
private IQueryable _source;
@@ -41,13 +37,18 @@ namespace ShardingCore.Core
private readonly IRoutingRuleEngineFactory _routingRuleEngineFactory;
- public ShardingQueryable(IQueryable source)
+ private ShardingQueryable(IQueryable source)
{
_source = source;
_streamMergeContextFactory = ShardingContainer.Services.GetService();
_routingRuleEngineFactory=ShardingContainer.Services.GetService();
}
+ public static ShardingQueryable Create(IQueryable source)
+ {
+ return new ShardingQueryable(source);
+ }
+
public IShardingQueryable EnableAutoRouteParse()
{
@@ -104,7 +105,7 @@ namespace ShardingCore.Core
}
private async Task> GetGenericMergeEngine(Func> efQuery)
{
- return await new GenericMergeEngine(GetContext()).Execute(efQuery);
+ return await GenericInMemoryMergeEngine.Create(GetContext()).ExecuteAsync(efQuery);
}
public async Task CountAsync()
@@ -120,12 +121,12 @@ namespace ShardingCore.Core
}
- public async Task> ToListAsync()
+ public async Task> ToListAsync(int capacity=20)
{
var context = GetContext();
using (var engine = GenericStreamMergeProxyEngine.Create(context))
{
- return await engine.ToListAsync();
+ return await engine.ToListAsync(capacity);
}
}
@@ -133,10 +134,13 @@ namespace ShardingCore.Core
public async Task FirstOrDefaultAsync()
{
var context = GetContext();
- using (var engine = GenericStreamMergeProxyEngine.Create(context))
- {
- return await engine.FirstOrDefaultAsync();
- }
+ var result= await GenericInMemoryMergeEngine.Create(context).ExecuteAsync(async queryable => await EntityFrameworkQueryableExtensions.FirstOrDefaultAsync((IQueryable) queryable));
+ var q = result.Where(o => o != null).AsQueryable();
+ if (context.Orders.Any())
+ return q.OrderWithExpression(context.Orders).FirstOrDefault();
+
+ return q.FirstOrDefault();
+
}
public async Task AnyAsync()
@@ -145,6 +149,7 @@ namespace ShardingCore.Core
.Any(o => o);
}
+
public async Task MaxAsync()
{
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.MaxAsync((IQueryable) queryable));
@@ -197,6 +202,28 @@ namespace ShardingCore.Core
return results.Sum();
}
+ public async Task DecimalAverageAsync()
+ {
+ if (typeof(T) != typeof(decimal))
+ throw new InvalidOperationException($"{typeof(T)} cast to decimal failed");
+ var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.AverageAsync((IQueryable) queryable));
+ return results.Sum()/results.Count();
+ }
+ public async Task DoubleAverageAsync()
+ {
+ if (typeof(T) != typeof(double))
+ throw new InvalidOperationException($"{typeof(T)} cast to double failed");
+ var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.AverageAsync((IQueryable) queryable));
+ return results.Sum()/results.Count();
+ }
+
+ public async Task FloatAverageAsync()
+ {
+ if (typeof(T) != typeof(float))
+ throw new InvalidOperationException($"{typeof(T)} cast to float failed");
+ var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.AverageAsync((IQueryable) queryable));
+ return results.Sum()/results.Count();
+ }
}
}
\ No newline at end of file
diff --git a/src/ShardingCore/Extensions/ShardingExtension.cs b/src/ShardingCore/Extensions/ShardingExtension.cs
index 2416e597..70db92ad 100644
--- a/src/ShardingCore/Extensions/ShardingExtension.cs
+++ b/src/ShardingCore/Extensions/ShardingExtension.cs
@@ -23,11 +23,11 @@ namespace ShardingCore.Extensions
///
public static IShardingQueryable AsSharding(this IQueryable source)
{
- return new ShardingQueryable(source);
+ return ShardingQueryable.Create(source);
}
public static async Task ShardingAnyAsync(this IQueryable source, Expression> predicate)
{
- return await new ShardingQueryable(source.Where(predicate)).AnyAsync();
+ return await ShardingQueryable.Create(source.Where(predicate)).AnyAsync();
}
///
/// 分页
@@ -37,7 +37,7 @@ namespace ShardingCore.Extensions
///
///
///
- public static async Task> ToShardingPageResultAsync(this IQueryable source, int pageIndex, int pageSize)
+ public static async Task> ShardingPageResultAsync(this IQueryable source, int pageIndex, int pageSize)
{
//设置每次获取多少页
var take = pageSize <= 0 ? 1 : pageSize;
@@ -46,7 +46,7 @@ namespace ShardingCore.Extensions
//需要跳过多少页
var skip = (index - 1) * take;
//获取每次总记录数
- var count = await new ShardingQueryable(source).CountAsync();
+ var count = await ShardingQueryable.Create(source).CountAsync();
//当数据库数量小于要跳过的条数就说明没数据直接返回不在查询list
if (count <= skip)
@@ -55,7 +55,7 @@ namespace ShardingCore.Extensions
int remainingCount = count - skip;
//当剩余条数小于take数就取remainingCount
var realTake = remainingCount < take ? remainingCount : take;
- var data = await new ShardingQueryable(source.Skip(skip).Take(realTake)).ToListAsync();
+ var data = await ShardingQueryable.Create(source.Skip(skip).Take(realTake)).ToListAsync(realTake);
return new ShardingPagedResult(data, count);
}
///
@@ -66,7 +66,7 @@ namespace ShardingCore.Extensions
///
public static List ToShardingList(this IQueryable source)
{
- return new ShardingQueryable(source).ToList();
+ return ShardingQueryable.Create(source).ToList();
}
///
/// 分页
@@ -76,7 +76,7 @@ namespace ShardingCore.Extensions
///
public static async Task> ToShardingListAsync(this IQueryable source)
{
- return await new ShardingQueryable(source).ToListAsync();
+ return await ShardingQueryable.Create(source).ToListAsync();
}
///
/// 第一条
@@ -86,7 +86,7 @@ namespace ShardingCore.Extensions
///
public static T ShardingFirstOrDefault(this IQueryable source)
{
- return new ShardingQueryable(source).FirstOrDefault();
+ return ShardingQueryable